001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Set; 024 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.ProducerBrokerExchange; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.broker.region.DestinationFilter; 030import org.apache.activemq.broker.region.DestinationInterceptor; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.filter.DestinationMap; 034 035/** 036 * Implements <a 037 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 038 * Topics</a>. 039 * 040 * @org.apache.xbean.XBean 041 * 042 */ 043public class VirtualDestinationInterceptor implements DestinationInterceptor { 044 045 private DestinationMap destinationMap = new DestinationMap(); 046 private DestinationMap mappedDestinationMap = new DestinationMap(); 047 048 private VirtualDestination[] virtualDestinations; 049 050 @Override 051 public Destination intercept(Destination destination) { 052 final ActiveMQDestination activeMQDestination = destination.getActiveMQDestination(); 053 Set matchingDestinations = destinationMap.get(activeMQDestination); 054 List<Destination> destinations = new ArrayList<Destination>(); 055 for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) { 056 VirtualDestination virtualDestination = (VirtualDestination) iter.next(); 057 Destination newDestination = virtualDestination.intercept(destination); 058 destinations.add(newDestination); 059 } 060 if (!destinations.isEmpty()) { 061 if (destinations.size() == 1) { 062 return destinations.get(0); 063 } else { 064 // should rarely be used but here just in case 065 return createCompositeDestination(destination, destinations); 066 } 067 } 068 // check if the destination instead matches any mapped destinations 069 Set mappedDestinations = mappedDestinationMap.get(activeMQDestination); 070 if (!mappedDestinations.isEmpty()) { 071 // create a mapped destination interceptor 072 VirtualDestination virtualDestination = (VirtualDestination) 073 mappedDestinations.toArray(new VirtualDestination[mappedDestinations.size()])[0]; 074 return virtualDestination.interceptMappedDestination(destination); 075 } 076 077 return destination; 078 } 079 080 @Override 081 public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { 082 for (VirtualDestination virt : virtualDestinations) { 083 virt.create(broker, context, destination); 084 } 085 } 086 087 @Override 088 public synchronized void remove(Destination destination) { 089 } 090 091 public VirtualDestination[] getVirtualDestinations() { 092 return virtualDestinations; 093 } 094 095 public void setVirtualDestinations(VirtualDestination[] virtualDestinations) { 096 destinationMap = new DestinationMap(); 097 mappedDestinationMap = new DestinationMap(); 098 this.virtualDestinations = virtualDestinations; 099 for (int i = 0; i < virtualDestinations.length; i++) { 100 VirtualDestination virtualDestination = virtualDestinations[i]; 101 destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination); 102 mappedDestinationMap.put(virtualDestination.getMappedDestinations(), virtualDestination); 103 } 104 } 105 106 protected Destination createCompositeDestination(Destination destination, final List<Destination> destinations) { 107 return new DestinationFilter(destination) { 108 @Override 109 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { 110 for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();) { 111 Destination destination = iter.next(); 112 destination.send(context, messageSend); 113 } 114 } 115 }; 116 } 117 118 @Override 119 public String toString() { 120 return "VirtualDestinationInterceptor" + Arrays.asList(virtualDestinations); 121 } 122}