001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicLong;
027
028import javax.jms.InvalidSelectorException;
029import javax.jms.JMSException;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
034import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
035import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
036import org.apache.activemq.broker.region.policy.PolicyEntry;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ConsumerInfo;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageAck;
041import org.apache.activemq.command.MessageDispatch;
042import org.apache.activemq.command.MessageId;
043import org.apache.activemq.store.TopicMessageStore;
044import org.apache.activemq.usage.SystemUsage;
045import org.apache.activemq.usage.Usage;
046import org.apache.activemq.usage.UsageListener;
047import org.apache.activemq.util.SubscriptionKey;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
052
053    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
054    private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
055    private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
056    private final SubscriptionKey subscriptionKey;
057    private boolean keepDurableSubsActive;
058    private final AtomicBoolean active = new AtomicBoolean();
059    private final AtomicLong offlineTimestamp = new AtomicLong(-1);
060
061    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
062            throws JMSException {
063        super(broker, usageManager, context, info);
064        this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
065        this.pending.setSystemUsage(usageManager);
066        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
067        this.keepDurableSubsActive = keepDurableSubsActive;
068        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
069    }
070
071    public final boolean isActive() {
072        return active.get();
073    }
074
075    public final long getOfflineTimestamp() {
076        return offlineTimestamp.get();
077    }
078
079    public void setOfflineTimestamp(long timestamp) {
080        offlineTimestamp.set(timestamp);
081    }
082
083    @Override
084    public boolean isFull() {
085        return !active.get() || super.isFull();
086    }
087
088    @Override
089    public void gc() {
090    }
091
092    /**
093     * store will have a pending ack for all durables, irrespective of the
094     * selector so we need to ack if node is un-matched
095     */
096    @Override
097    public void unmatched(MessageReference node) throws IOException {
098        MessageAck ack = new MessageAck();
099        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
100        ack.setMessageID(node.getMessageId());
101        Destination regionDestination = (Destination) node.getRegionDestination();
102        regionDestination.acknowledge(this.getContext(), this, ack, node);
103    }
104
105    @Override
106    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
107        // statically configured via maxPageSize
108    }
109
110    @Override
111    public void add(ConnectionContext context, Destination destination) throws Exception {
112        if (!destinations.contains(destination)) {
113            super.add(context, destination);
114        }
115        // do it just once per destination
116        if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
117            return;
118        }
119        durableDestinations.put(destination.getActiveMQDestination(), destination);
120
121        if (active.get() || keepDurableSubsActive) {
122            Topic topic = (Topic) destination;
123            topic.activate(context, this);
124            getSubscriptionStatistics().getEnqueues().add(pending.size());
125        } else if (destination.getMessageStore() != null) {
126            TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
127            try {
128                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
129            } catch (IOException e) {
130                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
131                jmsEx.setLinkedException(e);
132                throw jmsEx;
133            }
134        }
135        dispatchPending();
136    }
137
138    // used by RetaineMessageSubscriptionRecoveryPolicy
139    public boolean isEmpty(Topic topic) {
140        return pending.isEmpty(topic);
141    }
142
143    public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
144        if (!active.get()) {
145            this.context = context;
146            this.info = info;
147
148            LOG.debug("Activating {}", this);
149            if (!keepDurableSubsActive) {
150                for (Destination destination : durableDestinations.values()) {
151                    Topic topic = (Topic) destination;
152                    add(context, topic);
153                    topic.activate(context, this);
154                }
155
156                // On Activation we should update the configuration based on our new consumer info.
157                ActiveMQDestination dest = this.info.getDestination();
158                if (dest != null && regionBroker.getDestinationPolicy() != null) {
159                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
160                    if (entry != null) {
161                        entry.configure(broker, usageManager, this);
162                    }
163                }
164            }
165
166            synchronized (pendingLock) {
167                if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) {
168                    pending.setSystemUsage(memoryManager);
169                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
170                    pending.setMaxAuditDepth(getMaxAuditDepth());
171                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
172                    pending.start();
173                }
174                // use recovery policy every time sub is activated for retroactive topics and consumers
175                for (Destination destination : durableDestinations.values()) {
176                    Topic topic = (Topic) destination;
177                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
178                        topic.recoverRetroactiveMessages(context, this);
179                    }
180                }
181            }
182            this.active.set(true);
183            this.offlineTimestamp.set(-1);
184            dispatchPending();
185            this.usageManager.getMemoryUsage().addUsageListener(this);
186        }
187    }
188
189    public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
190        LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
191        active.set(false);
192        this.keepDurableSubsActive = keepDurableSubsActive;
193        offlineTimestamp.set(System.currentTimeMillis());
194        usageManager.getMemoryUsage().removeUsageListener(this);
195
196        ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
197        List<MessageReference> savedDispateched = null;
198
199        synchronized (pendingLock) {
200            if (!keepDurableSubsActive) {
201                pending.stop();
202            }
203
204            synchronized (dispatchLock) {
205                for (Destination destination : durableDestinations.values()) {
206                    Topic topic = (Topic) destination;
207                    if (!keepDurableSubsActive) {
208                        topicsToDeactivate.add(topic);
209                    } else {
210                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
211                    }
212                }
213
214                // Before we add these back to pending they need to be in producer order not
215                // dispatch order so we can add them to the front of the pending list.
216                Collections.reverse(dispatched);
217
218                for (final MessageReference node : dispatched) {
219                    // Mark the dispatched messages as redelivered for next time.
220                    if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
221                        Integer count = redeliveredMessages.get(node.getMessageId());
222                        if (count != null) {
223                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
224                        } else {
225                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
226                        }
227                    }
228                    if (keepDurableSubsActive && pending.isTransient()) {
229                        pending.addMessageFirst(node);
230                        pending.rollback(node.getMessageId());
231                    }
232                    // createMessageDispatch increments on remove from pending for dispatch
233                    node.decrementReferenceCount();
234                }
235
236                if (!topicsToDeactivate.isEmpty()) {
237                    savedDispateched = new ArrayList<MessageReference>(dispatched);
238                }
239                dispatched.clear();
240                getSubscriptionStatistics().getInflightMessageSize().reset();
241            }
242            if (!keepDurableSubsActive && pending.isTransient()) {
243                try {
244                    pending.reset();
245                    while (pending.hasNext()) {
246                        MessageReference node = pending.next();
247                        node.decrementReferenceCount();
248                        pending.remove();
249                    }
250                } finally {
251                    pending.release();
252                }
253            }
254        }
255        for(Topic topic: topicsToDeactivate) {
256            topic.deactivate(context, this, savedDispateched);
257        }
258        prefetchExtension.set(0);
259    }
260
261    @Override
262    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
263        MessageDispatch md = super.createMessageDispatch(node, message);
264        if (node != QueueMessageReference.NULL_MESSAGE) {
265            node.incrementReferenceCount();
266            Integer count = redeliveredMessages.get(node.getMessageId());
267            if (count != null) {
268                md.setRedeliveryCounter(count.intValue());
269            }
270        }
271        return md;
272    }
273
274    @Override
275    public void add(MessageReference node) throws Exception {
276        if (!active.get() && !keepDurableSubsActive) {
277            return;
278        }
279        super.add(node);
280    }
281
282    @Override
283    public void dispatchPending() throws IOException {
284        if (isActive()) {
285            super.dispatchPending();
286        }
287    }
288
289    public void removePending(MessageReference node) throws IOException {
290        pending.remove(node);
291    }
292
293    @Override
294    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
295        synchronized (pending) {
296            pending.addRecoveredMessage(message);
297        }
298    }
299
300    @Override
301    public int getPendingQueueSize() {
302        if (active.get() || keepDurableSubsActive) {
303            return super.getPendingQueueSize();
304        }
305        // TODO: need to get from store
306        return 0;
307    }
308
309    @Override
310    public void setSelector(String selector) throws InvalidSelectorException {
311        if (active.get()) {
312            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
313        } else {
314            super.setSelector(getSelector());
315        }
316    }
317
318    @Override
319    protected boolean canDispatch(MessageReference node) {
320        return true;  // let them go, our dispatchPending gates the active / inactive state.
321    }
322
323    @Override
324    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
325        this.setTimeOfLastMessageAck(System.currentTimeMillis());
326        Destination regionDestination = (Destination) node.getRegionDestination();
327        regionDestination.acknowledge(context, this, ack, node);
328        redeliveredMessages.remove(node.getMessageId());
329        node.decrementReferenceCount();
330        ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
331        if (info.isNetworkSubscription()) {
332            ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
333        }
334    }
335
336    @Override
337    public synchronized String toString() {
338        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
339                + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
340                + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
341    }
342
343    public SubscriptionKey getSubscriptionKey() {
344        return subscriptionKey;
345    }
346
347    /**
348     * Release any references that we are holding.
349     */
350    @Override
351    public void destroy() {
352        synchronized (pendingLock) {
353            try {
354                pending.reset();
355                while (pending.hasNext()) {
356                    MessageReference node = pending.next();
357                    node.decrementReferenceCount();
358                }
359            } finally {
360                pending.release();
361                pending.clear();
362            }
363        }
364        synchronized (dispatchLock) {
365            for (MessageReference node : dispatched) {
366                node.decrementReferenceCount();
367            }
368            dispatched.clear();
369        }
370        setSlowConsumer(false);
371    }
372
373    @Override
374    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
375        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
376            try {
377                dispatchPending();
378            } catch (IOException e) {
379                LOG.warn("problem calling dispatchMatched", e);
380            }
381        }
382    }
383
384    @Override
385    protected boolean isDropped(MessageReference node) {
386        return false;
387    }
388
389    public boolean isKeepDurableSubsActive() {
390        return keepDurableSubsActive;
391    }
392}