/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.HealthView;
import org.apache.activemq.broker.jmx.HealthViewMBean;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.Log4JConfigView;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.StoreUtil;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
 *
 * @org.apache.xbean.XBean
 */
public class BrokerService implements Service {
    public static final String DEFAULT_PORT = "61616";
    public static final String LOCAL_HOST_NAME;
    public static final String BROKER_VERSION;
    public static final String DEFAULT_BROKER_NAME = "localhost";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final long DEFAULT_START_TIMEOUT = 600000L;

    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);

    @SuppressWarnings("unused")
    private static final long serialVersionUID = 7353129142305630237L;

    private boolean useJmx = true;
    private boolean enableStatistics = true;
    private boolean persistent = true;
    private boolean populateJMSXUserID;
    private boolean useAuthenticatedPrincipalForJMSXUserID;
    private boolean populateUserNameInMBeans;
    private long mbeanInvocationTimeout = 0;

    private boolean useShutdownHook = true;
    private boolean useLoggingForShutdownErrors;
    private boolean shutdownOnMasterFailure;
    private boolean shutdownOnSlaveFailure;
    private boolean waitForSlave;
    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
    private boolean passiveSlave;
    private String brokerName = DEFAULT_BROKER_NAME;
    private File dataDirectoryFile;
    private File tmpDataDirectory;
    private Broker broker;
    private BrokerView adminView;
    private ManagementContext managementContext;
    private ObjectName brokerObjectName;
    private TaskRunnerFactory taskRunnerFactory;
    private TaskRunnerFactory persistenceTaskRunnerFactory;
    private SystemUsage systemUsage;
    private SystemUsage producerSystemUsage;
    private SystemUsage consumerSystemUsaage;
    private PersistenceAdapter persistenceAdapter;
    private PersistenceAdapterFactory persistenceFactory;
    protected DestinationFactory destinationFactory;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
    private final List<Service> services = new ArrayList<Service>();
    private transient Thread shutdownHook;
    private String[] transportConnectorURIs;
    private String[] networkConnectorURIs;
    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
    // to other jms messaging systems
    private boolean deleteAllMessagesOnStartup;
    private boolean advisorySupport = true;
    private URI vmConnectorURI;
    private String defaultSocketURIString;
    private PolicyMap destinationPolicy;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private BrokerPlugin[] plugins;
    private boolean keepDurableSubsActive = true;
    private boolean useVirtualTopics = true;
    private boolean useMirroredQueues = false;
    private boolean useTempMirroredQueues = true;
    /**
     * Whether or not virtual destination subscriptions should cause network demand
     */
    private boolean useVirtualDestSubs = false;
    /**
     * Whether or not the creation of destinations that match virtual destinations
     * should cause network demand
     */
    private boolean useVirtualDestSubsOnCreation = false;
    private BrokerId brokerId;
    private volatile DestinationInterceptor[] destinationInterceptors;
    private ActiveMQDestination[] destinations;
    private PListStore tempDataStore;
    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
    private boolean useLocalHostBrokerName;
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    private final CountDownLatch startedLatch = new CountDownLatch(1);
    private Broker regionBroker;
    private int producerSystemUsagePortion = 60;
    private int consumerSystemUsagePortion = 40;
    private boolean splitSystemUsageForProducersConsumers;
    private boolean monitorConnectionSplits = false;
    private int taskRunnerPriority = Thread.NORM_PRIORITY;
    private boolean dedicatedTaskRunner;
    private boolean cacheTempDestinations = false;// useful for failover
    private int timeBeforePurgeTempDestinations = 5000;
    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
    private boolean systemExitOnShutdown;
    private int systemExitOnShutdownExitCode;
    private SslContext sslContext;
    private boolean forceStart = false;
    private IOExceptionHandler ioExceptionHandler;
    private boolean schedulerSupport = false;
    private File schedulerDirectoryFile;
    private Scheduler scheduler;
    private ThreadPoolExecutor executor;
    private int schedulePeriodForDestinationPurge= 0;
    private int maxPurgedDestinationsPerSweep = 0;
    private int schedulePeriodForDiskUsageCheck = 0;
    private int diskUsageCheckRegrowThreshold = -1;
    private boolean adjustUsageLimits = true;
    private BrokerContext brokerContext;
    private boolean networkConnectorStartAsync = false;
    private boolean allowTempAutoCreationOnSend;
    private JobSchedulerStore jobSchedulerStore;
    private final AtomicLong totalConnections = new AtomicLong();
    private final AtomicInteger currentConnections = new AtomicInteger();

    private long offlineDurableSubscriberTimeout = -1;
    private long offlineDurableSubscriberTaskSchedule = 300000;
    private DestinationFilter virtualConsumerDestinationFilter;

    private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
    private Throwable startException = null;
    private boolean startAsync = false;
    private Date startDate;
    private boolean slave = true;

    private boolean restartAllowed = true;
    private boolean restartRequested = false;
    private boolean rejectDurableConsumers = false;
    private boolean rollbackOnlyOnAsyncException = true;

    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;

    static {

        // Best effort: install Bouncy Castle as a security provider when it is on the classpath.
        try {
            ClassLoader loader = BrokerService.class.getClassLoader();
            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
            Provider bouncycastle = (Provider) clazz.newInstance();
            Security.insertProviderAt(bouncycastle,
                Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2));
            LOG.info("Loaded the Bouncy Castle security provider.");
        } catch(Throwable e) {
            // No BouncyCastle found so we use the default Java Security Provider
        }

        // Fall back to "localhost" when the local host name cannot be resolved.
        String localHostName = "localhost";
        try {
            localHostName = InetAddressUtil.getLocalHostName();
        } catch (UnknownHostException e) {
            // keep the cause in the log so the resolution failure can be diagnosed
            LOG.error("Failed to resolve localhost", e);
        }
        LOCAL_HOST_NAME = localHostName;

        // The version is the first line of the bundled version.txt; null when the resource is absent.
        String version = null;
        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
            if (in != null) {
                try(InputStreamReader isr = new InputStreamReader(in);
                    BufferedReader reader = new BufferedReader(isr)) {
                    version = reader.readLine();
                }
            }
        } catch (IOException ie) {
            LOG.warn("Error reading broker version ", ie);
        }
        BROKER_VERSION = version;
    }

    /**
     * @return {@code "BrokerService[" + brokerName + "]"}
     */
    @Override
    public String toString() {
        return "BrokerService[" + getBrokerName() + "]";
    }

    /**
     * @return the provider version from {@link ActiveMQConnectionMetaData}, or
     *         {@link #BROKER_VERSION} when that is null
     */
    private String getBrokerVersion() {
        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
        if (version == null) {
            version = BROKER_VERSION;
        }

        return version;
    }

    /**
     * Adds a new transport connector for the given bind address
     *
     * @return the newly created and added transport connector
     * @throws Exception
     */
    public TransportConnector addConnector(String bindAddress) throws Exception {
        return addConnector(new URI(bindAddress));
    }

    /**
     * Adds a new transport connector for the given bind address
     *
     * @return the newly created and added transport connector
     * @throws Exception
     */
    public TransportConnector addConnector(URI bindAddress) throws Exception {
        return addConnector(createTransportConnector(bindAddress));
    }

    /**
     * Adds a new transport connector for the given TransportServer transport
     *
     * @return the newly created and added transport connector
     * @throws Exception
     */
    public TransportConnector addConnector(TransportServer transport) throws Exception {
        return addConnector(new TransportConnector(transport));
    }

    /**
     * Adds a new transport connector
     *
     * @return the transport connector
     * @throws Exception
     */
    public TransportConnector addConnector(TransportConnector connector) throws Exception {
        transportConnectors.add(connector);
        return connector;
    }

    /**
     * Removes a transport connector from the broker and, if it was registered
     * here, unregisters its MBean.
     *
     * @param connector
     * @return true if the connector has been previously added to the broker
     * @throws Exception
     */
    public boolean removeConnector(TransportConnector connector) throws Exception {
        boolean rc = transportConnectors.remove(connector);
        if (rc) {
            unregisterConnectorMBean(connector);
        }
        return rc;
    }

    /**
     * Adds a new network connector using the given discovery address
     *
     * @return the newly created and added network connector
     * @throws Exception
     */
    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
        return addNetworkConnector(new URI(discoveryAddress));
    }

    /**
     * Adds a new proxy connector using the given bind address
     *
     * @return the newly created and added proxy connector
     * @throws Exception
     */
    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
        return addProxyConnector(new URI(bindAddress));
    }

    /**
     * Adds a new network connector using the given discovery address
     *
     * @return the newly created and added network connector
     * @throws Exception
     */
    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
        return addNetworkConnector(connector);
    }

    /**
     * Adds a new proxy connector using the given bind address; the remote side
     * is set to {@code fanout:multicast://default}.
     *
     * @return the newly created and added proxy connector
     * @throws Exception
     */
    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
        ProxyConnector connector = new ProxyConnector();
        connector.setBind(bindAddress);
        connector.setRemote(new URI("fanout:multicast://default"));
        return addProxyConnector(connector);
    }

    /**
     * Adds a new network connector to connect this broker to a federated
     * network
     */
    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
        connector.setBrokerService(this);
        connector.setLocalUri(getVmConnectorURI());
        // Set a connection filter so that the connector does not establish loop
        // back connections.
        connector.setConnectionFilter(new ConnectionFilter() {
            @Override
            public boolean connectTo(URI location) {
                List<TransportConnector> transportConnectors = getTransportConnectors();
                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
                    try {
                        TransportConnector tc = iter.next();
                        if (location.equals(tc.getConnectUri())) {
                            return false;
                        }
                    } catch (Throwable e) {
                        // best effort: a connector whose connect URI cannot be
                        // obtained is skipped rather than failing the filter
                    }
                }
                return true;
            }
        });
        networkConnectors.add(connector);
        return connector;
    }

    /**
     * Removes the given network connector without stopping it. The caller
     * should call {@link NetworkConnector#stop()} to close the connector
     */
    public boolean removeNetworkConnector(NetworkConnector connector) {
        boolean answer = networkConnectors.remove(connector);
        if (answer) {
            unregisterNetworkConnectorMBean(connector);
        }
        return answer;
    }

    /**
     * Adds the given proxy connector, pointing its local URI at this broker's
     * VM connector URI, and registers its MBean when JMX is in use.
     *
     * @return the connector that was passed in
     * @throws Exception
     */
    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
        URI uri = getVmConnectorURI();
        connector.setLocalUri(uri);
        proxyConnectors.add(connector);
        if (isUseJmx()) {
            registerProxyConnectorMBean(connector);
        }
        return connector;
    }

    /**
     * Adds the given JMS connector to this broker and registers its MBean when
     * JMX is in use.
     *
     * @return the connector that was passed in
     * @throws Exception
     */
    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
        connector.setBrokerService(this);
        jmsConnectors.add(connector);
        if (isUseJmx()) {
            registerJmsConnectorMBean(connector);
        }
        return connector;
    }

    /**
     * Removes the given JMS connector.
     *
     * @return the connector if it was registered with this broker, otherwise null
     */
    public JmsConnector removeJmsConnector(JmsConnector connector) {
        if (jmsConnectors.remove(connector)) {
            return connector;
        }
        return null;
    }

    /**
     * Reacts to a master failure: stops this broker when
     * {@code shutdownOnMasterFailure} is set, otherwise starts all connectors
     * and tells the broker it is now the master. Failures are logged rather
     * than propagated.
     */
    public void masterFailed() {
        if (shutdownOnMasterFailure) {
            LOG.error("The Master has failed ... shutting down");
            try {
                stop();
            } catch (Exception e) {
                LOG.error("Failed to stop for master failure", e);
            }
        } else {
            LOG.warn("Master Failed - starting all connectors");
            try {
                startAllConnectors();
                broker.nowMasterBroker();
            } catch (Exception e) {
                LOG.error("Failed to startAllConnectors", e);
            }
        }
    }

    /**
     * @return the uptime formatted by {@link TimeUtils#printDuration}, or
     *         {@code "not started"} when {@link #getUptimeMillis()} is 0
     */
    public String getUptime() {
        long delta = getUptimeMillis();

        if (delta == 0) {
            return "not started";
        }

        return TimeUtils.printDuration(delta);
    }

    /**
     * @return milliseconds elapsed since the recorded start date, or 0 when no
     *         start date is recorded
     */
    public long getUptimeMillis() {
        // Read the field once: stop() resets it to null and may run concurrently,
        // so a second read after the null check could otherwise hit a null.
        final Date startedAt = startDate;
        if (startedAt == null) {
            return 0;
        }

        return new Date().getTime() - startedAt.getTime();
    }

    /**
     * @return true when the started flag is set and the started latch has been
     *         counted down
     */
    public boolean isStarted() {
        return started.get() && startedLatch.getCount() == 0;
    }

    /**
     * Forces a start of the broker.
     * By default a BrokerService instance that was
     * previously stopped using BrokerService.stop() cannot be restarted
     * using BrokerService.start().
     * This method enforces a restart.
     * It is not recommended to force a restart of the broker and will not work
     * for most but some very trivial broker configurations.
     * For restarting a broker instance we recommend to first call stop() on
     * the old instance and then recreate a new BrokerService instance.
     *
     * @param force - if true enforces a restart.
     * @throws Exception
     */
    public void start(boolean force) throws Exception {
        forceStart = force;
        stopped.set(false);
        started.set(false);
        start();
    }

    // Service interface
    // -------------------------------------------------------------------------

    /**
     * Hook consulted by {@link #autoStart()}; this implementation always
     * returns true.
     */
    protected boolean shouldAutostart() {
        return true;
    }

    /**
     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
     *
     * delegates to autoStart, done to prevent backwards incompatible signature change
     */
    @PostConstruct
    private void postConstruct() {
        try {
            autoStart();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Starts the broker when {@link #shouldAutostart()} returns true.
     *
     * @throws Exception
     * @org. apache.xbean.InitMethod
     */
    public void autoStart() throws Exception {
        if(shouldAutostart()) {
            start();
        }
    }

    /**
     * Starts the broker: management context (when JMX is used), persistence
     * adapter and broker, in that order. Redundant calls, and calls on a
     * stopped instance, are ignored. On failure the broker is stopped again
     * and the original exception is rethrown.
     *
     * @throws Exception if any part of the start-up fails
     */
    @Override
    public void start() throws Exception {
        if (stopped.get() || !started.compareAndSet(false, true)) {
            // lets just ignore redundant start() calls
            // as its way too easy to not be completely sure if start() has been
            // called or not with the gazillion of different configuration
            // mechanisms
            // throw new IllegalStateException("Already started.");
            return;
        }

        setStartException(null);
        stopping.set(false);
        startDate = new Date();
        MDC.put("activemq.broker", brokerName);

        try {
            checkMemorySystemUsageLimits();
            if (systemExitOnShutdown && useShutdownHook) {
                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
            }
            processHelperProperties();
            if (isUseJmx()) {
                // need to remove MDC during starting JMX, as that would otherwise cause leaks, as spawned threads inherit the MDC and
                // we cannot clean that up during shutdown of the broker.
                MDC.remove("activemq.broker");
                try {
                    startManagementContext();
                    for (NetworkConnector connector : getNetworkConnectors()) {
                        registerNetworkConnectorMBean(connector);
                    }
                } finally {
                    MDC.put("activemq.broker", brokerName);
                }
            }

            // in jvm master slave, lets not publish over existing broker till we get the lock
            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
            if (brokerRegistry.lookup(getBrokerName()) == null) {
                brokerRegistry.bind(getBrokerName(), BrokerService.this);
            }
            startPersistenceAdapter(startAsync);
            startBroker(startAsync);
            brokerRegistry.bind(getBrokerName(), BrokerService.this);
        } catch (Exception e) {
            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
            try {
                if (!stopped.get()) {
                    stop();
                }
            } catch (Exception ex) {
                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
            }
            throw e;
        } finally {
            MDC.remove("activemq.broker");
        }
    }

    /**
     * Starts the persistence adapter, either inline or on a dedicated thread.
     * In the asynchronous case a failure is recorded as the start exception,
     * and waiters on {@code persistenceAdapterStarted} are always notified.
     *
     * @param async whether to start on a separate thread
     */
    private void startPersistenceAdapter(boolean async) throws Exception {
        if (async) {
            new Thread("Persistence Adapter Starting Thread") {
                @Override
                public void run() {
                    try {
                        doStartPersistenceAdapter();
                    } catch (Throwable e) {
                        setStartException(e);
                    } finally {
                        synchronized (persistenceAdapterStarted) {
                            persistenceAdapterStarted.set(true);
                            persistenceAdapterStarted.notifyAll();
                        }
                    }
                }
            }.start();
        } else {
            doStartPersistenceAdapter();
        }
    }

    /**
     * Configures and starts the persistence adapter, then the temp data store
     * and the job scheduler store when those are present.
     *
     * @throws ConfigurationException when no persistence adapter is available
     */
    private void doStartPersistenceAdapter() throws Exception {
        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
        if (persistenceAdapterToStart == null) {
            checkStartException();
            throw new ConfigurationException("Cannot start null persistence adapter");
        }
        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
        persistenceAdapterToStart.setBrokerName(getBrokerName());
        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
        if (deleteAllMessagesOnStartup) {
            deleteAllMessages();
        }
        persistenceAdapterToStart.start();

        getTempDataStore();
        if (tempDataStore != null) {
            try {
                // start after we have the store lock
                tempDataStore.start();
            } catch (Exception e) {
                RuntimeException exception = new RuntimeException(
                        "Failed to start temp data store: " + tempDataStore, e);
                LOG.error(exception.getLocalizedMessage(), e);
                throw exception;
            }
        }

        getJobSchedulerStore();
        if (jobSchedulerStore != null) {
            try {
                jobSchedulerStore.start();
            } catch (Exception e) {
                RuntimeException exception = new RuntimeException(
                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
                LOG.error(exception.getLocalizedMessage(), e);
                throw exception;
            }
        }
    }

    /**
     * Starts the broker, either inline or on a dedicated thread. The
     * asynchronous variant first waits until the persistence adapter start has
     * been signalled; any failure is recorded as the start exception.
     *
     * @param async whether to start on a separate thread
     */
    private void startBroker(boolean async) throws Exception {
        if (async) {
            new Thread("Broker Starting Thread") {
                @Override
                public void run() {
                    try {
                        synchronized (persistenceAdapterStarted) {
                            // loop, not if: Object.wait() may return spuriously, and the
                            // broker must not start before the adapter start has finished
                            while (!persistenceAdapterStarted.get()) {
                                persistenceAdapterStarted.wait();
                            }
                        }
                        doStartBroker();
                    } catch (Throwable t) {
                        setStartException(t);
                    }
                }
            }.start();
        } else {
            doStartBroker();
        }
    }

    /**
     * Performs the actual broker start: destinations, shutdown hook, the
     * broker itself, JMX wiring, connectors, and finally releases the started
     * latch.
     */
    private void doStartBroker() throws Exception {
        checkStartException();
        startDestinations();
        addShutdownHook();

        broker = getBroker();
        brokerId = broker.getBrokerId();

        // need to log this after creating the broker so we have its id and name
        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
        broker.start();

        if (isUseJmx()) {
            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
                // try to restart management context
                // typical for slaves that use the same ports as master
                managementContext.stop();
                startManagementContext();
            }
            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
            managedBroker.setContextBroker(broker);
            adminView.setBroker(managedBroker);
        }

        if (ioExceptionHandler == null) {
            setIoExceptionHandler(new DefaultIOExceptionHandler());
        }

        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
            Log4JConfigView log4jConfigView = new Log4JConfigView();
            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
        }

        startAllConnectors();

        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
        LOG.info("For help or more information please see: http://activemq.apache.org");

        getBroker().brokerServiceStarted();
        checkStoreSystemUsageLimits();
        startedLatch.countDown();
        getBroker().nowMasterBroker();
    }

    /**
     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
     *
     * delegates to stop, done to prevent backwards incompatible signature change
     */
    @PreDestroy
    private void preDestroy () {
        try {
            stop();
        } catch (Exception ex) {
            // keep the cause so the reason for the failed shutdown is not lost
            throw new RuntimeException(ex);
        }
    }

    /**
     * Stops the broker and releases its resources. A second call while a stop
     * is already in progress (or done) returns immediately. Errors collected
     * by the {@link ServiceStopper} are rethrown at the end via
     * {@code throwFirstException()}.
     *
     * @throws Exception
     * @org.apache .xbean.DestroyMethod
     */
    @Override
    public void stop() throws Exception {
        if (!stopping.compareAndSet(false, true)) {
            LOG.trace("Broker already stopping/stopped");
            return;
        }

        setStartException(new BrokerStoppedException("Stop invoked"));
        MDC.put("activemq.broker", brokerName);

        if (systemExitOnShutdown) {
            new Thread() {
                @Override
                public void run() {
                    System.exit(systemExitOnShutdownExitCode);
                }
            }.start();
        }

        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );

        removeShutdownHook();
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
        ServiceStopper stopper = new ServiceStopper();
        if (services != null) {
            for (Service service : services) {
                stopper.stop(service);
            }
        }
        stopAllConnectors(stopper);
        this.slave = true;
        // remove any VMTransports connected
        // this has to be done after services are stopped,
        // to avoid timing issue with discovery (spinning up a new instance)
        BrokerRegistry.getInstance().unbind(getBrokerName());
        VMTransportFactory.stopped(getBrokerName());
        if (broker != null) {
            stopper.stop(broker);
            broker = null;
        }

        if (jobSchedulerStore != null) {
            jobSchedulerStore.stop();
            jobSchedulerStore = null;
        }
        if (tempDataStore != null) {
            tempDataStore.stop();
            tempDataStore = null;
        }
        try {
            stopper.stop(getPersistenceAdapter());
            persistenceAdapter = null;
            if (isUseJmx()) {
                stopper.stop(managementContext);
                managementContext = null;
            }
            // Clear SelectorParser cache to free memory
            SelectorParser.clearCache();
        } finally {
            // always publish the stopped state so waitUntilStopped() callers are released
            started.set(false);
            stopped.set(true);
            stoppedLatch.countDown();
        }

        if (this.taskRunnerFactory != null) {
            this.taskRunnerFactory.shutdown();
            this.taskRunnerFactory = null;
        }
        if (this.executor != null) {
            ThreadPoolUtils.shutdownNow(executor);
            this.executor = null;
        }

        this.destinationInterceptors = null;
        this.destinationFactory = null;

        if (startDate != null) {
            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
        }
        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});

        synchronized (shutdownHooks) {
            for (Runnable hook : shutdownHooks) {
                try {
                    hook.run();
                } catch (Throwable e) {
                    stopper.onException(hook, e);
                }
            }
        }

        MDC.remove("activemq.broker");

        // and clear start date
        startDate = null;

        stopper.throwFirstException();
    }

    /**
     * Checks whether all queues whose name matches the given regular
     * expression are empty. Every matching queue that still holds messages is
     * logged.
     *
     * @param queueName regular expression matched against each queue name
     * @return true if the matching queues hold no messages in total
     */
    public boolean checkQueueSize(String queueName) {
        long count = 0;
        long queueSize = 0;
        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
            if (entry.getKey().isQueue()) {
                if (entry.getValue().getName().matches(queueName)) {
                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
                    count += queueSize;
                    if (queueSize > 0) {
                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
                    }
                }
            }
        }
        return count == 0;
    }

    /**
     * This method (both connectorName and queueName are using regex to match)
     * 1. stop the connector (supposed the user input the connector which the
     * clients connect to) 2. to check whether there is any pending message on
     * the queues defined by queueName 3. supposedly, after stop the connector,
     * client should failover to other broker and pending messages should be
     * forwarded. if no pending messages, the method finally call stop to stop
     * the broker.
     * <p>
     * The method does nothing when JMX is not in use. {@code timeout} and
     * {@code pollInterval} are multiplied by 1000 before being used as
     * milliseconds; a non-positive {@code pollInterval} is replaced by 30.
     *
     * @param connectorName
     * @param queueName
     * @param timeout
     * @param pollInterval
     * @throws Exception
     */
    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
        if (isUseJmx()) {
            if (connectorName == null || queueName == null || timeout <= 0) {
                throw new Exception(
                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
            }
            if (pollInterval <= 0) {
                pollInterval = 30;
            }
            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
                    connectorName, queueName, timeout, pollInterval
            });
            TransportConnector connector;
            for (int i = 0; i < transportConnectors.size(); i++) {
                connector = transportConnectors.get(i);
                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
                    connector.stop();
                }
            }
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < timeout * 1000) {
                // check queue size until it gets zero
                if (checkQueueSize(queueName)) {
                    stop();
                    break;
                } else {
                    Thread.sleep(pollInterval * 1000);
                }
            }
            if (stopped.get()) {
                LOG.info("Successfully stop the broker.");
            } else {
                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
            }
        }
    }

    /**
     * A helper method to block the caller thread until the broker has been
     * stopped. The wait is not abandoned on interruption, but the caller's
     * interrupt status is restored before returning.
     */
    public void waitUntilStopped() {
        boolean interrupted = false;
        try {
            while (isStarted() && !stopped.get()) {
                try {
                    stoppedLatch.await();
                } catch (InterruptedException e) {
                    // keep waiting, but remember the interrupt for the caller
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return true once the stopped flag has been set
     */
    public boolean isStopped() {
        return stopped.get();
    }

    /**
     * A helper method to block the caller thread until the broker has fully started
     * @return boolean true if wait succeeded false if broker was not started or was stopped
     */
    public boolean waitUntilStarted() {
        return waitUntilStarted(DEFAULT_START_TIMEOUT);
    }

    /**
     * A helper method to block the caller thread until the broker has fully started
     *
     * @param timeout
     *        the amount of time to wait before giving up and returning false.
     *
     * @return boolean true if wait succeeded false if broker was not started or was stopped
     */
    public boolean waitUntilStarted(long timeout) {
        boolean waitSucceeded = isStarted();
        final long now = System.currentTimeMillis();
        // saturate instead of overflowing: a very large timeout must mean
        // "wait (practically) forever", not "expire immediately"
        final long expiration = (timeout > Long.MAX_VALUE - now) ? Long.MAX_VALUE : Math.max(0, timeout + now);
        boolean interrupted = false;
        try {
            while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
                try {
                    if (getStartException() != null) {
                        return waitSucceeded;
                    }
                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ignore) {
                    // keep polling, but remember the interrupt for the caller
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return waitSucceeded;
    }

    // Properties
    // -------------------------------------------------------------------------
    /**
     * Returns the message broker, creating it on first use.
     */
    public Broker getBroker() throws Exception {
        if (broker == null) {
            checkStartException();
            broker = createBroker();
        }
        return broker;
    }

    /**
     * Returns the administration view of the broker; used to create and destroy
     * resources such as queues and
topics. Note this method returns null if JMX 1025 * is disabled. 1026 */ 1027 public BrokerView getAdminView() throws Exception { 1028 if (adminView == null) { 1029 // force lazy creation 1030 getBroker(); 1031 } 1032 return adminView; 1033 } 1034 1035 public void setAdminView(BrokerView adminView) { 1036 this.adminView = adminView; 1037 } 1038 1039 public String getBrokerName() { 1040 return brokerName; 1041 } 1042 1043 /** 1044 * Sets the name of this broker; which must be unique in the network 1045 * 1046 * @param brokerName 1047 */ 1048 private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; 1049 public void setBrokerName(String brokerName) { 1050 if (brokerName == null) { 1051 throw new NullPointerException("The broker name cannot be null"); 1052 } 1053 String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); 1054 if (!str.equals(brokerName)) { 1055 LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); 1056 } 1057 this.brokerName = str.trim(); 1058 } 1059 1060 public PersistenceAdapterFactory getPersistenceFactory() { 1061 return persistenceFactory; 1062 } 1063 1064 public File getDataDirectoryFile() { 1065 if (dataDirectoryFile == null) { 1066 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1067 } 1068 return dataDirectoryFile; 1069 } 1070 1071 public File getBrokerDataDirectory() { 1072 String brokerDir = getBrokerName(); 1073 return new File(getDataDirectoryFile(), brokerDir); 1074 } 1075 1076 /** 1077 * Sets the directory in which the data files will be stored by default for 1078 * the JDBC and Journal persistence adaptors. 
1079 * 1080 * @param dataDirectory 1081 * the directory to store data files 1082 */ 1083 public void setDataDirectory(String dataDirectory) { 1084 setDataDirectoryFile(new File(dataDirectory)); 1085 } 1086 1087 /** 1088 * Sets the directory in which the data files will be stored by default for 1089 * the JDBC and Journal persistence adaptors. 1090 * 1091 * @param dataDirectoryFile 1092 * the directory to store data files 1093 */ 1094 public void setDataDirectoryFile(File dataDirectoryFile) { 1095 this.dataDirectoryFile = dataDirectoryFile; 1096 } 1097 1098 /** 1099 * @return the tmpDataDirectory 1100 */ 1101 public File getTmpDataDirectory() { 1102 if (tmpDataDirectory == null) { 1103 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1104 } 1105 return tmpDataDirectory; 1106 } 1107 1108 /** 1109 * @param tmpDataDirectory 1110 * the tmpDataDirectory to set 1111 */ 1112 public void setTmpDataDirectory(File tmpDataDirectory) { 1113 this.tmpDataDirectory = tmpDataDirectory; 1114 } 1115 1116 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1117 this.persistenceFactory = persistenceFactory; 1118 } 1119 1120 public void setDestinationFactory(DestinationFactory destinationFactory) { 1121 this.destinationFactory = destinationFactory; 1122 } 1123 1124 public boolean isPersistent() { 1125 return persistent; 1126 } 1127 1128 /** 1129 * Sets whether or not persistence is enabled or disabled. 1130 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1131 */ 1132 public void setPersistent(boolean persistent) { 1133 this.persistent = persistent; 1134 } 1135 1136 public boolean isPopulateJMSXUserID() { 1137 return populateJMSXUserID; 1138 } 1139 1140 /** 1141 * Sets whether or not the broker should populate the JMSXUserID header. 
1142 */ 1143 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1144 this.populateJMSXUserID = populateJMSXUserID; 1145 } 1146 1147 public SystemUsage getSystemUsage() { 1148 try { 1149 if (systemUsage == null) { 1150 1151 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1152 systemUsage.setExecutor(getExecutor()); 1153 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1154 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1155 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1156 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1157 addService(this.systemUsage); 1158 } 1159 return systemUsage; 1160 } catch (IOException e) { 1161 LOG.error("Cannot create SystemUsage", e); 1162 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1163 } 1164 } 1165 1166 public void setSystemUsage(SystemUsage memoryManager) { 1167 if (this.systemUsage != null) { 1168 removeService(this.systemUsage); 1169 } 1170 this.systemUsage = memoryManager; 1171 if (this.systemUsage.getExecutor()==null) { 1172 this.systemUsage.setExecutor(getExecutor()); 1173 } 1174 addService(this.systemUsage); 1175 } 1176 1177 /** 1178 * @return the consumerUsageManager 1179 * @throws IOException 1180 */ 1181 public SystemUsage getConsumerSystemUsage() throws IOException { 1182 if (this.consumerSystemUsaage == null) { 1183 if (splitSystemUsageForProducersConsumers) { 1184 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1185 float portion = consumerSystemUsagePortion / 100f; 1186 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1187 addService(this.consumerSystemUsaage); 1188 } else { 1189 consumerSystemUsaage = getSystemUsage(); 1190 } 1191 } 1192 return this.consumerSystemUsaage; 1193 } 1194 1195 /** 1196 * @param consumerSystemUsaage 1197 * the 
storeSystemUsage to set 1198 */ 1199 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1200 if (this.consumerSystemUsaage != null) { 1201 removeService(this.consumerSystemUsaage); 1202 } 1203 this.consumerSystemUsaage = consumerSystemUsaage; 1204 addService(this.consumerSystemUsaage); 1205 } 1206 1207 /** 1208 * @return the producerUsageManager 1209 * @throws IOException 1210 */ 1211 public SystemUsage getProducerSystemUsage() throws IOException { 1212 if (producerSystemUsage == null) { 1213 if (splitSystemUsageForProducersConsumers) { 1214 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1215 float portion = producerSystemUsagePortion / 100f; 1216 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1217 addService(producerSystemUsage); 1218 } else { 1219 producerSystemUsage = getSystemUsage(); 1220 } 1221 } 1222 return producerSystemUsage; 1223 } 1224 1225 /** 1226 * @param producerUsageManager 1227 * the producerUsageManager to set 1228 */ 1229 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1230 if (this.producerSystemUsage != null) { 1231 removeService(this.producerSystemUsage); 1232 } 1233 this.producerSystemUsage = producerUsageManager; 1234 addService(this.producerSystemUsage); 1235 } 1236 1237 public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { 1238 if (persistenceAdapter == null && !hasStartException()) { 1239 persistenceAdapter = createPersistenceAdapter(); 1240 configureService(persistenceAdapter); 1241 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1242 } 1243 return persistenceAdapter; 1244 } 1245 1246 /** 1247 * Sets the persistence adaptor implementation to use for this broker 1248 * 1249 * @throws IOException 1250 */ 1251 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1252 if (!isPersistent() && ! 
(persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1253 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1254 return; 1255 } 1256 this.persistenceAdapter = persistenceAdapter; 1257 configureService(this.persistenceAdapter); 1258 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1259 } 1260 1261 public TaskRunnerFactory getTaskRunnerFactory() { 1262 if (this.taskRunnerFactory == null) { 1263 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1264 isDedicatedTaskRunner()); 1265 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1266 } 1267 return this.taskRunnerFactory; 1268 } 1269 1270 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1271 this.taskRunnerFactory = taskRunnerFactory; 1272 } 1273 1274 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1275 if (taskRunnerFactory == null) { 1276 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1277 true, 1000, isDedicatedTaskRunner()); 1278 } 1279 return persistenceTaskRunnerFactory; 1280 } 1281 1282 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1283 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1284 } 1285 1286 public boolean isUseJmx() { 1287 return useJmx; 1288 } 1289 1290 public boolean isEnableStatistics() { 1291 return enableStatistics; 1292 } 1293 1294 /** 1295 * Sets whether or not the Broker's services enable statistics or not. 1296 */ 1297 public void setEnableStatistics(boolean enableStatistics) { 1298 this.enableStatistics = enableStatistics; 1299 } 1300 1301 /** 1302 * Sets whether or not the Broker's services should be exposed into JMX or 1303 * not. 
1304 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1305 */ 1306 public void setUseJmx(boolean useJmx) { 1307 this.useJmx = useJmx; 1308 } 1309 1310 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1311 if (brokerObjectName == null) { 1312 brokerObjectName = createBrokerObjectName(); 1313 } 1314 return brokerObjectName; 1315 } 1316 1317 /** 1318 * Sets the JMX ObjectName for this broker 1319 */ 1320 public void setBrokerObjectName(ObjectName brokerObjectName) { 1321 this.brokerObjectName = brokerObjectName; 1322 } 1323 1324 public ManagementContext getManagementContext() { 1325 if (managementContext == null) { 1326 checkStartException(); 1327 managementContext = new ManagementContext(); 1328 } 1329 return managementContext; 1330 } 1331 1332 synchronized private void checkStartException() { 1333 if (startException != null) { 1334 throw new BrokerStoppedException(startException); 1335 } 1336 } 1337 1338 synchronized private boolean hasStartException() { 1339 return startException != null; 1340 } 1341 1342 synchronized private void setStartException(Throwable t) { 1343 startException = t; 1344 } 1345 1346 public void setManagementContext(ManagementContext managementContext) { 1347 this.managementContext = managementContext; 1348 } 1349 1350 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1351 for (NetworkConnector connector : networkConnectors) { 1352 if (connector.getName().equals(connectorName)) { 1353 return connector; 1354 } 1355 } 1356 return null; 1357 } 1358 1359 public String[] getNetworkConnectorURIs() { 1360 return networkConnectorURIs; 1361 } 1362 1363 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1364 this.networkConnectorURIs = networkConnectorURIs; 1365 } 1366 1367 public TransportConnector getConnectorByName(String connectorName) { 1368 for (TransportConnector connector : transportConnectors) { 1369 if 
(connector.getName().equals(connectorName)) { 1370 return connector; 1371 } 1372 } 1373 return null; 1374 } 1375 1376 public Map<String, String> getTransportConnectorURIsAsMap() { 1377 Map<String, String> answer = new HashMap<String, String>(); 1378 for (TransportConnector connector : transportConnectors) { 1379 try { 1380 URI uri = connector.getConnectUri(); 1381 if (uri != null) { 1382 String scheme = uri.getScheme(); 1383 if (scheme != null) { 1384 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1385 } 1386 } 1387 } catch (Exception e) { 1388 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1389 } 1390 } 1391 return answer; 1392 } 1393 1394 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1395 ProducerBrokerExchange result = null; 1396 1397 for (TransportConnector connector : transportConnectors) { 1398 for (TransportConnection tc: connector.getConnections()){ 1399 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1400 if (result !=null){ 1401 return result; 1402 } 1403 } 1404 } 1405 return result; 1406 } 1407 1408 public String[] getTransportConnectorURIs() { 1409 return transportConnectorURIs; 1410 } 1411 1412 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1413 this.transportConnectorURIs = transportConnectorURIs; 1414 } 1415 1416 /** 1417 * @return Returns the jmsBridgeConnectors. 1418 */ 1419 public JmsConnector[] getJmsBridgeConnectors() { 1420 return jmsBridgeConnectors; 1421 } 1422 1423 /** 1424 * @param jmsConnectors 1425 * The jmsBridgeConnectors to set. 1426 */ 1427 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1428 this.jmsBridgeConnectors = jmsConnectors; 1429 } 1430 1431 public Service[] getServices() { 1432 return services.toArray(new Service[0]); 1433 } 1434 1435 /** 1436 * Sets the services associated with this broker. 
1437 */ 1438 public void setServices(Service[] services) { 1439 this.services.clear(); 1440 if (services != null) { 1441 for (int i = 0; i < services.length; i++) { 1442 this.services.add(services[i]); 1443 } 1444 } 1445 } 1446 1447 /** 1448 * Adds a new service so that it will be started as part of the broker 1449 * lifecycle 1450 */ 1451 public void addService(Service service) { 1452 services.add(service); 1453 } 1454 1455 public void removeService(Service service) { 1456 services.remove(service); 1457 } 1458 1459 public boolean isUseLoggingForShutdownErrors() { 1460 return useLoggingForShutdownErrors; 1461 } 1462 1463 /** 1464 * Sets whether or not we should use commons-logging when reporting errors 1465 * when shutting down the broker 1466 */ 1467 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1468 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1469 } 1470 1471 public boolean isUseShutdownHook() { 1472 return useShutdownHook; 1473 } 1474 1475 /** 1476 * Sets whether or not we should use a shutdown handler to close down the 1477 * broker cleanly if the JVM is terminated. It is recommended you leave this 1478 * enabled. 1479 */ 1480 public void setUseShutdownHook(boolean useShutdownHook) { 1481 this.useShutdownHook = useShutdownHook; 1482 } 1483 1484 public boolean isAdvisorySupport() { 1485 return advisorySupport; 1486 } 1487 1488 /** 1489 * Allows the support of advisory messages to be disabled for performance 1490 * reasons. 
1491 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1492 */ 1493 public void setAdvisorySupport(boolean advisorySupport) { 1494 this.advisorySupport = advisorySupport; 1495 } 1496 1497 public List<TransportConnector> getTransportConnectors() { 1498 return new ArrayList<TransportConnector>(transportConnectors); 1499 } 1500 1501 /** 1502 * Sets the transport connectors which this broker will listen on for new 1503 * clients 1504 * 1505 * @org.apache.xbean.Property 1506 * nestedType="org.apache.activemq.broker.TransportConnector" 1507 */ 1508 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1509 for (TransportConnector connector : transportConnectors) { 1510 addConnector(connector); 1511 } 1512 } 1513 1514 public TransportConnector getTransportConnectorByName(String name){ 1515 for (TransportConnector transportConnector : transportConnectors){ 1516 if (name.equals(transportConnector.getName())){ 1517 return transportConnector; 1518 } 1519 } 1520 return null; 1521 } 1522 1523 public TransportConnector getTransportConnectorByScheme(String scheme){ 1524 for (TransportConnector transportConnector : transportConnectors){ 1525 if (scheme.equals(transportConnector.getUri().getScheme())){ 1526 return transportConnector; 1527 } 1528 } 1529 return null; 1530 } 1531 1532 public List<NetworkConnector> getNetworkConnectors() { 1533 return new ArrayList<NetworkConnector>(networkConnectors); 1534 } 1535 1536 public List<ProxyConnector> getProxyConnectors() { 1537 return new ArrayList<ProxyConnector>(proxyConnectors); 1538 } 1539 1540 /** 1541 * Sets the network connectors which this broker will use to connect to 1542 * other brokers in a federated network 1543 * 1544 * @org.apache.xbean.Property 1545 * nestedType="org.apache.activemq.network.NetworkConnector" 1546 */ 1547 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1548 for (Object connector : 
networkConnectors) { 1549 addNetworkConnector((NetworkConnector) connector); 1550 } 1551 } 1552 1553 /** 1554 * Sets the network connectors which this broker will use to connect to 1555 * other brokers in a federated network 1556 */ 1557 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1558 for (Object connector : proxyConnectors) { 1559 addProxyConnector((ProxyConnector) connector); 1560 } 1561 } 1562 1563 public PolicyMap getDestinationPolicy() { 1564 return destinationPolicy; 1565 } 1566 1567 /** 1568 * Sets the destination specific policies available either for exact 1569 * destinations or for wildcard areas of destinations. 1570 */ 1571 public void setDestinationPolicy(PolicyMap policyMap) { 1572 this.destinationPolicy = policyMap; 1573 } 1574 1575 public BrokerPlugin[] getPlugins() { 1576 return plugins; 1577 } 1578 1579 /** 1580 * Sets a number of broker plugins to install such as for security 1581 * authentication or authorization 1582 */ 1583 public void setPlugins(BrokerPlugin[] plugins) { 1584 this.plugins = plugins; 1585 } 1586 1587 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1588 return messageAuthorizationPolicy; 1589 } 1590 1591 /** 1592 * Sets the policy used to decide if the current connection is authorized to 1593 * consume a given message 1594 */ 1595 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1596 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1597 } 1598 1599 /** 1600 * Delete all messages from the persistent store 1601 * 1602 * @throws IOException 1603 */ 1604 public void deleteAllMessages() throws IOException { 1605 getPersistenceAdapter().deleteAllMessages(); 1606 } 1607 1608 public boolean isDeleteAllMessagesOnStartup() { 1609 return deleteAllMessagesOnStartup; 1610 } 1611 1612 /** 1613 * Sets whether or not all messages are deleted on startup - mostly only 1614 * useful for testing. 
1615 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1616 */ 1617 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1618 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1619 } 1620 1621 public URI getVmConnectorURI() { 1622 if (vmConnectorURI == null) { 1623 try { 1624 vmConnectorURI = new URI("vm://" + getBrokerName()); 1625 } catch (URISyntaxException e) { 1626 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1627 } 1628 } 1629 return vmConnectorURI; 1630 } 1631 1632 public void setVmConnectorURI(URI vmConnectorURI) { 1633 this.vmConnectorURI = vmConnectorURI; 1634 } 1635 1636 public String getDefaultSocketURIString() { 1637 if (started.get()) { 1638 if (this.defaultSocketURIString == null) { 1639 for (TransportConnector tc:this.transportConnectors) { 1640 String result = null; 1641 try { 1642 result = tc.getPublishableConnectString(); 1643 } catch (Exception e) { 1644 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1645 } 1646 if (result != null) { 1647 // find first publishable uri 1648 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1649 this.defaultSocketURIString = result; 1650 break; 1651 } else { 1652 // or use the first defined 1653 if (this.defaultSocketURIString == null) { 1654 this.defaultSocketURIString = result; 1655 } 1656 } 1657 } 1658 } 1659 1660 } 1661 return this.defaultSocketURIString; 1662 } 1663 return null; 1664 } 1665 1666 /** 1667 * @return Returns the shutdownOnMasterFailure. 1668 */ 1669 public boolean isShutdownOnMasterFailure() { 1670 return shutdownOnMasterFailure; 1671 } 1672 1673 /** 1674 * @param shutdownOnMasterFailure 1675 * The shutdownOnMasterFailure to set. 
1676 */ 1677 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1678 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1679 } 1680 1681 public boolean isKeepDurableSubsActive() { 1682 return keepDurableSubsActive; 1683 } 1684 1685 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1686 this.keepDurableSubsActive = keepDurableSubsActive; 1687 } 1688 1689 public boolean isUseVirtualTopics() { 1690 return useVirtualTopics; 1691 } 1692 1693 /** 1694 * Sets whether or not <a 1695 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1696 * Topics</a> should be supported by default if they have not been 1697 * explicitly configured. 1698 */ 1699 public void setUseVirtualTopics(boolean useVirtualTopics) { 1700 this.useVirtualTopics = useVirtualTopics; 1701 } 1702 1703 public DestinationInterceptor[] getDestinationInterceptors() { 1704 return destinationInterceptors; 1705 } 1706 1707 public boolean isUseMirroredQueues() { 1708 return useMirroredQueues; 1709 } 1710 1711 /** 1712 * Sets whether or not <a 1713 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1714 * Queues</a> should be supported by default if they have not been 1715 * explicitly configured. 
1716 */ 1717 public void setUseMirroredQueues(boolean useMirroredQueues) { 1718 this.useMirroredQueues = useMirroredQueues; 1719 } 1720 1721 /** 1722 * Sets the destination interceptors to use 1723 */ 1724 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1725 this.destinationInterceptors = destinationInterceptors; 1726 } 1727 1728 public ActiveMQDestination[] getDestinations() { 1729 return destinations; 1730 } 1731 1732 /** 1733 * Sets the destinations which should be loaded/created on startup 1734 */ 1735 public void setDestinations(ActiveMQDestination[] destinations) { 1736 this.destinations = destinations; 1737 } 1738 1739 /** 1740 * @return the tempDataStore 1741 */ 1742 public synchronized PListStore getTempDataStore() { 1743 if (tempDataStore == null) { 1744 if (!isPersistent()) { 1745 return null; 1746 } 1747 1748 try { 1749 PersistenceAdapter pa = getPersistenceAdapter(); 1750 if( pa!=null && pa instanceof PListStore) { 1751 return (PListStore) pa; 1752 } 1753 } catch (IOException e) { 1754 throw new RuntimeException(e); 1755 } 1756 1757 try { 1758 String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; 1759 this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); 1760 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1761 configureService(tempDataStore); 1762 } catch (Exception e) { 1763 throw new RuntimeException(e); 1764 } 1765 } 1766 return tempDataStore; 1767 } 1768 1769 /** 1770 * @param tempDataStore 1771 * the tempDataStore to set 1772 */ 1773 public void setTempDataStore(PListStore tempDataStore) { 1774 this.tempDataStore = tempDataStore; 1775 configureService(tempDataStore); 1776 } 1777 1778 public int getPersistenceThreadPriority() { 1779 return persistenceThreadPriority; 1780 } 1781 1782 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1783 this.persistenceThreadPriority = persistenceThreadPriority; 1784 } 1785 1786 /** 
1787 * @return the useLocalHostBrokerName 1788 */ 1789 public boolean isUseLocalHostBrokerName() { 1790 return this.useLocalHostBrokerName; 1791 } 1792 1793 /** 1794 * @param useLocalHostBrokerName 1795 * the useLocalHostBrokerName to set 1796 */ 1797 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1798 this.useLocalHostBrokerName = useLocalHostBrokerName; 1799 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1800 brokerName = LOCAL_HOST_NAME; 1801 } 1802 } 1803 1804 /** 1805 * Looks up and lazily creates if necessary the destination for the given 1806 * JMS name 1807 */ 1808 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1809 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1810 } 1811 1812 public void removeDestination(ActiveMQDestination destination) throws Exception { 1813 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1814 } 1815 1816 public int getProducerSystemUsagePortion() { 1817 return producerSystemUsagePortion; 1818 } 1819 1820 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1821 this.producerSystemUsagePortion = producerSystemUsagePortion; 1822 } 1823 1824 public int getConsumerSystemUsagePortion() { 1825 return consumerSystemUsagePortion; 1826 } 1827 1828 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1829 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1830 } 1831 1832 public boolean isSplitSystemUsageForProducersConsumers() { 1833 return splitSystemUsageForProducersConsumers; 1834 } 1835 1836 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1837 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1838 } 1839 1840 public boolean isMonitorConnectionSplits() { 1841 return monitorConnectionSplits; 1842 } 1843 1844 
public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1845 this.monitorConnectionSplits = monitorConnectionSplits; 1846 } 1847 1848 public int getTaskRunnerPriority() { 1849 return taskRunnerPriority; 1850 } 1851 1852 public void setTaskRunnerPriority(int taskRunnerPriority) { 1853 this.taskRunnerPriority = taskRunnerPriority; 1854 } 1855 1856 public boolean isDedicatedTaskRunner() { 1857 return dedicatedTaskRunner; 1858 } 1859 1860 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1861 this.dedicatedTaskRunner = dedicatedTaskRunner; 1862 } 1863 1864 public boolean isCacheTempDestinations() { 1865 return cacheTempDestinations; 1866 } 1867 1868 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1869 this.cacheTempDestinations = cacheTempDestinations; 1870 } 1871 1872 public int getTimeBeforePurgeTempDestinations() { 1873 return timeBeforePurgeTempDestinations; 1874 } 1875 1876 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1877 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1878 } 1879 1880 public boolean isUseTempMirroredQueues() { 1881 return useTempMirroredQueues; 1882 } 1883 1884 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1885 this.useTempMirroredQueues = useTempMirroredQueues; 1886 } 1887 1888 public synchronized JobSchedulerStore getJobSchedulerStore() { 1889 1890 // If support is off don't allow any scheduler even is user configured their own. 1891 if (!isSchedulerSupport()) { 1892 return null; 1893 } 1894 1895 // If the user configured their own we use it even if persistence is disabled since 1896 // we don't know anything about their implementation. 
1897 if (jobSchedulerStore == null) { 1898 1899 if (!isPersistent()) { 1900 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1901 configureService(jobSchedulerStore); 1902 return this.jobSchedulerStore; 1903 } 1904 1905 try { 1906 PersistenceAdapter pa = getPersistenceAdapter(); 1907 if (pa != null) { 1908 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1909 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1910 configureService(jobSchedulerStore); 1911 return this.jobSchedulerStore; 1912 } 1913 } catch (IOException e) { 1914 throw new RuntimeException(e); 1915 } catch (UnsupportedOperationException ex) { 1916 // It's ok if the store doesn't implement a scheduler. 1917 } catch (Exception e) { 1918 throw new RuntimeException(e); 1919 } 1920 1921 try { 1922 PersistenceAdapter pa = getPersistenceAdapter(); 1923 if (pa != null && pa instanceof JobSchedulerStore) { 1924 this.jobSchedulerStore = (JobSchedulerStore) pa; 1925 configureService(jobSchedulerStore); 1926 return this.jobSchedulerStore; 1927 } 1928 } catch (IOException e) { 1929 throw new RuntimeException(e); 1930 } 1931 1932 // Load the KahaDB store as a last resort, this only works if KahaDB is 1933 // included at runtime, otherwise this will fail. User should disable 1934 // scheduler support if this fails. 
1935 try { 1936 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1937 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1938 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1939 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1940 configureService(jobSchedulerStore); 1941 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1942 } catch (Exception e) { 1943 throw new RuntimeException(e); 1944 } 1945 } 1946 return jobSchedulerStore; 1947 } 1948 1949 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1950 this.jobSchedulerStore = jobSchedulerStore; 1951 configureService(jobSchedulerStore); 1952 } 1953 1954 // 1955 // Implementation methods 1956 // ------------------------------------------------------------------------- 1957 /** 1958 * Handles any lazy-creation helper properties which are added to make 1959 * things easier to configure inside environments such as Spring 1960 * 1961 * @throws Exception 1962 */ 1963 protected void processHelperProperties() throws Exception { 1964 if (transportConnectorURIs != null) { 1965 for (int i = 0; i < transportConnectorURIs.length; i++) { 1966 String uri = transportConnectorURIs[i]; 1967 addConnector(uri); 1968 } 1969 } 1970 if (networkConnectorURIs != null) { 1971 for (int i = 0; i < networkConnectorURIs.length; i++) { 1972 String uri = networkConnectorURIs[i]; 1973 addNetworkConnector(uri); 1974 } 1975 } 1976 if (jmsBridgeConnectors != null) { 1977 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1978 addJmsConnector(jmsBridgeConnectors[i]); 1979 } 1980 } 1981 } 1982 1983 /** 1984 * Check that the store usage limit is not greater than max usable 1985 * space and adjust if it is 1986 */ 1987 protected void checkStoreUsageLimits() throws Exception { 1988 final SystemUsage usage = getSystemUsage(); 1989 1990 if (getPersistenceAdapter() != null) { 1991 PersistenceAdapter adapter 
= getPersistenceAdapter(); 1992 checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); 1993 1994 long maxJournalFileSize = 0; 1995 long storeLimit = usage.getStoreUsage().getLimit(); 1996 1997 if (adapter instanceof JournaledStore) { 1998 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 1999 } 2000 2001 if (storeLimit > 0 && storeLimit < maxJournalFileSize) { 2002 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2003 " mb, whilst the max journal file size for the store is: " + 2004 maxJournalFileSize / (1024 * 1024) + " mb, " + 2005 "the store will not accept any data when used."); 2006 2007 } 2008 } 2009 } 2010 2011 /** 2012 * Check that temporary usage limit is not greater than max usable 2013 * space and adjust if it is 2014 */ 2015 protected void checkTmpStoreUsageLimits() throws Exception { 2016 final SystemUsage usage = getSystemUsage(); 2017 2018 File tmpDir = getTmpDataDirectory(); 2019 2020 if (tmpDir != null) { 2021 checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); 2022 2023 if (isPersistent()) { 2024 long maxJournalFileSize; 2025 2026 PListStore store = usage.getTempUsage().getStore(); 2027 if (store != null && store instanceof JournaledStore) { 2028 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2029 } else { 2030 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2031 } 2032 long storeLimit = usage.getTempUsage().getLimit(); 2033 2034 if (storeLimit > 0 && storeLimit < maxJournalFileSize) { 2035 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2036 " mb, whilst the max journal file size for the temporary store is: " + 2037 maxJournalFileSize / (1024 * 1024) + " mb, " + 2038 "the temp store will not accept any data when used."); 2039 } 2040 } 2041 } 2042 } 2043 2044 protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException { 2045 if (dir != 
null) { 2046 dir = StoreUtil.findParentDirectory(dir); 2047 String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; 2048 long storeLimit = storeUsage.getLimit(); 2049 long storeCurrent = storeUsage.getUsage(); 2050 long totalSpace = dir.getTotalSpace(); 2051 long totalUsableSpace = dir.getUsableSpace() + storeCurrent; 2052 //compute byte value of the percent limit 2053 long bytePercentLimit = totalSpace * percentLimit / 100; 2054 int oneMeg = 1024 * 1024; 2055 2056 //Check if the store limit is less than the percent Limit that was set and also 2057 //the usable space...this means we can grow the store larger 2058 //Changes in partition size (total space) as well as changes in usable space should 2059 //be detected here 2060 if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 2061 && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ 2062 2063 // set the limit to be bytePercentLimit or usableSpace if 2064 // usableSpace is less than the percentLimit 2065 long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit; 2066 2067 //To prevent changing too often, check threshold 2068 if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { 2069 LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " 2070 + percentLimit + "% of the partition size."); 2071 storeUsage.setLimit(newLimit); 2072 LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace 2073 + "% (" + newLimit / oneMeg + " mb) of the partition size."); 2074 } 2075 2076 //check if the limit is too large for the amount of usable space 2077 } else if (storeLimit > totalUsableSpace) { 2078 final String message = storeName + " limit is " + storeLimit / oneMeg 2079 + " mb (current store usage is " + storeCurrent / oneMeg 2080 + " mb). 
The data directory: " + dir.getAbsolutePath() 2081 + " only has " + totalUsableSpace / oneMeg 2082 + " mb of usable space."; 2083 2084 if (!isAdjustUsageLimits()) { 2085 LOG.error(message); 2086 throw new ConfigurationException(message); 2087 } 2088 2089 if (percentLimit > 0) { 2090 LOG.warn(storeName + " limit has been set to " 2091 + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" 2092 + " of the partition size but there is not enough usable space." 2093 + " The current store limit (which may have been adjusted by a" 2094 + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" 2095 + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" 2096 + " is available - resetting limit"); 2097 } else { 2098 LOG.warn(message + " - resetting to maximum available disk space: " + 2099 totalUsableSpace / oneMeg + " mb"); 2100 } 2101 storeUsage.setLimit(totalUsableSpace); 2102 } 2103 } 2104 } 2105 2106 /** 2107 * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to 2108 * update store and temporary store limits if the amount of available space 2109 * plus current store size is less than the existin configured limit 2110 */ 2111 protected void scheduleDiskUsageLimitsCheck() throws IOException { 2112 if (schedulePeriodForDiskUsageCheck > 0 && 2113 (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { 2114 Runnable diskLimitCheckTask = new Runnable() { 2115 @Override 2116 public void run() { 2117 try { 2118 checkStoreUsageLimits(); 2119 } catch (Exception e) { 2120 LOG.error("Failed to check persistent disk usage limits", e); 2121 } 2122 2123 try { 2124 checkTmpStoreUsageLimits(); 2125 } catch (Exception e) { 2126 LOG.error("Failed to check temporary store usage limits", e); 2127 } 2128 } 2129 }; 2130 scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); 2131 } 2132 } 2133 2134 protected void checkMemorySystemUsageLimits() throws Exception { 2135 
final SystemUsage usage = getSystemUsage(); 2136 long memLimit = usage.getMemoryUsage().getLimit(); 2137 long jvmLimit = Runtime.getRuntime().maxMemory(); 2138 2139 if (memLimit > jvmLimit) { 2140 final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) 2141 + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); 2142 2143 if (adjustUsageLimits) { 2144 usage.getMemoryUsage().setPercentOfJvmHeap(70); 2145 LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 2146 } else { 2147 LOG.error(message); 2148 throw new ConfigurationException(message); 2149 } 2150 } 2151 } 2152 2153 protected void checkStoreSystemUsageLimits() throws Exception { 2154 final SystemUsage usage = getSystemUsage(); 2155 2156 //Check the persistent store and temp store limits if they exist 2157 //and schedule a periodic check to update disk limits if 2158 //schedulePeriodForDiskLimitCheck is set 2159 checkStoreUsageLimits(); 2160 checkTmpStoreUsageLimits(); 2161 scheduleDiskUsageLimitsCheck(); 2162 2163 if (getJobSchedulerStore() != null) { 2164 JobSchedulerStore scheduler = getJobSchedulerStore(); 2165 File schedulerDir = scheduler.getDirectory(); 2166 if (schedulerDir != null) { 2167 2168 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2169 if (!schedulerDir.isAbsolute()) { 2170 schedulerDir = new File(schedulerDirPath); 2171 } 2172 2173 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2174 schedulerDir = schedulerDir.getParentFile(); 2175 } 2176 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2177 long dirFreeSpace = schedulerDir.getUsableSpace(); 2178 if (schedulerLimit > dirFreeSpace) { 2179 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2180 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2181 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " 
+ 2182 dirFreeSpace / (1024 * 1024) + " mb."); 2183 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2184 } 2185 } 2186 } 2187 } 2188 2189 public void stopAllConnectors(ServiceStopper stopper) { 2190 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2191 NetworkConnector connector = iter.next(); 2192 unregisterNetworkConnectorMBean(connector); 2193 stopper.stop(connector); 2194 } 2195 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2196 ProxyConnector connector = iter.next(); 2197 stopper.stop(connector); 2198 } 2199 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2200 JmsConnector connector = iter.next(); 2201 stopper.stop(connector); 2202 } 2203 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2204 TransportConnector connector = iter.next(); 2205 try { 2206 unregisterConnectorMBean(connector); 2207 } catch (IOException e) { 2208 } 2209 stopper.stop(connector); 2210 } 2211 } 2212 2213 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2214 try { 2215 ObjectName objectName = createConnectorObjectName(connector); 2216 connector = connector.asManagedConnector(getManagementContext(), objectName); 2217 ConnectorViewMBean view = new ConnectorView(connector); 2218 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2219 return connector; 2220 } catch (Throwable e) { 2221 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2222 } 2223 } 2224 2225 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2226 if (isUseJmx()) { 2227 try { 2228 ObjectName objectName = createConnectorObjectName(connector); 2229 getManagementContext().unregisterMBean(objectName); 2230 } catch (Throwable e) { 2231 throw IOExceptionSupport.create( 2232 "Transport Connector could not 
be unregistered in JMX: " + e.getMessage(), e); 2233 } 2234 } 2235 } 2236 2237 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2238 return adaptor; 2239 } 2240 2241 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2242 if (isUseJmx()) {} 2243 } 2244 2245 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 2246 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2247 } 2248 2249 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2250 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2251 try { 2252 ObjectName objectName = createNetworkConnectorObjectName(connector); 2253 connector.setObjectName(objectName); 2254 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2255 } catch (Throwable e) { 2256 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2257 } 2258 } 2259 2260 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2261 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2262 } 2263 2264 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2265 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2266 } 2267 2268 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2269 if (isUseJmx()) { 2270 try { 2271 ObjectName objectName = createNetworkConnectorObjectName(connector); 2272 getManagementContext().unregisterMBean(objectName); 2273 } catch (Exception e) { 2274 LOG.warn("Network Connector could not be unregistered from JMX 
due " + e.getMessage() + ". This exception is ignored.", e); 2275 } 2276 } 2277 } 2278 2279 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2280 ProxyConnectorView view = new ProxyConnectorView(connector); 2281 try { 2282 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2283 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2284 } catch (Throwable e) { 2285 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2286 } 2287 } 2288 2289 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2290 JmsConnectorView view = new JmsConnectorView(connector); 2291 try { 2292 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2293 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2294 } catch (Throwable e) { 2295 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2296 } 2297 } 2298 2299 /** 2300 * Factory method to create a new broker 2301 * 2302 * @throws Exception 2303 */ 2304 protected Broker createBroker() throws Exception { 2305 regionBroker = createRegionBroker(); 2306 Broker broker = addInterceptors(regionBroker); 2307 // Add a filter that will stop access to the broker once stopped 2308 broker = new MutableBrokerFilter(broker) { 2309 Broker old; 2310 2311 @Override 2312 public void stop() throws Exception { 2313 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2314 // Just ignore additional stop actions. 
2315 @Override 2316 public void stop() throws Exception { 2317 } 2318 }); 2319 old.stop(); 2320 } 2321 2322 @Override 2323 public void start() throws Exception { 2324 if (forceStart && old != null) { 2325 this.next.set(old); 2326 } 2327 getNext().start(); 2328 } 2329 }; 2330 return broker; 2331 } 2332 2333 /** 2334 * Factory method to create the core region broker onto which interceptors 2335 * are added 2336 * 2337 * @throws Exception 2338 */ 2339 protected Broker createRegionBroker() throws Exception { 2340 if (destinationInterceptors == null) { 2341 destinationInterceptors = createDefaultDestinationInterceptor(); 2342 } 2343 configureServices(destinationInterceptors); 2344 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2345 if (destinationFactory == null) { 2346 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2347 } 2348 return createRegionBroker(destinationInterceptor); 2349 } 2350 2351 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2352 RegionBroker regionBroker; 2353 if (isUseJmx()) { 2354 try { 2355 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2356 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2357 } catch(MalformedObjectNameException me){ 2358 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2359 throw new IOException(me); 2360 } 2361 } else { 2362 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2363 destinationInterceptor,getScheduler(),getExecutor()); 2364 } 2365 destinationFactory.setRegionBroker(regionBroker); 2366 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2367 regionBroker.setBrokerName(getBrokerName()); 2368 
regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2369 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2370 if (brokerId != null) { 2371 regionBroker.setBrokerId(brokerId); 2372 } 2373 return regionBroker; 2374 } 2375 2376 /** 2377 * Create the default destination interceptor 2378 */ 2379 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2380 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2381 if (isUseVirtualTopics()) { 2382 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2383 VirtualTopic virtualTopic = new VirtualTopic(); 2384 virtualTopic.setName("VirtualTopic.>"); 2385 VirtualDestination[] virtualDestinations = { virtualTopic }; 2386 interceptor.setVirtualDestinations(virtualDestinations); 2387 answer.add(interceptor); 2388 } 2389 if (isUseMirroredQueues()) { 2390 MirroredQueue interceptor = new MirroredQueue(); 2391 answer.add(interceptor); 2392 } 2393 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2394 answer.toArray(array); 2395 return array; 2396 } 2397 2398 /** 2399 * Strategy method to add interceptors to the broker 2400 * 2401 * @throws IOException 2402 */ 2403 protected Broker addInterceptors(Broker broker) throws Exception { 2404 if (isSchedulerSupport()) { 2405 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2406 if (isUseJmx()) { 2407 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2408 try { 2409 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2410 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2411 this.adminView.setJMSJobScheduler(objectName); 2412 } catch (Throwable e) { 2413 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2414 + e.getMessage(), e); 2415 } 2416 } 2417 broker = sb; 2418 } 2419 if (isUseJmx()) { 2420 
HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2421 try { 2422 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2423 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2424 } catch (Throwable e) { 2425 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2426 + e.getMessage(), e); 2427 } 2428 } 2429 if (isAdvisorySupport()) { 2430 broker = new AdvisoryBroker(broker); 2431 } 2432 broker = new CompositeDestinationBroker(broker); 2433 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2434 if (isPopulateJMSXUserID()) { 2435 UserIDBroker userIDBroker = new UserIDBroker(broker); 2436 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2437 broker = userIDBroker; 2438 } 2439 if (isMonitorConnectionSplits()) { 2440 broker = new ConnectionSplitBroker(broker); 2441 } 2442 if (plugins != null) { 2443 for (int i = 0; i < plugins.length; i++) { 2444 BrokerPlugin plugin = plugins[i]; 2445 broker = plugin.installPlugin(broker); 2446 } 2447 } 2448 return broker; 2449 } 2450 2451 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2452 if (isPersistent()) { 2453 PersistenceAdapterFactory fac = getPersistenceFactory(); 2454 if (fac != null) { 2455 return fac.createPersistenceAdapter(); 2456 } else { 2457 try { 2458 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2459 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2460 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2461 adaptor.setDirectory(dir); 2462 return adaptor; 2463 } catch (Throwable e) { 2464 throw IOExceptionSupport.create(e); 2465 } 2466 } 2467 } else { 2468 return new MemoryPersistenceAdapter(); 2469 } 2470 } 2471 2472 protected ObjectName createBrokerObjectName() throws 
MalformedObjectNameException { 2473 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2474 } 2475 2476 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2477 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2478 return new TransportConnector(transport); 2479 } 2480 2481 /** 2482 * Extracts the port from the options 2483 */ 2484 protected Object getPort(Map<?,?> options) { 2485 Object port = options.get("port"); 2486 if (port == null) { 2487 port = DEFAULT_PORT; 2488 LOG.warn("No port specified so defaulting to: {}", port); 2489 } 2490 return port; 2491 } 2492 2493 protected void addShutdownHook() { 2494 if (useShutdownHook) { 2495 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2496 @Override 2497 public void run() { 2498 containerShutdown(); 2499 } 2500 }; 2501 Runtime.getRuntime().addShutdownHook(shutdownHook); 2502 } 2503 } 2504 2505 protected void removeShutdownHook() { 2506 if (shutdownHook != null) { 2507 try { 2508 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2509 } catch (Exception e) { 2510 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2511 } 2512 } 2513 } 2514 2515 /** 2516 * Sets hooks to be executed when broker shut down 2517 * 2518 * @org.apache.xbean.Property 2519 */ 2520 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2521 for (Runnable hook : hooks) { 2522 addShutdownHook(hook); 2523 } 2524 } 2525 2526 /** 2527 * Causes a clean shutdown of the container when the VM is being shut down 2528 */ 2529 protected void containerShutdown() { 2530 try { 2531 stop(); 2532 } catch (IOException e) { 2533 Throwable linkedException = e.getCause(); 2534 if (linkedException != null) { 2535 logError("Failed to shut down: " + e + ". 
Reason: " + linkedException, linkedException); 2536 } else { 2537 logError("Failed to shut down: " + e, e); 2538 } 2539 if (!useLoggingForShutdownErrors) { 2540 e.printStackTrace(System.err); 2541 } 2542 } catch (Exception e) { 2543 logError("Failed to shut down: " + e, e); 2544 } 2545 } 2546 2547 protected void logError(String message, Throwable e) { 2548 if (useLoggingForShutdownErrors) { 2549 LOG.error("Failed to shut down: " + e); 2550 } else { 2551 System.err.println("Failed to shut down: " + e); 2552 } 2553 } 2554 2555 /** 2556 * Starts any configured destinations on startup 2557 */ 2558 protected void startDestinations() throws Exception { 2559 if (destinations != null) { 2560 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2561 for (int i = 0; i < destinations.length; i++) { 2562 ActiveMQDestination destination = destinations[i]; 2563 getBroker().addDestination(adminConnectionContext, destination,true); 2564 } 2565 } 2566 if (isUseVirtualTopics()) { 2567 startVirtualConsumerDestinations(); 2568 } 2569 } 2570 2571 /** 2572 * Returns the broker's administration connection context used for 2573 * configuring the broker at startup 2574 */ 2575 public ConnectionContext getAdminConnectionContext() throws Exception { 2576 return BrokerSupport.getConnectionContext(getBroker()); 2577 } 2578 2579 protected void startManagementContext() throws Exception { 2580 getManagementContext().setBrokerName(brokerName); 2581 getManagementContext().start(); 2582 adminView = new BrokerView(this, null); 2583 ObjectName objectName = getBrokerObjectName(); 2584 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2585 } 2586 2587 /** 2588 * Start all transport and network connections, proxies and bridges 2589 * 2590 * @throws Exception 2591 */ 2592 public void startAllConnectors() throws Exception { 2593 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2594 List<TransportConnector> al = new 
ArrayList<TransportConnector>(); 2595 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2596 TransportConnector connector = iter.next(); 2597 al.add(startTransportConnector(connector)); 2598 } 2599 if (al.size() > 0) { 2600 // let's clear the transportConnectors list and replace it with 2601 // the started transportConnector instances 2602 this.transportConnectors.clear(); 2603 setTransportConnectors(al); 2604 } 2605 this.slave = false; 2606 URI uri = getVmConnectorURI(); 2607 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 2608 map.put("async", "false"); 2609 map.put("create","false"); 2610 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 2611 2612 if (!stopped.get()) { 2613 ThreadPoolExecutor networkConnectorStartExecutor = null; 2614 if (isNetworkConnectorStartAsync()) { 2615 // spin up as many threads as needed 2616 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2617 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2618 new ThreadFactory() { 2619 int count=0; 2620 @Override 2621 public Thread newThread(Runnable runnable) { 2622 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2623 thread.setDaemon(true); 2624 return thread; 2625 } 2626 }); 2627 } 2628 2629 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2630 final NetworkConnector connector = iter.next(); 2631 connector.setLocalUri(uri); 2632 connector.setBrokerName(getBrokerName()); 2633 connector.setDurableDestinations(durableDestinations); 2634 if (getDefaultSocketURIString() != null) { 2635 connector.setBrokerURL(getDefaultSocketURIString()); 2636 } 2637 if (networkConnectorStartExecutor != null) { 2638 networkConnectorStartExecutor.execute(new Runnable() { 2639 @Override 2640 public void run() { 2641 try { 2642 LOG.info("Async start of {}", connector); 2643 connector.start(); 
2644 } catch(Exception e) { 2645 LOG.error("Async start of network connector: {} failed", connector, e); 2646 } 2647 } 2648 }); 2649 } else { 2650 connector.start(); 2651 } 2652 } 2653 if (networkConnectorStartExecutor != null) { 2654 // executor done when enqueued tasks are complete 2655 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2656 } 2657 2658 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2659 ProxyConnector connector = iter.next(); 2660 connector.start(); 2661 } 2662 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2663 JmsConnector connector = iter.next(); 2664 connector.start(); 2665 } 2666 for (Service service : services) { 2667 configureService(service); 2668 service.start(); 2669 } 2670 } 2671 } 2672 2673 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2674 connector.setBrokerService(this); 2675 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2676 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2677 if (policy != null) { 2678 connector.setMessageAuthorizationPolicy(policy); 2679 } 2680 if (isUseJmx()) { 2681 connector = registerConnectorMBean(connector); 2682 } 2683 connector.getStatistics().setEnabled(enableStatistics); 2684 connector.start(); 2685 return connector; 2686 } 2687 2688 /** 2689 * Perform any custom dependency injection 2690 */ 2691 protected void configureServices(Object[] services) { 2692 for (Object service : services) { 2693 configureService(service); 2694 } 2695 } 2696 2697 /** 2698 * Perform any custom dependency injection 2699 */ 2700 protected void configureService(Object service) { 2701 if (service instanceof BrokerServiceAware) { 2702 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2703 serviceAware.setBrokerService(this); 2704 } 2705 } 2706 2707 public void handleIOException(IOException exception) { 2708 if (ioExceptionHandler != null) { 2709 
ioExceptionHandler.handle(exception); 2710 } else { 2711 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2712 } 2713 } 2714 2715 protected void startVirtualConsumerDestinations() throws Exception { 2716 checkStartException(); 2717 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2718 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2719 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2720 if (!destinations.isEmpty()) { 2721 for (ActiveMQDestination destination : destinations) { 2722 if (filter.matches(destination) == true) { 2723 broker.addDestination(adminConnectionContext, destination, false); 2724 } 2725 } 2726 } 2727 } 2728 2729 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2730 // created at startup, so no sync needed 2731 if (virtualConsumerDestinationFilter == null) { 2732 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2733 if (destinationInterceptors != null) { 2734 for (DestinationInterceptor interceptor : destinationInterceptors) { 2735 if (interceptor instanceof VirtualDestinationInterceptor) { 2736 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2737 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2738 if (virtualDestination instanceof VirtualTopic) { 2739 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2740 } 2741 if (isUseVirtualDestSubs()) { 2742 try { 2743 broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); 2744 LOG.debug("Adding virtual destination: {}", virtualDestination); 2745 } catch (Exception e) { 2746 LOG.warn("Could not fire virtual destination consumer advisory", e); 2747 } 2748 } 2749 } 2750 } 2751 } 2752 } 2753 ActiveMQQueue filter = new ActiveMQQueue(); 2754 
filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2755 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2756 } 2757 return virtualConsumerDestinationFilter; 2758 } 2759 2760 protected synchronized ThreadPoolExecutor getExecutor() { 2761 if (this.executor == null) { 2762 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2763 2764 private long i = 0; 2765 2766 @Override 2767 public Thread newThread(Runnable runnable) { 2768 this.i++; 2769 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); 2770 thread.setDaemon(true); 2771 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2772 @Override 2773 public void uncaughtException(final Thread t, final Throwable e) { 2774 LOG.error("Error in thread '{}'", t.getName(), e); 2775 } 2776 }); 2777 return thread; 2778 } 2779 }, new RejectedExecutionHandler() { 2780 @Override 2781 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2782 try { 2783 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2784 } catch (InterruptedException e) { 2785 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2786 } 2787 2788 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2789 } 2790 }); 2791 } 2792 return this.executor; 2793 } 2794 2795 public synchronized Scheduler getScheduler() { 2796 if (this.scheduler==null) { 2797 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2798 try { 2799 this.scheduler.start(); 2800 } catch (Exception e) { 2801 LOG.error("Failed to start Scheduler", e); 2802 } 2803 } 2804 return this.scheduler; 2805 } 2806 2807 public Broker getRegionBroker() { 2808 return regionBroker; 2809 } 2810 2811 public void setRegionBroker(Broker regionBroker) { 2812 this.regionBroker = regionBroker; 2813 } 2814 
2815 public void addShutdownHook(Runnable hook) { 2816 synchronized (shutdownHooks) { 2817 shutdownHooks.add(hook); 2818 } 2819 } 2820 2821 public void removeShutdownHook(Runnable hook) { 2822 synchronized (shutdownHooks) { 2823 shutdownHooks.remove(hook); 2824 } 2825 } 2826 2827 public boolean isSystemExitOnShutdown() { 2828 return systemExitOnShutdown; 2829 } 2830 2831 /** 2832 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2833 */ 2834 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2835 this.systemExitOnShutdown = systemExitOnShutdown; 2836 } 2837 2838 public int getSystemExitOnShutdownExitCode() { 2839 return systemExitOnShutdownExitCode; 2840 } 2841 2842 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2843 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2844 } 2845 2846 public SslContext getSslContext() { 2847 return sslContext; 2848 } 2849 2850 public void setSslContext(SslContext sslContext) { 2851 this.sslContext = sslContext; 2852 } 2853 2854 public boolean isShutdownOnSlaveFailure() { 2855 return shutdownOnSlaveFailure; 2856 } 2857 2858 /** 2859 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2860 */ 2861 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2862 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2863 } 2864 2865 public boolean isWaitForSlave() { 2866 return waitForSlave; 2867 } 2868 2869 /** 2870 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2871 */ 2872 public void setWaitForSlave(boolean waitForSlave) { 2873 this.waitForSlave = waitForSlave; 2874 } 2875 2876 public long getWaitForSlaveTimeout() { 2877 return this.waitForSlaveTimeout; 2878 } 2879 2880 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2881 this.waitForSlaveTimeout = waitForSlaveTimeout; 2882 } 2883 2884 /** 2885 * Get the passiveSlave 2886 * @return the 
passiveSlave 2887 */ 2888 public boolean isPassiveSlave() { 2889 return this.passiveSlave; 2890 } 2891 2892 /** 2893 * Set the passiveSlave 2894 * @param passiveSlave the passiveSlave to set 2895 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2896 */ 2897 public void setPassiveSlave(boolean passiveSlave) { 2898 this.passiveSlave = passiveSlave; 2899 } 2900 2901 /** 2902 * override the Default IOException handler, called when persistence adapter 2903 * has experiences File or JDBC I/O Exceptions 2904 * 2905 * @param ioExceptionHandler 2906 */ 2907 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2908 configureService(ioExceptionHandler); 2909 this.ioExceptionHandler = ioExceptionHandler; 2910 } 2911 2912 public IOExceptionHandler getIoExceptionHandler() { 2913 return ioExceptionHandler; 2914 } 2915 2916 /** 2917 * @return the schedulerSupport 2918 */ 2919 public boolean isSchedulerSupport() { 2920 return this.schedulerSupport; 2921 } 2922 2923 /** 2924 * @param schedulerSupport the schedulerSupport to set 2925 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2926 */ 2927 public void setSchedulerSupport(boolean schedulerSupport) { 2928 this.schedulerSupport = schedulerSupport; 2929 } 2930 2931 /** 2932 * @return the schedulerDirectory 2933 */ 2934 public File getSchedulerDirectoryFile() { 2935 if (this.schedulerDirectoryFile == null) { 2936 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2937 } 2938 return schedulerDirectoryFile; 2939 } 2940 2941 /** 2942 * @param schedulerDirectory the schedulerDirectory to set 2943 */ 2944 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2945 this.schedulerDirectoryFile = schedulerDirectory; 2946 } 2947 2948 public void setSchedulerDirectory(String schedulerDirectory) { 2949 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2950 } 2951 2952 public int 
getSchedulePeriodForDestinationPurge() { 2953 return this.schedulePeriodForDestinationPurge; 2954 } 2955 2956 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2957 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; 2958 } 2959 2960 /** 2961 * @param schedulePeriodForDiskUsageCheck 2962 */ 2963 public void setSchedulePeriodForDiskUsageCheck( 2964 int schedulePeriodForDiskUsageCheck) { 2965 this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; 2966 } 2967 2968 public int getDiskUsageCheckRegrowThreshold() { 2969 return diskUsageCheckRegrowThreshold; 2970 } 2971 2972 /** 2973 * @param diskUsageCheckRegrowThreshold 2974 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 2975 */ 2976 public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { 2977 this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; 2978 } 2979 2980 public int getMaxPurgedDestinationsPerSweep() { 2981 return this.maxPurgedDestinationsPerSweep; 2982 } 2983 2984 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 2985 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 2986 } 2987 2988 public BrokerContext getBrokerContext() { 2989 return brokerContext; 2990 } 2991 2992 public void setBrokerContext(BrokerContext brokerContext) { 2993 this.brokerContext = brokerContext; 2994 } 2995 2996 public void setBrokerId(String brokerId) { 2997 this.brokerId = new BrokerId(brokerId); 2998 } 2999 3000 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 3001 return useAuthenticatedPrincipalForJMSXUserID; 3002 } 3003 3004 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 3005 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 3006 } 3007 3008 /** 3009 * Should MBeans that support showing the Authenticated User Name 
information have this 3010 * value filled in or not. 3011 * 3012 * @return true if user names should be exposed in MBeans 3013 */ 3014 public boolean isPopulateUserNameInMBeans() { 3015 return this.populateUserNameInMBeans; 3016 } 3017 3018 /** 3019 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 3020 * @param value if MBeans should expose user name information. 3021 */ 3022 public void setPopulateUserNameInMBeans(boolean value) { 3023 this.populateUserNameInMBeans = value; 3024 } 3025 3026 /** 3027 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3028 * failing. The default value is to wait forever (zero). 3029 * 3030 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3031 */ 3032 public long getMbeanInvocationTimeout() { 3033 return mbeanInvocationTimeout; 3034 } 3035 3036 /** 3037 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 3038 * failing. The default value is to wait forever (zero). 3039 * 3040 * @param mbeanInvocationTimeout 3041 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 3042 */ 3043 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { 3044 this.mbeanInvocationTimeout = mbeanInvocationTimeout; 3045 } 3046 3047 public boolean isNetworkConnectorStartAsync() { 3048 return networkConnectorStartAsync; 3049 } 3050 3051 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 3052 this.networkConnectorStartAsync = networkConnectorStartAsync; 3053 } 3054 3055 public boolean isAllowTempAutoCreationOnSend() { 3056 return allowTempAutoCreationOnSend; 3057 } 3058 3059 /** 3060 * enable if temp destinations need to be propagated through a network when 3061 * advisorySupport==false. 
This is used in conjunction with the policy 3062 * gcInactiveDestinations for matching temps so they can get removed 3063 * when inactive 3064 * 3065 * @param allowTempAutoCreationOnSend 3066 */ 3067 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 3068 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 3069 } 3070 3071 public long getOfflineDurableSubscriberTimeout() { 3072 return offlineDurableSubscriberTimeout; 3073 } 3074 3075 public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { 3076 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 3077 } 3078 3079 public long getOfflineDurableSubscriberTaskSchedule() { 3080 return offlineDurableSubscriberTaskSchedule; 3081 } 3082 3083 public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { 3084 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 3085 } 3086 3087 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 3088 return isUseVirtualTopics() && destination.isQueue() && 3089 getVirtualTopicConsumerDestinationFilter().matches(destination); 3090 } 3091 3092 synchronized public Throwable getStartException() { 3093 return startException; 3094 } 3095 3096 public boolean isStartAsync() { 3097 return startAsync; 3098 } 3099 3100 public void setStartAsync(boolean startAsync) { 3101 this.startAsync = startAsync; 3102 } 3103 3104 public boolean isSlave() { 3105 return this.slave; 3106 } 3107 3108 public boolean isStopping() { 3109 return this.stopping.get(); 3110 } 3111 3112 /** 3113 * @return true if the broker allowed to restart on shutdown. 3114 */ 3115 public boolean isRestartAllowed() { 3116 return restartAllowed; 3117 } 3118 3119 /** 3120 * Sets if the broker allowed to restart on shutdown. 
3121 */ 3122 public void setRestartAllowed(boolean restartAllowed) { 3123 this.restartAllowed = restartAllowed; 3124 } 3125 3126 /** 3127 * A lifecycle manager of the BrokerService should 3128 * inspect this property after a broker shutdown has occurred 3129 * to find out if the broker needs to be re-created and started 3130 * again. 3131 * 3132 * @return true if the broker wants to be restarted after it shuts down. 3133 */ 3134 public boolean isRestartRequested() { 3135 return restartRequested; 3136 } 3137 3138 public void requestRestart() { 3139 this.restartRequested = true; 3140 } 3141 3142 public int getStoreOpenWireVersion() { 3143 return storeOpenWireVersion; 3144 } 3145 3146 public void setStoreOpenWireVersion(int storeOpenWireVersion) { 3147 this.storeOpenWireVersion = storeOpenWireVersion; 3148 } 3149 3150 /** 3151 * @return the current number of connections on this Broker. 3152 */ 3153 public int getCurrentConnections() { 3154 return this.currentConnections.get(); 3155 } 3156 3157 /** 3158 * @return the total number of connections this broker has handled since startup. 
3159 */ 3160 public long getTotalConnections() { 3161 return this.totalConnections.get(); 3162 } 3163 3164 public void incrementCurrentConnections() { 3165 this.currentConnections.incrementAndGet(); 3166 } 3167 3168 public void decrementCurrentConnections() { 3169 this.currentConnections.decrementAndGet(); 3170 } 3171 3172 public void incrementTotalConnections() { 3173 this.totalConnections.incrementAndGet(); 3174 } 3175 3176 public boolean isRejectDurableConsumers() { 3177 return rejectDurableConsumers; 3178 } 3179 3180 public void setRejectDurableConsumers(boolean rejectDurableConsumers) { 3181 this.rejectDurableConsumers = rejectDurableConsumers; 3182 } 3183 3184 public boolean isUseVirtualDestSubs() { 3185 return useVirtualDestSubs; 3186 } 3187 3188 public void setUseVirtualDestSubs( 3189 boolean useVirtualDestSubs) { 3190 this.useVirtualDestSubs = useVirtualDestSubs; 3191 } 3192 3193 public boolean isUseVirtualDestSubsOnCreation() { 3194 return useVirtualDestSubsOnCreation; 3195 } 3196 3197 public void setUseVirtualDestSubsOnCreation( 3198 boolean useVirtualDestSubsOnCreation) { 3199 this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; 3200 } 3201 3202 public boolean isAdjustUsageLimits() { 3203 return adjustUsageLimits; 3204 } 3205 3206 public void setAdjustUsageLimits(boolean adjustUsageLimits) { 3207 this.adjustUsageLimits = adjustUsageLimits; 3208 } 3209 3210 public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) { 3211 this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException; 3212 } 3213 3214 public boolean isRollbackOnlyOnAsyncException() { 3215 return rollbackOnlyOnAsyncException; 3216 } 3217}