001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ConsumerInfo;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.NoLocalSubscriptionAware;
055import org.apache.activemq.store.PersistenceAdapter;
056import org.apache.activemq.store.TopicMessageStore;
057import org.apache.activemq.thread.Task;
058import org.apache.activemq.thread.TaskRunner;
059import org.apache.activemq.thread.TaskRunnerFactory;
060import org.apache.activemq.transaction.Synchronization;
061import org.apache.activemq.util.SubscriptionKey;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * The Topic is a destination that sends a copy of a message to every active
067 * Subscription registered.
068 */
069public class Topic extends BaseDestination implements Task {
070    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
071    private final TopicMessageStore topicStore;
072    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
073    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
074    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
075    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
076    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
077    private final TaskRunner taskRunner;
078    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
079    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
080        @Override
081        public void run() {
082            try {
083                Topic.this.taskRunner.wakeup();
084            } catch (InterruptedException e) {
085            }
086        };
087    };
088
089    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
090            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
091        super(brokerService, store, destination, parentStats);
092        this.topicStore = store;
093        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
094        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
095    }
096
097    @Override
098    public void initialize() throws Exception {
099        super.initialize();
100        // set non default subscription recovery policy (override policyEntries)
101        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
102            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
103            setAlwaysRetroactive(true);
104        }
105        if (store != null) {
106            // AMQ-2586: Better to leave this stat at zero than to give the user
107            // misleading metrics.
108            // int messageCount = store.getMessageCount();
109            // destinationStatistics.getMessages().setCount(messageCount);
110            store.start();
111        }
112    }
113
114    @Override
115    public List<Subscription> getConsumers() {
116        synchronized (consumers) {
117            return new ArrayList<Subscription>(consumers);
118        }
119    }
120
121    public boolean lock(MessageReference node, LockOwner sub) {
122        return true;
123    }
124
125    @Override
126    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
127        if (!sub.getConsumerInfo().isDurable()) {
128
129            // Do a retroactive recovery if needed.
130            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
131
132                // synchronize with dispatch method so that no new messages are sent
133                // while we are recovering a subscription to avoid out of order messages.
134                dispatchLock.writeLock().lock();
135                try {
136                    boolean applyRecovery = false;
137                    synchronized (consumers) {
138                        if (!consumers.contains(sub)){
139                            sub.add(context, this);
140                            consumers.add(sub);
141                            applyRecovery=true;
142                            super.addSubscription(context, sub);
143                        }
144                    }
145                    if (applyRecovery){
146                        subscriptionRecoveryPolicy.recover(context, this, sub);
147                    }
148                } finally {
149                    dispatchLock.writeLock().unlock();
150                }
151
152            } else {
153                synchronized (consumers) {
154                    if (!consumers.contains(sub)){
155                        sub.add(context, this);
156                        consumers.add(sub);
157                        super.addSubscription(context, sub);
158                    }
159                }
160            }
161        } else {
162            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
163            super.addSubscription(context, sub);
164            sub.add(context, this);
165            if(dsub.isActive()) {
166                synchronized (consumers) {
167                    boolean hasSubscription = false;
168
169                    if (consumers.size() == 0) {
170                        hasSubscription = false;
171                    } else {
172                        for (Subscription currentSub : consumers) {
173                            if (currentSub.getConsumerInfo().isDurable()) {
174                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
175                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
176                                    hasSubscription = true;
177                                    break;
178                                }
179                            }
180                        }
181                    }
182
183                    if (!hasSubscription) {
184                        consumers.add(sub);
185                    }
186                }
187            }
188            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
189        }
190    }
191
192    @Override
193    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
194        if (!sub.getConsumerInfo().isDurable()) {
195            super.removeSubscription(context, sub, lastDeliveredSequenceId);
196            synchronized (consumers) {
197                consumers.remove(sub);
198            }
199        }
200        sub.remove(context, this);
201    }
202
203    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
204        if (topicStore != null) {
205            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
206            DurableTopicSubscription removed = durableSubscribers.remove(key);
207            if (removed != null) {
208                destinationStatistics.getConsumers().decrement();
209                // deactivate and remove
210                removed.deactivate(false, 0l);
211                consumers.remove(removed);
212            }
213        }
214    }
215
216    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
217        if (hasSelectorChanged(info1, info2)) {
218            return true;
219        }
220
221        return hasNoLocalChanged(info1, info2);
222    }
223
224    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
225        //Not all persistence adapters store the noLocal value for a subscription
226        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
227        if (adapter instanceof NoLocalSubscriptionAware) {
228            if (info1.isNoLocal() ^ info2.isNoLocal()) {
229                return true;
230            }
231        }
232
233        return false;
234    }
235
236    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
237        if (info1.getSelector() != null ^ info2.getSelector() != null) {
238            return true;
239        }
240
241        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
242            return true;
243        }
244
245        return false;
246    }
247
248    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
249        // synchronize with dispatch method so that no new messages are sent
250        // while we are recovering a subscription to avoid out of order messages.
251        dispatchLock.writeLock().lock();
252        try {
253
254            if (topicStore == null) {
255                return;
256            }
257
258            // Recover the durable subscription.
259            String clientId = subscription.getSubscriptionKey().getClientId();
260            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
261            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
262            if (info != null) {
263                // Check to see if selector changed.
264                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
265                    // Need to delete the subscription
266                    topicStore.deleteSubscription(clientId, subscriptionName);
267                    info = null;
268                    // Force a rebuild of the selector chain for the subscription otherwise
269                    // the stored subscription is updated but the selector expression is not
270                    // and the subscription will not behave according to the new configuration.
271                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
272                    synchronized (consumers) {
273                        consumers.remove(subscription);
274                    }
275                } else {
276                    synchronized (consumers) {
277                        if (!consumers.contains(subscription)) {
278                            consumers.add(subscription);
279                        }
280                    }
281                }
282            }
283
284            // Do we need to create the subscription?
285            if (info == null) {
286                info = new SubscriptionInfo();
287                info.setClientId(clientId);
288                info.setSelector(subscription.getConsumerInfo().getSelector());
289                info.setSubscriptionName(subscriptionName);
290                info.setDestination(getActiveMQDestination());
291                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
292                // This destination is an actual destination id.
293                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
294                // This destination might be a pattern
295                synchronized (consumers) {
296                    consumers.add(subscription);
297                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
298                }
299            }
300
301            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
302            msgContext.setDestination(destination);
303            if (subscription.isRecoveryRequired()) {
304                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
305                    @Override
306                    public boolean recoverMessage(Message message) throws Exception {
307                        message.setRegionDestination(Topic.this);
308                        try {
309                            msgContext.setMessageReference(message);
310                            if (subscription.matches(message, msgContext)) {
311                                subscription.add(message);
312                            }
313                        } catch (IOException e) {
314                            LOG.error("Failed to recover this message {}", message, e);
315                        }
316                        return true;
317                    }
318
319                    @Override
320                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
321                        throw new RuntimeException("Should not be called.");
322                    }
323
324                    @Override
325                    public boolean hasSpace() {
326                        return true;
327                    }
328
329                    @Override
330                    public boolean isDuplicate(MessageId id) {
331                        return false;
332                    }
333                });
334            }
335        } finally {
336            dispatchLock.writeLock().unlock();
337        }
338    }
339
340    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
341        synchronized (consumers) {
342            consumers.remove(sub);
343        }
344        sub.remove(context, this, dispatched);
345    }
346
347    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
348        if (subscription.getConsumerInfo().isRetroactive()) {
349            subscriptionRecoveryPolicy.recover(context, this, subscription);
350        }
351    }
352
353    @Override
354    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
355        final ConnectionContext context = producerExchange.getConnectionContext();
356
357        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
358        producerExchange.incrementSend();
359        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
360                && !context.isInRecoveryMode();
361
362        message.setRegionDestination(this);
363
364        // There is delay between the client sending it and it arriving at the
365        // destination.. it may have expired.
366        if (message.isExpired()) {
367            broker.messageExpired(context, message, null);
368            getDestinationStatistics().getExpired().increment();
369            if (sendProducerAck) {
370                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
371                context.getConnection().dispatchAsync(ack);
372            }
373            return;
374        }
375
376        if (memoryUsage.isFull()) {
377            isFull(context, memoryUsage);
378            fastProducer(context, producerInfo);
379
380            if (isProducerFlowControl() && context.isProducerFlowControl()) {
381
382                if (warnOnProducerFlowControl) {
383                    warnOnProducerFlowControl = false;
384                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
385                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
386                }
387
388                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
389                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
390                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
391                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
392                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
393                }
394
395                // We can avoid blocking due to low usage if the producer is sending a sync message or
396                // if it is using a producer window
397                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
398                    synchronized (messagesWaitingForSpace) {
399                        messagesWaitingForSpace.add(new Runnable() {
400                            @Override
401                            public void run() {
402                                try {
403
404                                    // While waiting for space to free up... the
405                                    // message may have expired.
406                                    if (message.isExpired()) {
407                                        broker.messageExpired(context, message, null);
408                                        getDestinationStatistics().getExpired().increment();
409                                    } else {
410                                        doMessageSend(producerExchange, message);
411                                    }
412
413                                    if (sendProducerAck) {
414                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
415                                                .getSize());
416                                        context.getConnection().dispatchAsync(ack);
417                                    } else {
418                                        Response response = new Response();
419                                        response.setCorrelationId(message.getCommandId());
420                                        context.getConnection().dispatchAsync(response);
421                                    }
422
423                                } catch (Exception e) {
424                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
425                                        ExceptionResponse response = new ExceptionResponse(e);
426                                        response.setCorrelationId(message.getCommandId());
427                                        context.getConnection().dispatchAsync(response);
428                                    }
429                                }
430                            }
431                        });
432
433                        registerCallbackForNotFullNotification();
434                        context.setDontSendReponse(true);
435                        return;
436                    }
437
438                } else {
439                    // Producer flow control cannot be used, so we have do the flow control
440                    // at the broker by blocking this thread until there is space available.
441
442                    if (memoryUsage.isFull()) {
443                        if (context.isInTransaction()) {
444
445                            int count = 0;
446                            while (!memoryUsage.waitForSpace(1000)) {
447                                if (context.getStopping().get()) {
448                                    throw new IOException("Connection closed, send aborted.");
449                                }
450                                if (count > 2 && context.isInTransaction()) {
451                                    count = 0;
452                                    int size = context.getTransaction().size();
453                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
454                                }
455                                count++;
456                            }
457                        } else {
458                            waitForSpace(
459                                    context,
460                                    producerExchange,
461                                    memoryUsage,
462                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
463                                            + message.getProducerId()
464                                            + ") to prevent flooding "
465                                            + getActiveMQDestination().getQualifiedName()
466                                            + "."
467                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
468                        }
469                    }
470
471                    // The usage manager could have delayed us by the time
472                    // we unblock the message could have expired..
473                    if (message.isExpired()) {
474                        getDestinationStatistics().getExpired().increment();
475                        LOG.debug("Expired message: {}", message);
476                        return;
477                    }
478                }
479            }
480        }
481
482        doMessageSend(producerExchange, message);
483        messageDelivered(context, message);
484        if (sendProducerAck) {
485            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
486            context.getConnection().dispatchAsync(ack);
487        }
488    }
489
490    /**
491     * do send the message - this needs to be synchronized to ensure messages
492     * are stored AND dispatched in the right order
493     *
494     * @param producerExchange
495     * @param message
496     * @throws IOException
497     * @throws Exception
498     */
499    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
500            throws IOException, Exception {
501        final ConnectionContext context = producerExchange.getConnectionContext();
502        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
503        Future<Object> result = null;
504
505        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
506            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
507                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
508                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
509                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
510                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
511                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
512                    throw new javax.jms.ResourceAllocationException(logMessage);
513                }
514
515                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
516            }
517            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
518
519            //Moved the reduceMemoryfootprint clearing to the dispatch method
520        }
521
522        message.incrementReferenceCount();
523
524        if (context.isInTransaction()) {
525            context.getTransaction().addSynchronization(new Synchronization() {
526                @Override
527                public void afterCommit() throws Exception {
528                    // It could take while before we receive the commit
529                    // operation.. by that time the message could have
530                    // expired..
531                    if (message.isExpired()) {
532                        if (broker.isExpired(message)) {
533                            getDestinationStatistics().getExpired().increment();
534                            broker.messageExpired(context, message, null);
535                        }
536                        message.decrementReferenceCount();
537                        return;
538                    }
539                    try {
540                        dispatch(context, message);
541                    } finally {
542                        message.decrementReferenceCount();
543                    }
544                }
545
546                @Override
547                public void afterRollback() throws Exception {
548                    message.decrementReferenceCount();
549                }
550            });
551
552        } else {
553            try {
554                dispatch(context, message);
555            } finally {
556                message.decrementReferenceCount();
557            }
558        }
559
560        if (result != null && !result.isCancelled()) {
561            try {
562                result.get();
563            } catch (CancellationException e) {
564                // ignore - the task has been cancelled if the message
565                // has already been deleted
566            }
567        }
568    }
569
570    private boolean canOptimizeOutPersistence() {
571        return durableSubscribers.size() == 0;
572    }
573
574    @Override
575    public String toString() {
576        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
577    }
578
579    @Override
580    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
581            final MessageReference node) throws IOException {
582        if (topicStore != null && node.isPersistent()) {
583            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
584            SubscriptionKey key = dsub.getSubscriptionKey();
585            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
586                    convertToNonRangedAck(ack, node));
587        }
588        messageConsumed(context, node);
589    }
590
591    @Override
592    public void gc() {
593    }
594
595    public Message loadMessage(MessageId messageId) throws IOException {
596        return topicStore != null ? topicStore.getMessage(messageId) : null;
597    }
598
599    @Override
600    public void start() throws Exception {
601        this.subscriptionRecoveryPolicy.start();
602        if (memoryUsage != null) {
603            memoryUsage.start();
604        }
605
606        if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
607            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
608        }
609    }
610
611    @Override
612    public void stop() throws Exception {
613        if (taskRunner != null) {
614            taskRunner.shutdown();
615        }
616        this.subscriptionRecoveryPolicy.stop();
617        if (memoryUsage != null) {
618            memoryUsage.stop();
619        }
620        if (this.topicStore != null) {
621            this.topicStore.stop();
622        }
623
624         scheduler.cancel(expireMessagesTask);
625    }
626
627    @Override
628    public Message[] browse() {
629        final List<Message> result = new ArrayList<Message>();
630        doBrowse(result, getMaxBrowsePageSize());
631        return result.toArray(new Message[result.size()]);
632    }
633
634    private void doBrowse(final List<Message> browseList, final int max) {
635        try {
636            if (topicStore != null) {
637                final List<Message> toExpire = new ArrayList<Message>();
638                topicStore.recover(new MessageRecoveryListener() {
639                    @Override
640                    public boolean recoverMessage(Message message) throws Exception {
641                        if (message.isExpired()) {
642                            toExpire.add(message);
643                        }
644                        browseList.add(message);
645                        return true;
646                    }
647
648                    @Override
649                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
650                        return true;
651                    }
652
653                    @Override
654                    public boolean hasSpace() {
655                        return browseList.size() < max;
656                    }
657
658                    @Override
659                    public boolean isDuplicate(MessageId id) {
660                        return false;
661                    }
662                });
663                final ConnectionContext connectionContext = createConnectionContext();
664                for (Message message : toExpire) {
665                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
666                        if (!sub.isActive()) {
667                            message.setRegionDestination(this);
668                            messageExpired(connectionContext, sub, message);
669                        }
670                    }
671                }
672                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
673                if (msgs != null) {
674                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
675                        browseList.add(msgs[i]);
676                    }
677                }
678            }
679        } catch (Throwable e) {
680            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
681        }
682    }
683
684    @Override
685    public boolean iterate() {
686        synchronized (messagesWaitingForSpace) {
687            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
688                Runnable op = messagesWaitingForSpace.removeFirst();
689                op.run();
690            }
691
692            if (!messagesWaitingForSpace.isEmpty()) {
693                registerCallbackForNotFullNotification();
694            }
695        }
696        return false;
697    }
698
699    private void registerCallbackForNotFullNotification() {
700        // If the usage manager is not full, then the task will not
701        // get called..
702        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
703            // so call it directly here.
704            sendMessagesWaitingForSpaceTask.run();
705        }
706    }
707
708    // Properties
709    // -------------------------------------------------------------------------
710
711    public DispatchPolicy getDispatchPolicy() {
712        return dispatchPolicy;
713    }
714
    /**
     * Replaces the dispatch policy held by this destination.
     *
     * @param dispatchPolicy the policy to store; no validation is performed here
     */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }
