001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.Serializable; 020 021import javax.jms.JMSException; 022import javax.jms.Message; 023 024import org.apache.activemq.broker.region.MessageReference; 025import org.apache.activemq.command.MessageId; 026import org.apache.activemq.command.ProducerId; 027import org.apache.activemq.util.BitArrayBin; 028import org.apache.activemq.util.IdGenerator; 029import org.apache.activemq.util.LRUCache; 030 031/** 032 * Provides basic audit functions for Messages without sync 033 */ 034public class ActiveMQMessageAuditNoSync implements Serializable { 035 036 private static final long serialVersionUID = 1L; 037 038 public static final int DEFAULT_WINDOW_SIZE = 2048; 039 public static final int MAXIMUM_PRODUCER_COUNT = 64; 040 private int auditDepth; 041 private int maximumNumberOfProducersToTrack; 042 private final LRUCache<String, BitArrayBin> map; 043 private transient boolean modified = true; 044 045 /** 046 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64 047 */ 048 public ActiveMQMessageAuditNoSync() { 049 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 050 } 051 052 /** 053 * Construct a MessageAudit 054 * 055 * @param auditDepth range of ids to track 056 * @param maximumNumberOfProducersToTrack number of producers expected in the system 057 */ 058 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { 059 this.auditDepth = auditDepth; 060 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 061 this.map = new LRUCache<String, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 062 } 063 064 /** 065 * @return the auditDepth 066 */ 067 public int getAuditDepth() { 068 return auditDepth; 069 } 070 071 /** 072 * @param auditDepth the auditDepth to set 073 */ 074 public void setAuditDepth(int auditDepth) { 075 this.auditDepth = auditDepth; 076 this.modified = true; 077 } 078 079 /** 080 * @return the maximumNumberOfProducersToTrack 081 */ 082 public int getMaximumNumberOfProducersToTrack() { 083 return maximumNumberOfProducersToTrack; 084 } 085 086 /** 087 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 088 */ 089 public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) { 090 091 if (maximumNumberOfProducersToTrack < this.maximumNumberOfProducersToTrack){ 092 LRUCache<String, BitArrayBin> newMap = new LRUCache<String, BitArrayBin>(0,maximumNumberOfProducersToTrack,0.75f,true); 093 /** 094 * As putAll will access the entries in the right order, 095 * this shouldn't result in wrong cache entries being removed 096 */ 097 newMap.putAll(this.map); 098 this.map.clear(); 099 this.map.putAll(newMap); 100 } 101 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 102 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 103 this.modified = true; 104 } 105 106 /** 107 * Checks if this message has been seen before 108 * 109 * @param message 110 * @return true if the message is a duplicate 111 * @throws JMSException 112 */ 113 public boolean isDuplicate(Message message) throws JMSException { 114 return isDuplicate(message.getJMSMessageID()); 115 } 116 117 /** 118 * checks whether this messageId has been seen before and adds this 119 * messageId to the list 120 * 121 * @param id 122 * @return true if the message is a duplicate 123 */ 124 public boolean isDuplicate(String id) { 125 boolean answer = false; 126 String seed = IdGenerator.getSeedFromId(id); 127 if (seed != null) { 128 BitArrayBin bab = map.get(seed); 129 if (bab == null) { 130 bab = new BitArrayBin(auditDepth); 131 map.put(seed, bab); 132 modified = true; 133 } 134 long index = IdGenerator.getSequenceFromId(id); 135 if (index >= 0) { 136 answer = bab.setBit(index, true); 137 modified = true; 138 } 139 } 140 return answer; 141 } 142 143 /** 144 * Checks if this message has been seen before 145 * 146 * @param message 147 * @return true if the message is a duplicate 148 */ 149 public boolean isDuplicate(final MessageReference message) { 150 MessageId id = message.getMessageId(); 151 return isDuplicate(id); 152 } 153 154 /** 155 * Checks if this messageId has been seen before 156 * 157 * @param id 158 * @return true if the message is a duplicate 159 */ 160 public boolean isDuplicate(final MessageId id) { 161 boolean answer = false; 162 163 if (id != null) { 164 ProducerId pid = id.getProducerId(); 165 if (pid != null) { 166 BitArrayBin bab = map.get(pid.toString()); 167 if (bab == null) { 168 bab = new BitArrayBin(auditDepth); 169 map.put(pid.toString(), bab); 170 modified = true; 171 } 172 answer = bab.setBit(id.getProducerSequenceId(), true); 173 } 174 } 175 return answer; 176 } 177 178 /** 179 * mark this message as being received 180 * 181 * @param message 182 */ 183 public void rollback(final MessageReference message) { 184 MessageId id = message.getMessageId(); 185 rollback(id); 186 } 187 188 /** 189 * mark this message as being received 190 * 191 * @param id 192 */ 193 public void rollback(final MessageId id) { 194 if (id != null) { 195 ProducerId pid = id.getProducerId(); 196 if (pid != null) { 197 BitArrayBin bab = map.get(pid.toString()); 198 if (bab != null) { 199 bab.setBit(id.getProducerSequenceId(), false); 200 modified = true; 201 } 202 } 203 } 204 } 205 206 public void rollback(final String id) { 207 String seed = IdGenerator.getSeedFromId(id); 208 if (seed != null) { 209 BitArrayBin bab = map.get(seed); 210 if (bab != null) { 211 long index = IdGenerator.getSequenceFromId(id); 212 bab.setBit(index, false); 213 modified = true; 214 } 215 } 216 } 217 218 /** 219 * Check the message is in order 220 * 221 * @param msg 222 * 223 * @return true if the id is in order 224 * 225 * @throws JMSException 226 */ 227 public boolean isInOrder(Message msg) throws JMSException { 228 return isInOrder(msg.getJMSMessageID()); 229 } 230 231 /** 232 * Check the message id is in order 233 * 234 * @param id 235 * 236 * @return true if the id is in order 237 */ 238 public boolean isInOrder(final String id) { 239 boolean answer = true; 240 241 if (id != null) { 242 String seed = IdGenerator.getSeedFromId(id); 243 if (seed != null) { 244 BitArrayBin bab = map.get(seed); 245 if (bab != null) { 246 long index = IdGenerator.getSequenceFromId(id); 247 answer = bab.isInOrder(index); 248 modified = true; 249 } 250 } 251 } 252 return answer; 253 } 254 255 /** 256 * Check the MessageId is in order 257 * 258 * @param message 259 * 260 * @return true if the id is in order 261 */ 262 public boolean isInOrder(final MessageReference message) { 263 return isInOrder(message.getMessageId()); 264 } 265 266 /** 267 * Check the MessageId is in order 268 * 269 * @param id 270 * 271 * @return true if the id is in order 272 */ 273 public boolean isInOrder(final MessageId id) { 274 boolean answer = false; 275 276 if (id != null) { 277 ProducerId pid = id.getProducerId(); 278 if (pid != null) { 279 BitArrayBin bab = map.get(pid.toString()); 280 if (bab == null) { 281 bab = new BitArrayBin(auditDepth); 282 map.put(pid.toString(), bab); 283 modified = true; 284 } 285 answer = bab.isInOrder(id.getProducerSequenceId()); 286 287 } 288 } 289 return answer; 290 } 291 292 public long getLastSeqId(ProducerId id) { 293 long result = -1; 294 BitArrayBin bab = map.get(id.toString()); 295 if (bab != null) { 296 result = bab.getLastSetIndex(); 297 } 298 return result; 299 } 300 301 public void clear() { 302 map.clear(); 303 } 304 305 /** 306 * Returns if the Audit has been modified since last check, this method does not 307 * reset the modified flag. If the caller needs to reset the flag in order to avoid 308 * serializing an unchanged Audit then its up the them to reset it themselves. 309 * 310 * @return true if the Audit has been modified. 311 */ 312 public boolean isModified() { 313 return this.modified; 314 } 315 316 public void setModified(boolean modified) { 317 this.modified = modified; 318 } 319 320 /** 321 * Reads and returns the current modified state of the Audit, once called the state is 322 * reset to false. This method is useful for code the needs to know if it should write 323 * out the Audit or otherwise execute some logic based on the Audit having changed since 324 * last check. 325 * 326 * @return true if the Audit has been modified since last check. 327 */ 328 public boolean modified() { 329 if (this.modified) { 330 this.modified = false; 331 return true; 332 } 333 334 return false; 335 } 336}