/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a network bridge interface.
 * <p>
 * An instance pairs the remote {@link ConsumerInfo} it was created from with a
 * local copy flagged as a network subscription, keeps the set of remote
 * consumer ids that share this subscription, and counts dispatches that are
 * still awaiting a response.
 */
public class DemandSubscription {
    private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);

    /** Upper bound, in seconds, that {@link #waitForCompletion()} will block. */
    private static final long COMPLETION_WAIT_SECONDS = 30;

    private final ConsumerInfo remoteInfo;
    private final ConsumerInfo localInfo;
    private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
    /** Number of dispatches whose responses are still outstanding. */
    private final AtomicInteger dispatched = new AtomicInteger(0);
    /**
     * Set once {@link #waitForCompletion()} starts waiting; it is never reset.
     * The same object also serves as the monitor for wait/notify.
     */
    private final AtomicBoolean activeWaiter = new AtomicBoolean();
    private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
    private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
    private SubscriptionInfo localDurableSubscriber;

    private NetworkBridgeFilter networkBridgeFilter;
    private boolean staticallyIncluded;

    /**
     * Creates a subscription from the given remote consumer. The local info is
     * a copy of {@code info} marked as a network subscription, and the consumer
     * id of {@code info} becomes the first registered remote id.
     *
     * @param info the remote consumer info
     */
    DemandSubscription(ConsumerInfo info) {
        remoteInfo = info;
        localInfo = info.copy();
        localInfo.setNetworkSubscription(true);
        remoteSubsIds.add(info.getConsumerId());
    }

    @Override
    public String toString() {
        return "DemandSub{" + localInfo.getConsumerId() + ",remotes:" + remoteSubsIds + "}";
    }

    /**
     * Increment the consumers associated with this subscription
     *
     * @param id the remote consumer id to register
     * @return true if added
     */
    public boolean add(ConsumerId id) {
        return remoteSubsIds.add(id);
    }

    /**
     * Decrement the consumers associated with this subscription
     *
     * @param id the remote consumer id to unregister
     * @return true if removed
     */
    public boolean remove(ConsumerId id) {
        return remoteSubsIds.remove(id);
    }

    /**
     * @return the live (not copied) set of durable remote subscriptions
     */
    public Set<SubscriptionInfo> getDurableRemoteSubs() {
        return durableRemoteSubs;
    }

    /**
     * @return true if there are no interested consumers
     */
    public boolean isEmpty() {
        return remoteSubsIds.isEmpty();
    }

    /**
     * @return the number of registered remote consumer ids
     */
    public int size() {
        return remoteSubsIds.size();
    }

    /**
     * @return Returns the localInfo.
     */
    public ConsumerInfo getLocalInfo() {
        return localInfo;
    }

    /**
     * @return Returns the remoteInfo.
     */
    public ConsumerInfo getRemoteInfo() {
        return remoteInfo;
    }

    /**
     * @param id the consumer id to record as a forced durable consumer
     * @return true if the id was not already recorded
     */
    public boolean addForcedDurableConsumer(ConsumerId id) {
        return forcedDurableConsumers.add(id);
    }

    /**
     * @param id the consumer id to drop from the forced durable consumers
     * @return true if the id was recorded
     */
    public boolean removeForcedDurableConsumer(ConsumerId id) {
        return forcedDurableConsumers.remove(id);
    }

    /**
     * @return the number of recorded forced durable consumers
     */
    public int getForcedDurableConsumersSize() {
        return forcedDurableConsumers.size();
    }

    /**
     * Blocks until every outstanding response has been accounted for, the wait
     * times out, or the calling thread is interrupted. Returns immediately when
     * nothing is outstanding.
     * <p>
     * Once a wait has started, {@link #incrementOutstandingResponses()} refuses
     * further dispatches. If responses are still outstanding when the wait
     * ends, a warning is logged. On interruption the thread's interrupt status
     * is restored before returning.
     */
    public void waitForCompletion() {
        if (dispatched.get() <= 0) {
            return;
        }
        LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get());
        activeWaiter.set(true);
        if (dispatched.get() <= 0) {
            return;
        }

        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(COMPLETION_WAIT_SECONDS);
        synchronized (activeWaiter) {
            try {
                // The counter is re-checked while holding the monitor: a notifier
                // needs this monitor too, so a final decrement that lands after the
                // check cannot signal before wait() has released it. The loop also
                // absorbs spurious wakeups.
                while (dispatched.get() > 0) {
                    final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remainingMillis <= 0) {
                        // wait(0) would block indefinitely, so stop at the deadline
                        break;
                    }
                    activeWaiter.wait(remainingMillis);
                }
            } catch (InterruptedException e) {
                // stop waiting but keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
            }
        }

        if (this.dispatched.get() > 0) {
            LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially {} duplicate forwards", this.dispatched.get());
        }
    }

    /**
     * Records that one outstanding response has arrived. When the count reaches
     * zero and a waiter is active, threads blocked in
     * {@link #waitForCompletion()} are woken.
     */
    public void decrementOutstandingResponses() {
        if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
            synchronized (activeWaiter) {
                activeWaiter.notifyAll();
            }
        }
    }

    /**
     * Records a new dispatch awaiting a response, unless a waiter is active, in
     * which case the increment is rolled back.
     *
     * @return true if the dispatch was recorded, false if it was rejected
     *         because {@link #waitForCompletion()} has started waiting
     */
    public boolean incrementOutstandingResponses() {
        dispatched.incrementAndGet();
        if (activeWaiter.get()) {
            // undo the increment; this may also wake the waiter if the count is back to zero
            decrementOutstandingResponses();
            return false;
        }
        return true;
    }

    public NetworkBridgeFilter getNetworkBridgeFilter() {
        return networkBridgeFilter;
    }

    public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
        this.networkBridgeFilter = networkBridgeFilter;
    }

    public SubscriptionInfo getLocalDurableSubscriber() {
        return localDurableSubscriber;
    }

    public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) {
        this.localDurableSubscriber = localDurableSubscriber;
    }

    public boolean isStaticallyIncluded() {
        return staticallyIncluded;
    }

    public void setStaticallyIncluded(boolean staticallyIncluded) {
        this.staticallyIncluded = staticallyIncluded;
    }
}