001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026
027import org.apache.activemq.broker.BrokerFactory;
028import org.apache.activemq.broker.BrokerFactoryHandler;
029import org.apache.activemq.broker.BrokerRegistry;
030import org.apache.activemq.broker.BrokerService;
031import org.apache.activemq.broker.TransportConnector;
032import org.apache.activemq.transport.MarshallingTransportFilter;
033import org.apache.activemq.transport.Transport;
034import org.apache.activemq.transport.TransportFactory;
035import org.apache.activemq.transport.TransportServer;
036import org.apache.activemq.util.IOExceptionSupport;
037import org.apache.activemq.util.IntrospectionSupport;
038import org.apache.activemq.util.ServiceSupport;
039import org.apache.activemq.util.URISupport;
040import org.apache.activemq.util.URISupport.CompositeData;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.slf4j.MDC;
044
045public class VMTransportFactory extends TransportFactory {
046
047    public static final ConcurrentMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
048    public static final ConcurrentMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
049    public static final ConcurrentMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
050    private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
051
052    BrokerFactoryHandler brokerFactoryHandler;
053
054    @Override
055    public Transport doConnect(URI location) throws Exception {
056        return VMTransportServer.configure(doCompositeConnect(location));
057    }
058
059    @Override
060    public Transport doCompositeConnect(URI location) throws Exception {
061        URI brokerURI;
062        String host;
063        Map<String, String> options;
064        boolean create = true;
065        int waitForStart = -1;
066        CompositeData data = URISupport.parseComposite(location);
067        if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
068            brokerURI = data.getComponents()[0];
069            CompositeData brokerData = URISupport.parseComposite(brokerURI);
070            host = brokerData.getParameters().get("brokerName");
071            if (host == null) {
072                host = "localhost";
073            }
074            if (brokerData.getPath() != null) {
075                host = brokerData.getPath();
076            }
077            options = data.getParameters();
078            location = new URI("vm://" + host);
079        } else {
080            // If using the less complex vm://localhost?broker.persistent=true
081            // form
082            try {
083                host = extractHost(location);
084                options = URISupport.parseParameters(location);
085                String config = options.remove("brokerConfig");
086                if (config != null) {
087                    brokerURI = new URI(config);
088                } else {
089                    Map<String, Object> brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
090                    brokerURI = new URI("broker://()/" + host + "?"
091                                        + URISupport.createQueryString(brokerOptions));
092                }
093                if ("false".equals(options.remove("create"))) {
094                    create = false;
095                }
096                String waitForStartString = options.remove("waitForStart");
097                if (waitForStartString != null) {
098                    waitForStart = Integer.parseInt(waitForStartString);
099                }
100            } catch (URISyntaxException e1) {
101                throw IOExceptionSupport.create(e1);
102            }
103            location = new URI("vm://" + host);
104        }
105        if (host == null) {
106            host = "localhost";
107        }
108        VMTransportServer server = SERVERS.get(host);
109        // validate the broker is still active
110        if (!validateBroker(host) || server == null) {
111            BrokerService broker = null;
112            // Synchronize on the registry so that multiple concurrent threads
113            // doing this do not think that the broker has not been created and
114            // cause multiple brokers to be started.
115            synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
116                broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
117                if (broker == null) {
118                    if (!create) {
119                        throw new IOException("Broker named '" + host + "' does not exist.");
120                    }
121                    try {
122                        if (brokerFactoryHandler != null) {
123                            broker = brokerFactoryHandler.createBroker(brokerURI);
124                        } else {
125                            broker = BrokerFactory.createBroker(brokerURI);
126                        }
127                        broker.start();
128                        MDC.put("activemq.broker", broker.getBrokerName());
129                    } catch (URISyntaxException e) {
130                        throw IOExceptionSupport.create(e);
131                    }
132                    BROKERS.put(host, broker);
133                    BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
134                }
135
136                server = SERVERS.get(host);
137                if (server == null) {
138                    server = (VMTransportServer)bind(location, true);
139                    TransportConnector connector = new TransportConnector(server);
140                    connector.setBrokerService(broker);
141                    connector.setUri(location);
142                    connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
143                    connector.start();
144                    CONNECTORS.put(host, connector);
145                }
146
147            }
148        }
149
150        VMTransport vmtransport = server.connect();
151        IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
152        IntrospectionSupport.setProperties(vmtransport, options);
153        Transport transport = vmtransport;
154        if (vmtransport.isMarshal()) {
155            Map<String, String> optionsCopy = new HashMap<String, String>(options);
156            transport = new MarshallingTransportFilter(transport, createWireFormat(options),
157                                                       createWireFormat(optionsCopy));
158        }
159        if (!options.isEmpty()) {
160            throw new IllegalArgumentException("Invalid connect parameters: " + options);
161        }
162        return transport;
163    }
164
165   private static String extractHost(URI location) {
166       String host = location.getHost();
167       if (host == null || host.length() == 0) {
168           host = location.getAuthority();
169           if (host == null || host.length() == 0) {
170               host = "localhost";
171           }
172       }
173       return host;
174    }
175
176   /**
177    * Attempt to find a Broker instance.
178    *
179    * @param registry
180    *        the registry in which to search for the BrokerService instance.
181    * @param brokerName
182    *        the name of the Broker that should be located.
183    * @param waitForStart
184    *        time in milliseconds to wait for a broker to appear and be started.
185    *
186    * @return a BrokerService instance if one is found, or null.
187    */
188    private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
189        BrokerService broker = null;
190        synchronized(registry.getRegistryMutext()) {
191            broker = registry.lookup(brokerName);
192            if (broker == null || waitForStart > 0) {
193                final long expiry = System.currentTimeMillis() + waitForStart;
194                while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) {
195                    long timeout = Math.max(0, expiry - System.currentTimeMillis());
196                    if (broker == null) {
197                        try {
198                            LOG.debug("waiting for broker named: " + brokerName + " to enter registry");
199                            registry.getRegistryMutext().wait(timeout);
200                            broker = registry.lookup(brokerName);
201                        } catch (InterruptedException ignored) {
202                        }
203                    }
204                    if (broker != null && !broker.isStarted()) {
205                        LOG.debug("waiting for broker named: " + brokerName + " to start");
206                        timeout = Math.max(0, expiry - System.currentTimeMillis());
207                        // Wait for however long we have left for broker to be started, if
208                        // it doesn't get started we need to clear broker so it doesn't get
209                        // returned.  A null return should throw an exception.
210                        if (!broker.waitUntilStarted(timeout)) {
211                            broker = null;
212                            break;
213                        }
214                    }
215                }
216            }
217        }
218        return broker;
219    }
220
221    @Override
222    public TransportServer doBind(URI location) throws IOException {
223        return bind(location, false);
224    }
225
226    /**
227     * @param location
228     * @return the TransportServer
229     * @throws IOException
230     */
231    private TransportServer bind(URI location, boolean dispose) throws IOException {
232        String host = extractHost(location);
233        LOG.debug("binding to broker: " + host);
234        VMTransportServer server = new VMTransportServer(location, dispose);
235        Object currentBoundValue = SERVERS.get(host);
236        if (currentBoundValue != null) {
237            throw new IOException("VMTransportServer already bound at: " + location);
238        }
239        SERVERS.put(host, server);
240        return server;
241    }
242
243    public static void stopped(VMTransportServer server) {
244        String host = extractHost(server.getBindURI());
245        stopped(host);
246    }
247
248    public static void stopped(String host) {
249        SERVERS.remove(host);
250        TransportConnector connector = CONNECTORS.remove(host);
251        if (connector != null) {
252            LOG.debug("Shutting down VM connectors for broker: " + host);
253            ServiceSupport.dispose(connector);
254            BrokerService broker = BROKERS.remove(host);
255            if (broker != null) {
256                ServiceSupport.dispose(broker);
257            }
258            MDC.remove("activemq.broker");
259        }
260    }
261
262    public BrokerFactoryHandler getBrokerFactoryHandler() {
263        return brokerFactoryHandler;
264    }
265
266    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
267        this.brokerFactoryHandler = brokerFactoryHandler;
268    }
269
270    private boolean validateBroker(String host) {
271        boolean result = true;
272        if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) {
273            // check the broker is still in the BrokerRegistry
274            TransportConnector connector = CONNECTORS.get(host);
275            if (BrokerRegistry.getInstance().lookup(host) == null
276                || (connector != null && connector.getBroker().isStopped())) {
277                result = false;
278                // clean-up
279                BROKERS.remove(host);
280                SERVERS.remove(host);
281                if (connector != null) {
282                    CONNECTORS.remove(host);
283                    if (connector != null) {
284                        ServiceSupport.dispose(connector);
285                    }
286                }
287            }
288        }
289        return result;
290    }
291}