001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.List; 022 023import org.apache.activemq.broker.Broker; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.DurableTopicSubscription; 026import org.apache.activemq.broker.region.MessageReference; 027import org.apache.activemq.broker.region.SubscriptionRecovery; 028import org.apache.activemq.broker.region.Topic; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.filter.DestinationFilter; 032 033/** 034 * This implementation of {@link org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy} will only keep the 035 * last non-zero length message with the {@link org.apache.activemq.command.ActiveMQMessage}.RETAIN_PROPERTY. 036 * 037 * @org.apache.xbean.XBean 038 * 039 */ 040public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 041 042 public static final String RETAIN_PROPERTY = "ActiveMQ.Retain"; 043 public static final String RETAINED_PROPERTY = "ActiveMQ.Retained"; 044 private volatile MessageReference retainedMessage; 045 private SubscriptionRecoveryPolicy wrapped; 046 047 public RetainedMessageSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy wrapped) { 048 this.wrapped = wrapped; 049 } 050 051 public boolean add(ConnectionContext context, MessageReference node) throws Exception { 052 final Message message = node.getMessage(); 053 final Object retainValue = message.getProperty(RETAIN_PROPERTY); 054 // retain property set to true 055 final boolean retain = retainValue != null && Boolean.parseBoolean(retainValue.toString()); 056 if (retain) { 057 if (message.getContent().getLength() > 0) { 058 // non zero length message content 059 retainedMessage = message.copy(); 060 retainedMessage.getMessage().removeProperty(RETAIN_PROPERTY); 061 retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, true); 062 } else { 063 // clear retained message 064 retainedMessage = null; 065 } 066 // TODO should we remove the publisher's retain property?? 067 node.getMessage().removeProperty(RETAIN_PROPERTY); 068 } 069 return wrapped == null ? true : wrapped.add(context, node); 070 } 071 072 public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { 073 // Re-dispatch the last retained message seen. 074 if (retainedMessage != null) { 075 sub.addRecoveredMessage(context, retainedMessage); 076 } 077 if (wrapped != null) { 078 // retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions 079 boolean recover = true; 080 if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) { 081 recover = false; 082 } 083 if (recover) { 084 wrapped.recover(context, topic, sub); 085 } 086 } 087 } 088 089 public void start() throws Exception { 090 if (wrapped != null) { 091 wrapped.start(); 092 } 093 } 094 095 public void stop() throws Exception { 096 if (wrapped != null) { 097 wrapped.stop(); 098 } 099 } 100 101 public Message[] browse(ActiveMQDestination destination) throws Exception { 102 final List<Message> result = new ArrayList<Message>(); 103 if (retainedMessage != null) { 104 DestinationFilter filter = DestinationFilter.parseFilter(destination); 105 if (filter.matches(retainedMessage.getMessage().getDestination())) { 106 result.add(retainedMessage.getMessage()); 107 } 108 } 109 Message[] messages = result.toArray(new Message[result.size()]); 110 if (wrapped != null) { 111 final Message[] wrappedMessages = wrapped.browse(destination); 112 if (wrappedMessages != null && wrappedMessages.length > 0) { 113 final int origLen = messages.length; 114 messages = Arrays.copyOf(messages, origLen + wrappedMessages.length); 115 System.arraycopy(wrappedMessages, 0, messages, origLen, wrappedMessages.length); 116 } 117 } 118 return messages; 119 } 120 121 public SubscriptionRecoveryPolicy copy() { 122 return new RetainedMessageSubscriptionRecoveryPolicy(wrapped); 123 } 124 125 public void setBroker(Broker broker) { 126 } 127 128 public void setWrapped(SubscriptionRecoveryPolicy wrapped) { 129 this.wrapped = wrapped; 130 } 131 132 public SubscriptionRecoveryPolicy getWrapped() { 133 return wrapped; 134 } 135}