001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.util.Set; 021 022import javax.jms.InvalidSelectorException; 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.region.Subscription; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.ActiveMQQueue; 030import org.apache.activemq.command.ActiveMQTopic; 031import org.apache.activemq.command.ConsumerInfo; 032import org.apache.activemq.filter.DestinationFilter; 033import org.apache.activemq.util.IOExceptionSupport; 034 035/** 036 * 037 */ 038public class SubscriptionView implements SubscriptionViewMBean { 039 040 protected final Subscription subscription; 041 protected final String clientId; 042 protected final String userName; 043 044 /** 045 * Constructor 046 * 047 * @param subs 048 */ 049 public SubscriptionView(String clientId, String userName, Subscription subs) { 050 this.clientId = clientId; 051 this.subscription = subs; 052 this.userName = userName; 053 } 054 055 /** 056 * @return the clientId 057 */ 058 @Override 059 public String getClientId() { 060 return clientId; 061 } 062 063 /** 064 * @returns the ObjectName of the Connection that created this subscription 065 */ 066 @Override 067 public ObjectName getConnection() { 068 ObjectName result = null; 069 070 if (clientId != null && subscription != null) { 071 ConnectionContext ctx = subscription.getContext(); 072 if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) { 073 BrokerService service = ctx.getBroker().getBrokerService(); 074 ManagementContext managementCtx = service.getManagementContext(); 075 if (managementCtx != null) { 076 077 try { 078 ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName()); 079 Set<ObjectName> names = managementCtx.queryNames(query, null); 080 if (names.size() == 1) { 081 result = names.iterator().next(); 082 } 083 } catch (Exception e) { 084 } 085 } 086 } 087 } 088 return result; 089 } 090 091 092 093 private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException { 094 try { 095 return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId); 096 } catch (Throwable e) { 097 throw IOExceptionSupport.create(e); 098 } 099 } 100 101 /** 102 * @return the id of the Connection the Subscription is on 103 */ 104 @Override 105 public String getConnectionId() { 106 ConsumerInfo info = getConsumerInfo(); 107 if (info != null) { 108 return info.getConsumerId().getConnectionId(); 109 } 110 return "NOTSET"; 111 } 112 113 /** 114 * @return the id of the Session the subscription is on 115 */ 116 @Override 117 public long getSessionId() { 118 ConsumerInfo info = getConsumerInfo(); 119 if (info != null) { 120 return info.getConsumerId().getSessionId(); 121 } 122 return 0; 123 } 124 125 /** 126 * @return the id of the Subscription 127 */ 128 @Override 129 public long getSubscriptionId() { 130 ConsumerInfo info = getConsumerInfo(); 131 if (info != null) { 132 return info.getConsumerId().getValue(); 133 } 134 return 0; 135 } 136 137 /** 138 * @return the destination name 139 */ 140 @Override 141 public String getDestinationName() { 142 ConsumerInfo info = getConsumerInfo(); 143 if (info != null) { 144 ActiveMQDestination dest = info.getDestination(); 145 return dest.getPhysicalName(); 146 } 147 return "NOTSET"; 148 } 149 150 @Override 151 public String getSelector() { 152 if (subscription != null) { 153 return subscription.getSelector(); 154 } 155 return null; 156 } 157 158 @Override 159 public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException { 160 if (subscription != null) { 161 subscription.setSelector(selector); 162 } else { 163 throw new UnsupportedOperationException("No subscription object"); 164 } 165 } 166 167 /** 168 * @return true if the destination is a Queue 169 */ 170 @Override 171 public boolean isDestinationQueue() { 172 ConsumerInfo info = getConsumerInfo(); 173 if (info != null) { 174 ActiveMQDestination dest = info.getDestination(); 175 return dest.isQueue(); 176 } 177 return false; 178 } 179 180 /** 181 * @return true of the destination is a Topic 182 */ 183 @Override 184 public boolean isDestinationTopic() { 185 ConsumerInfo info = getConsumerInfo(); 186 if (info != null) { 187 ActiveMQDestination dest = info.getDestination(); 188 return dest.isTopic(); 189 } 190 return false; 191 } 192 193 /** 194 * @return true if the destination is temporary 195 */ 196 @Override 197 public boolean isDestinationTemporary() { 198 ConsumerInfo info = getConsumerInfo(); 199 if (info != null) { 200 ActiveMQDestination dest = info.getDestination(); 201 return dest.isTemporary(); 202 } 203 return false; 204 } 205 206 /** 207 * @return true if the subscriber is active 208 */ 209 @Override 210 public boolean isActive() { 211 return true; 212 } 213 214 @Override 215 public boolean isNetwork() { 216 ConsumerInfo info = getConsumerInfo(); 217 if (info != null) { 218 return info.isNetworkSubscription(); 219 } 220 return false; 221 } 222 223 /** 224 * The subscription should release as may references as it can to help the 225 * garbage collector reclaim memory. 226 */ 227 public void gc() { 228 if (subscription != null) { 229 subscription.gc(); 230 } 231 } 232 233 /** 234 * @return whether or not the subscriber is retroactive or not 235 */ 236 @Override 237 public boolean isRetroactive() { 238 ConsumerInfo info = getConsumerInfo(); 239 return info != null ? info.isRetroactive() : false; 240 } 241 242 /** 243 * @return whether or not the subscriber is an exclusive consumer 244 */ 245 @Override 246 public boolean isExclusive() { 247 ConsumerInfo info = getConsumerInfo(); 248 return info != null ? info.isExclusive() : false; 249 } 250 251 /** 252 * @return whether or not the subscriber is durable (persistent) 253 */ 254 @Override 255 public boolean isDurable() { 256 ConsumerInfo info = getConsumerInfo(); 257 return info != null ? info.isDurable() : false; 258 } 259 260 /** 261 * @return whether or not the subscriber ignores local messages 262 */ 263 @Override 264 public boolean isNoLocal() { 265 ConsumerInfo info = getConsumerInfo(); 266 return info != null ? info.isNoLocal() : false; 267 } 268 269 /** 270 * @return whether or not the subscriber is configured for async dispatch 271 */ 272 @Override 273 public boolean isDispatchAsync() { 274 ConsumerInfo info = getConsumerInfo(); 275 return info != null ? info.isDispatchAsync() : false; 276 } 277 278 /** 279 * @return the maximum number of pending messages allowed in addition to the 280 * prefetch size. If enabled to a non-zero value then this will 281 * perform eviction of messages for slow consumers on non-durable 282 * topics. 283 */ 284 @Override 285 public int getMaximumPendingMessageLimit() { 286 ConsumerInfo info = getConsumerInfo(); 287 return info != null ? info.getMaximumPendingMessageLimit() : 0; 288 } 289 290 /** 291 * @return the consumer priority 292 */ 293 @Override 294 public byte getPriority() { 295 ConsumerInfo info = getConsumerInfo(); 296 return info != null ? info.getPriority() : 0; 297 } 298 299 /** 300 * @return the name of the consumer which is only used for durable 301 * consumers. 302 */ 303 @Override 304 public String getSubscriptionName() { 305 ConsumerInfo info = getConsumerInfo(); 306 return info != null ? info.getSubscriptionName() : null; 307 } 308 309 /** 310 * @return number of messages pending delivery 311 */ 312 @Override 313 public int getPendingQueueSize() { 314 return subscription != null ? subscription.getPendingQueueSize() : 0; 315 } 316 317 /** 318 * @return number of messages dispatched 319 */ 320 @Override 321 public int getDispatchedQueueSize() { 322 return subscription != null ? subscription.getDispatchedQueueSize() : 0; 323 } 324 325 @Override 326 public int getMessageCountAwaitingAcknowledge() { 327 return getDispatchedQueueSize(); 328 } 329 330 /** 331 * @return number of messages that matched the subscription 332 */ 333 @Override 334 public long getDispatchedCounter() { 335 return subscription != null ? subscription.getDispatchedCounter() : 0; 336 } 337 338 /** 339 * @return number of messages that matched the subscription 340 */ 341 @Override 342 public long getEnqueueCounter() { 343 return subscription != null ? subscription.getEnqueueCounter() : 0; 344 } 345 346 /** 347 * @return number of messages queued by the client 348 */ 349 @Override 350 public long getDequeueCounter() { 351 return subscription != null ? subscription.getDequeueCounter() : 0; 352 } 353 354 protected ConsumerInfo getConsumerInfo() { 355 return subscription != null ? subscription.getConsumerInfo() : null; 356 } 357 358 /** 359 * @return pretty print 360 */ 361 @Override 362 public String toString() { 363 return "SubscriptionView: " + getClientId() + ":" + getConnectionId(); 364 } 365 366 /** 367 */ 368 @Override 369 public int getPrefetchSize() { 370 return subscription != null ? subscription.getPrefetchSize() : 0; 371 } 372 373 @Override 374 public boolean isMatchingQueue(String queueName) { 375 if (isDestinationQueue()) { 376 return matchesDestination(new ActiveMQQueue(queueName)); 377 } 378 return false; 379 } 380 381 @Override 382 public boolean isMatchingTopic(String topicName) { 383 if (isDestinationTopic()) { 384 return matchesDestination(new ActiveMQTopic(topicName)); 385 } 386 return false; 387 } 388 389 /** 390 * Return true if this subscription matches the given destination 391 * 392 * @param destination the destination to compare against 393 * @return true if this subscription matches the given destination 394 */ 395 public boolean matchesDestination(ActiveMQDestination destination) { 396 ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination(); 397 DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination); 398 return filter.matches(destination); 399 } 400 401 @Override 402 public boolean isSlowConsumer() { 403 return subscription.isSlowConsumer(); 404 } 405 406 @Override 407 public String getUserName() { 408 return userName; 409 } 410 411 @Override 412 public void resetStatistics() { 413 if (subscription != null && subscription.getSubscriptionStatistics() != null){ 414 subscription.getSubscriptionStatistics().reset(); 415 } 416 } 417 418 @Override 419 public long getConsumedCount() { 420 return subscription != null ? subscription.getConsumedCount() : 0; 421 } 422}