718
719    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
720        return subscriptionRecoveryPolicy;
721    }
722
723    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
724        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
725            // allow users to combine retained message policy with other ActiveMQ policies
726            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
727            policy.setWrapped(recoveryPolicy);
728        } else {
729            this.subscriptionRecoveryPolicy = recoveryPolicy;
730        }
731    }
732
733    // Implementation methods
734    // -------------------------------------------------------------------------
735
    /**
     * Intentionally empty: this destination performs no work when woken up.
     * Declared {@code final}, so subclasses cannot change that.
     */
    @Override
    public final void wakeup() {
    }
739
740    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
741        // AMQ-2586: Better to leave this stat at zero than to give the user
742        // misleading metrics.
743        // destinationStatistics.getMessages().increment();
744        destinationStatistics.getEnqueues().increment();
745        destinationStatistics.getMessageSize().addSize(message.getSize());
746        MessageEvaluationContext msgContext = null;
747
748        dispatchLock.readLock().lock();
749        try {
750            if (!subscriptionRecoveryPolicy.add(context, message)) {
751                return;
752            }
753            synchronized (consumers) {
754                if (consumers.isEmpty()) {
755                    onMessageWithNoConsumers(context, message);
756                    return;
757                }
758            }
759
760            // Clear memory before dispatch - need to clear here because the call to
761            //subscriptionRecoveryPolicy.add() will unmarshall the state
762            if (isReduceMemoryFootprint() && message.isMarshalled()) {
763                message.clearUnMarshalledState();
764            }
765
766            msgContext = context.getMessageEvaluationContext();
767            msgContext.setDestination(destination);
768            msgContext.setMessageReference(message);
769            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
770                onMessageWithNoConsumers(context, message);
771            }
772
773        } finally {
774            dispatchLock.readLock().unlock();
775            if (msgContext != null) {
776                msgContext.clear();
777            }
778        }
779    }
780
781    private final Runnable expireMessagesTask = new Runnable() {
782        @Override
783        public void run() {
784            List<Message> browsedMessages = new InsertionCountList<Message>();
785            doBrowse(browsedMessages, getMaxExpirePageSize());
786        }
787    };
788
789    @Override
790    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
791        broker.messageExpired(context, reference, subs);
792        // AMQ-2586: Better to leave this stat at zero than to give the user
793        // misleading metrics.
794        // destinationStatistics.getMessages().decrement();
795        destinationStatistics.getExpired().increment();
796        MessageAck ack = new MessageAck();
797        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
798        ack.setDestination(destination);
799        ack.setMessageID(reference.getMessageId());
800        try {
801            if (subs instanceof DurableTopicSubscription) {
802                ((DurableTopicSubscription)subs).removePending(reference);
803            }
804            acknowledge(context, subs, ack, reference);
805        } catch (Exception e) {
806            LOG.error("Failed to remove expired Message from the store ", e);
807        }
808    }
809
    /**
     * @return the {@code LOG} logger used by this class
     */
    @Override
    protected Logger getLog() {
        return LOG;
    }
