001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.Collections; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.Message; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.MessageId; 030import org.apache.activemq.command.SubscriptionInfo; 031import org.apache.activemq.store.MessageRecoveryListener; 032import org.apache.activemq.store.MessageStoreStatistics; 033import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 034import org.apache.activemq.store.TopicMessageStore; 035import org.apache.activemq.util.LRUCache; 036import org.apache.activemq.util.SubscriptionKey; 037 038public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore { 039 040 private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase; 041 private Map<SubscriptionKey, MemoryTopicSub> topicSubMap; 042 private final Map<MessageId, Message> originalMessageTable; 043 044 public MemoryTopicMessageStore(ActiveMQDestination destination) { 045 this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap()); 046 047 // Set the messageStoreStatistics after the super class is initialized 048 // so that the stats can be properly updated on cache eviction 049 MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable; 050 cache.setMessageStoreStatistics(messageStoreStatistics); 051 } 052 053 public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, 054 Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) { 055 super(destination, messageTable); 056 this.subscriberDatabase = subscriberDatabase; 057 this.topicSubMap = makeSubMap(); 058 // this is only necessary so that messageStoreStatistics can be set if 059 // necessary We need the original reference since messageTable is wrapped 060 // in a synchronized map in the parent class 061 this.originalMessageTable = messageTable; 062 } 063 064 protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() { 065 return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>()); 066 } 067 068 protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() { 069 return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>()); 070 } 071 072 @Override 073 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 074 super.addMessage(context, message); 075 for (MemoryTopicSub sub : topicSubMap.values()) { 076 sub.addMessage(message.getMessageId(), message); 077 } 078 } 079 080 @Override 081 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 082 super.removeMessage(messageId); 083 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 084 MemoryTopicSub sub = topicSubMap.get(key); 085 if (sub != null) { 086 sub.removeMessage(messageId); 087 } 088 } 089 090 @Override 091 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 092 return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName)); 093 } 094 095 @Override 096 public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException { 097 SubscriptionKey key = new SubscriptionKey(info); 098 MemoryTopicSub sub = new MemoryTopicSub(key); 099 topicSubMap.put(key, sub); 100 if (retroactive) { 101 for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) { 102 sub.addMessage(entry.getKey(), entry.getValue()); 103 } 104 } 105 subscriberDatabase.put(key, info); 106 } 107 108 @Override 109 public synchronized void deleteSubscription(String clientId, String subscriptionName) { 110 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 111 subscriberDatabase.remove(key); 112 MemoryTopicSub subscription = topicSubMap.get(key); 113 if (subscription != null) { 114 List<Message> storedMessages = subscription.getStoredMessages(); 115 for (Message message : storedMessages) { 116 try { 117 acknowledge(null, key.getClientId(), key.getSubscriptionName(), message.getMessageId(), null); 118 } catch (IOException e) { 119 } 120 } 121 } 122 123 subscriberDatabase.remove(key); 124 topicSubMap.remove(key); 125 } 126 127 @Override 128 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 129 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 130 if (sub != null) { 131 sub.recoverSubscription(listener); 132 } 133 } 134 135 @Override 136 public synchronized void delete() { 137 super.delete(); 138 subscriberDatabase.clear(); 139 topicSubMap.clear(); 140 } 141 142 @Override 143 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 144 return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); 145 } 146 147 @Override 148 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException { 149 int result = 0; 150 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); 151 if (sub != null) { 152 result = sub.size(); 153 } 154 return result; 155 } 156 157 @Override 158 public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException { 159 long result = 0; 160 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); 161 if (sub != null) { 162 result = sub.messageSize(); 163 } 164 return result; 165 } 166 167 @Override 168 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { 169 MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 170 if (sub != null) { 171 sub.recoverNextMessages(maxReturned, listener); 172 } 173 } 174 175 @Override 176 public void resetBatching(String clientId, String subscriptionName) { 177 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); 178 if (sub != null) { 179 sub.resetBatching(); 180 } 181 } 182 183 // Disabled for the memory store, can be enabled later if necessary 184 private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); 185 186 @Override 187 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 188 return stats; 189 } 190 191 /** 192 * Since we initialize the store with a LRUCache in some cases, we need to 193 * account for cache evictions when computing the message store statistics. 194 * 195 */ 196 private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> { 197 private static final long serialVersionUID = -342098639681884413L; 198 private MessageStoreStatistics messageStoreStatistics; 199 200 public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) { 201 super(initialCapacity, maximumCacheSize, loadFactor, accessOrder); 202 } 203 204 public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics) { 205 this.messageStoreStatistics = messageStoreStatistics; 206 } 207 208 @Override 209 protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) { 210 decMessageStoreStatistics(messageStoreStatistics, eldest.getValue()); 211 212 // We aren't tracking this anymore so remove our reference to it. 213 eldest.getValue().decrementReferenceCount(); 214 } 215 } 216}