001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.group;
018
019import java.util.HashMap;
020import java.util.Iterator;
021import java.util.Map;
022
023import org.apache.activemq.broker.region.Destination;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.command.ConsumerId;
026import org.apache.activemq.memory.LRUMap;
027
028/**
029 * A simple implementation which tracks every individual GroupID value in a LRUCache
030 * 
031 * 
032 */
033public class CachedMessageGroupMap implements MessageGroupMap {
034    private final LRUMap<String, ConsumerId> cache;
035    private final int maximumCacheSize;
036    Destination destination;
037
038    CachedMessageGroupMap(int size){
039      cache = new LRUMap<String, ConsumerId>(size) {
040          @Override
041          public boolean removeEldestEntry(final Map.Entry eldest) {
042              boolean remove = super.removeEldestEntry(eldest);
043              if (remove) {
044                  if (destination != null) {
045                      for (Subscription s : destination.getConsumers()) {
046                        if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
047                            s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
048                            break;
049                          }
050                      }
051                  }
052              }
053              return remove;
054          }
055      };
056      maximumCacheSize = size;
057    }
058    public synchronized void put(String groupId, ConsumerId consumerId) {
059        cache.put(groupId, consumerId);
060    }
061
062    public synchronized ConsumerId get(String groupId) {
063        return cache.get(groupId);
064    }
065
066    public synchronized ConsumerId removeGroup(String groupId) {
067        return cache.remove(groupId);
068    }
069
070    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
071        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
072        Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
073        map.putAll(cache);
074        for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
075            String group = iter.next();
076            ConsumerId owner = map.get(group);
077            if (owner.equals(consumerId)) {
078                ownedGroups.add(group);
079            }
080        }
081        for (String group:ownedGroups.getUnderlyingSet()){
082            cache.remove(group);
083        }
084        return ownedGroups;
085    }
086
087
088    @Override
089    public synchronized void removeAll(){
090        cache.clear();
091        if (destination != null) {
092            for (Subscription s : destination.getConsumers()) {
093                s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
094            }
095        }
096    }
097
098    @Override
099    public synchronized Map<String, String> getGroups() {
100        Map<String,String> result = new HashMap<String,String>();
101        for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
102            result.put(entry.getKey(),entry.getValue().toString());
103        }
104        return result;
105    }
106
107    @Override
108    public String getType() {
109        return "cached";
110    }
111
112    public int getMaximumCacheSize(){
113        return maximumCacheSize;
114    }
115
116    public String toString() {
117        return "message groups: " + cache.size();
118    }
119
120    public void setDestination(Destination destination) {
121        this.destination = destination;
122    }
123}