814
815    protected boolean isOptimizeStorage(){
816        boolean result = false;
817
818        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
819                result = true;
820                for (DurableTopicSubscription s : durableSubscribers.values()) {
821                    if (s.isActive()== false){
822                        result = false;
823                        break;
824                    }
825                    if (s.getPrefetchSize()==0){
826                        result = false;
827                        break;
828                    }
829                    if (s.isSlowConsumer()){
830                        result = false;
831                        break;
832                    }
833                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
834                        result = false;
835                        break;
836                    }
837                }
838        }
839        return result;
840    }
841
842    /**
843     * force a reread of the store - after transaction recovery completion
844     */
845    @Override
846    public void clearPendingMessages() {
847        dispatchLock.readLock().lock();
848        try {
849            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
850                clearPendingAndDispatch(durableTopicSubscription);
851            }
852        } finally {
853            dispatchLock.readLock().unlock();
854        }
855    }
856
857    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
858        synchronized (durableTopicSubscription.pendingLock) {
859            durableTopicSubscription.pending.clear();
860            try {
861                durableTopicSubscription.dispatchPending();
862            } catch (IOException exception) {
863                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
864                        durableTopicSubscription,
865                        destination,
866                        durableTopicSubscription.pending }, exception);
867            }
868        }
869    }
870
871    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
872        return durableSubscribers;
873    }
874}