/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Region base class that can keep disposed destinations in a cache instead of
 * disposing them immediately.
 * <p>
 * When caching is enabled (read once from the broker service in the
 * constructor), {@link #dispose(ConnectionContext, Destination)} parks the
 * destination in the cache and {@link #createDestination} hands a cached
 * instance back if one exists for the requested destination. A daemon
 * {@link Timer} periodically disposes entries that have been cached for longer
 * than the configured purge time.
 * <p>
 * All access to the cache happens while holding this object's monitor.
 */
public abstract class AbstractTempRegion extends AbstractRegion {
    // NOTE(review): the logger is registered under TempQueueRegion rather than
    // this class, which looks like a copy/paste slip. Left unchanged so that
    // existing logging configuration keeps matching the same category.
    private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);

    /** Disposed-but-not-yet-purged destinations; guarded by {@code this}. */
    private final Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
    /** Whether disposed destinations are cached instead of being disposed at once. */
    private final boolean doCacheTempDestinations;
    /** Age in milliseconds after which a cached entry is purged; also the purge interval. */
    private final int purgeTime;
    /** Purge timer; {@code null} when caching is disabled. */
    private final Timer purgeTimer;
    /** Task run by {@link #purgeTimer}; {@code null} when caching is disabled. */
    private final TimerTask purgeTask;

    /**
     * Creates the region and, when destination caching is enabled on the
     * broker service, starts the periodic purge timer.
     * <p>
     * The purge time is used both as the initial delay and as the period of the
     * timer. {@link Timer#schedule(TimerTask, long, long)} rejects a negative
     * delay or a non-positive period with an {@link IllegalArgumentException},
     * so with caching enabled the configured purge time has to be positive.
     *
     * @param broker                the region broker; also the source of the caching settings
     * @param destinationStatistics passed through to the superclass constructor
     * @param memoryManager         passed through to the superclass constructor
     * @param taskRunnerFactory     passed through to the superclass constructor
     * @param destinationFactory    passed through to the superclass constructor
     */
    public AbstractTempRegion(RegionBroker broker,
            DestinationStatistics destinationStatistics,
            SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
            DestinationFactory destinationFactory) {
        super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
                destinationFactory);
        this.doCacheTempDestinations = broker.getBrokerService().isCacheTempDestinations();
        this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
        if (this.doCacheTempDestinations) {
            this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
            this.purgeTask = new TimerTask() {
                @Override
                public void run() {
                    try {
                        doPurge();
                    } catch (RuntimeException e) {
                        // An exception escaping a TimerTask terminates the timer
                        // thread, which would silently end all future purges.
                        LOG.warn("Failed to purge cached temp destinations", e);
                    }
                }
            };
            this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
        } else {
            this.purgeTimer = null;
            this.purgeTask = null;
        }
    }

    /**
     * Stops the region, cancels the purge timer and disposes every destination
     * that is still sitting in the cache.
     * <p>
     * The clean-up runs in a {@code finally} block so that it still happens
     * when {@code super.stop()} throws. Without it the timer would stay
     * scheduled, and the cached destinations would never be disposed because
     * nothing purges them once the timer is cancelled.
     *
     * @throws Exception whatever {@code super.stop()} throws
     */
    @Override
    public void stop() throws Exception {
        try {
            super.stop();
        } finally {
            if (purgeTimer != null) {
                purgeTimer.cancel();
            }
            disposeCachedDestinations();
        }
    }

    /**
     * Returns the cached destination for {@code destination} when one exists
     * (removing it from the cache), otherwise asks the destination factory for
     * a new one.
     *
     * @param context     the connection context handed to the factory
     * @param destination the destination to create or revive
     * @return the cached or newly created destination
     * @throws Exception if the destination factory fails
     */
    protected synchronized Destination createDestination(
            ConnectionContext context, ActiveMQDestination destination)
            throws Exception {
        Destination result = cachedDestinations.remove(new CachedDestination(destination));
        if (result == null) {
            result = destinationFactory.createDestination(context, destination, destinationStatistics);
        }
        return result;
    }

    /**
     * Disposes of {@code dest}: with caching enabled it is parked in the cache
     * for later reuse or purge, otherwise it is disposed and stopped right
     * away. Failures of the immediate path are logged, not propagated.
     *
     * @param context the connection context used for immediate disposal
     * @param dest    the destination being disposed
     * @throws Exception declared for signature compatibility; the visible code
     *                   paths log disposal failures instead of throwing them
     */
    protected final synchronized void dispose(ConnectionContext context,
            Destination dest) throws Exception {
        if (this.doCacheTempDestinations) {
            cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest);
        } else {
            doDispose(context, dest);
        }
    }

    /** Disposes and stops {@code dest} using a fresh, empty connection context. */
    private void doDispose(Destination dest) {
        doDispose(new ConnectionContext(), dest);
    }

    /** Disposes and stops {@code dest}; any exception is logged and swallowed (best effort). */
    private void doDispose(ConnectionContext context, Destination dest) {
        try {
            dest.dispose(context);
            dest.stop();
        } catch (Exception e) {
            LOG.warn("Failed to dispose of {}", dest, e);
        }
    }

    /**
     * Empties the cache and disposes everything that was in it. The cache is
     * snapshotted and cleared under the lock; the disposal itself runs outside
     * the lock so that {@code stop()} does not call into destinations while
     * holding this object's monitor.
     */
    private void disposeCachedDestinations() {
        Destination[] drained;
        synchronized (this) {
            drained = cachedDestinations.values().toArray(new Destination[cachedDestinations.size()]);
            cachedDestinations.clear();
        }
        for (Destination dest : drained) {
            doDispose(dest);
        }
    }

    /**
     * Disposes every cached destination whose cache entry is older than the
     * purge time. Invoked periodically by the purge timer.
     */
    private synchronized void doPurge() {
        long currentTime = System.currentTimeMillis();
        if (!cachedDestinations.isEmpty()) {
            // Iterate over a copy of the keys because entries are removed from
            // the map inside the loop.
            Set<CachedDestination> tmp = new HashSet<CachedDestination>(cachedDestinations.keySet());
            for (CachedDestination key : tmp) {
                if ((key.timeStamp + purgeTime) < currentTime) {
                    Destination dest = cachedDestinations.remove(key);
                    if (dest != null) {
                        doDispose(dest);
                    }
                }
            }
        }
    }

    /**
     * Cache key: a destination plus the time at which the key was created.
     * Equality and hashing consider the destination only, so a key built for a
     * lookup matches a cached key regardless of its time stamp.
     */
    static class CachedDestination {
        /** Creation time of this key, from {@link System#currentTimeMillis()}. */
        final long timeStamp;

        /** The destination identifying this entry; final because it feeds {@link #hashCode()}. */
        final ActiveMQDestination destination;

        CachedDestination(ActiveMQDestination destination) {
            this.destination = destination;
            this.timeStamp = System.currentTimeMillis();
        }

        @Override
        public int hashCode() {
            return destination.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof CachedDestination) {
                CachedDestination other = (CachedDestination) o;
                return other.destination.equals(this.destination);
            }
            return false;
        }

    }

}