001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.locks.Lock;
024import java.util.concurrent.locks.ReentrantLock;
025
026import javax.jms.JMSException;
027import javax.jms.ServerSession;
028import javax.jms.ServerSessionPool;
029import javax.jms.Session;
030import javax.resource.spi.UnavailableException;
031import javax.resource.spi.endpoint.MessageEndpoint;
032
033import org.apache.activemq.ActiveMQConnection;
034import org.apache.activemq.ActiveMQSession;
035import org.apache.activemq.command.MessageDispatch;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 *  $Date$
041 */
042public class ServerSessionPoolImpl implements ServerSessionPool {
043
044    private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class);
045
046    private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
047    private final int maxSessions;
048
049    private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
050    private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
051    private final Lock sessionLock = new ReentrantLock();
052    private final AtomicBoolean closing = new AtomicBoolean(false);
053
054    public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
055        this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
056        this.maxSessions = maxSessions;
057    }
058
059    private ServerSessionImpl createServerSessionImpl() throws JMSException {
060        MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
061        int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
062        final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
063        if (connection == null) {
064            // redispatch of pending prefetched messages after disconnect can have a null connection
065            return null;
066        }
067        final ActiveMQSession session = (ActiveMQSession)connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
068        MessageEndpoint endpoint;
069        try {
070            int batchSize = 0;
071            if (activationSpec.getEnableBatchBooleanValue()) {
072                batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
073            }
074            if (activationSpec.isUseRAManagedTransactionEnabled()) {
075                // The RA will manage the transaction commit.
076                endpoint = createEndpoint(null);
077                return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
078            } else {
079                // Give the container an object to manage to transaction with.
080                endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
081                return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
082            }
083        } catch (UnavailableException e) {
084            // The container could be limiting us on the number of endpoints
085            // that are being created.
086            if (LOG.isDebugEnabled()) {
087                LOG.debug("Could not create an endpoint.", e);
088            }
089            session.close();
090            return null;
091        }
092    }
093
094    private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
095        MessageEndpoint endpoint;
096        endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
097        MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
098        return endpointProxy;
099    }
100
101    /**
102     */
103    @Override
104    public ServerSession getServerSession() throws JMSException {
105        if (LOG.isDebugEnabled()) {
106            LOG.debug("ServerSession requested.");
107        }
108        if (closing.get()) {
109            throw new JMSException("Session Pool Shutting Down.");
110        }
111        ServerSessionImpl ss = null;
112        sessionLock.lock();
113        try {
114            ss = getExistingServerSession(false);
115        } finally {
116            sessionLock.unlock();
117        }
118        if (ss != null) {
119            return ss;
120        }
121        ss = createServerSessionImpl();
122        sessionLock.lock();
123        try {
124            // We may not be able to create a session due to the container
125            // restricting us.
126            if (ss == null) {
127                if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
128                    throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
129                }
130
131                ss = getExistingServerSession(true);
132            } else {
133                activeSessions.add(ss);
134            }
135        } finally {
136            sessionLock.unlock();
137        }
138        if (LOG.isDebugEnabled()) {
139            LOG.debug("Created a new session: " + ss);
140        }
141        return ss;
142
143    }
144
145    /**
146     * Must be called with sessionLock held.
147     * Returns an idle session if one exists or an active session if no more
148     * sessions can be created.  Sessions can not be created if force is true
149     * or activeSessions >= maxSessions.
150     * @param force do not check activeSessions >= maxSessions, return an active connection anyway.
151     * @return an already existing session.
152     */
153    private ServerSessionImpl getExistingServerSession(boolean force) {
154        ServerSessionImpl ss = null;
155        if (idleSessions.size() > 0) {
156            ss = idleSessions.remove(idleSessions.size() - 1);
157        }
158        if (ss != null) {
159            activeSessions.add(ss);
160            if (LOG.isDebugEnabled()) {
161                LOG.debug("Using idle session: " + ss);
162            }
163        } else if (force || activeSessions.size() >= maxSessions) {
164            // If we are at the upper limit
165            // then reuse the already created sessions..
166            // This is going to queue up messages into a session for
167            // processing.
168            ss = getExistingActiveServerSession();
169        }
170        return ss;
171    }
172
173    /**
174     * Must be called with sessionLock held.
175     * Returns the first session from activeSessions, shifting it to last.
176     * @return session
177     */
178    private ServerSessionImpl getExistingActiveServerSession() {
179        ServerSessionImpl ss = null;
180        if (!activeSessions.isEmpty()) {
181            if (activeSessions.size() > 1) {
182                // round robin
183                ss = activeSessions.remove(0);
184                activeSessions.add(ss);
185            } else {
186                ss = activeSessions.get(0);
187            }
188        }
189        if (LOG.isDebugEnabled()) {
190            LOG.debug("Reusing an active session: " + ss);
191        }
192        return ss;
193    }
194
195    public void returnToPool(ServerSessionImpl ss) {
196        sessionLock.lock();
197            activeSessions.remove(ss);
198        try {
199            // make sure we only return non-stale sessions to the pool
200            if ( ss.isStale() ) {
201                if ( LOG.isDebugEnabled() ) {
202                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
203                }
204                ss.close();
205            } else {
206                if (LOG.isDebugEnabled()) {
207                    LOG.debug("ServerSession returned to pool: " + ss);
208                }
209            idleSessions.add(ss);
210            }
211        } finally {
212            sessionLock.unlock();
213        }
214        synchronized (closing) {
215            closing.notify();
216        }
217    }
218
219    public void removeFromPool(ServerSessionImpl ss) {
220        sessionLock.lock();
221        try {
222            activeSessions.remove(ss);
223        } finally {
224            sessionLock.unlock();
225        }
226        try {
227            ActiveMQSession session = (ActiveMQSession)ss.getSession();
228            List<MessageDispatch> l = session.getUnconsumedMessages();
229            if (!l.isEmpty()) {
230                ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
231                if (connection != null) {
232                    for (Iterator<MessageDispatch> i = l.iterator(); i.hasNext();) {
233                        MessageDispatch md = i.next();
234                        if (connection.hasDispatcher(md.getConsumerId())) {
235                            dispatchToSession(md);
236                            LOG.trace("on remove of {} redispatch of {}", session, md);
237                        } else {
238                            LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection());
239                        }
240                    }
241                } else {
242                    LOG.trace("on remove of {} not redispatching while disconnected", session);
243                }
244            }
245        } catch (Throwable t) {
246            LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t);
247        }
248        ss.close();
249        synchronized (closing) {
250            closing.notify();
251        }
252    }
253
254    /**
255     * @param messageDispatch
256     *            the message to dispatch
257     * @throws JMSException
258     */
259    private void dispatchToSession(MessageDispatch messageDispatch)
260            throws JMSException {
261
262        ServerSession serverSession = getServerSession();
263        Session s = serverSession.getSession();
264        ActiveMQSession session = null;
265        if (s instanceof ActiveMQSession) {
266            session = (ActiveMQSession) s;
267        } else {
268            activeMQAsfEndpointWorker.getConnection()
269                    .onAsyncException(new JMSException(
270                            "Session pool provided an invalid session type: "
271                                    + s.getClass()));
272        }
273        session.dispatch(messageDispatch);
274        serverSession.start();
275    }
276
277    public void close() {
278        closing.set(true);
279        int activeCount = closeSessions();
280        // we may have to wait erroneously 250ms if an
281        // active session is removed during our wait and we
282        // are not notified
283        while (activeCount > 0) {
284            if (LOG.isDebugEnabled()) {
285                LOG.debug("Active Sessions = " + activeCount);
286            }
287            try {
288                synchronized (closing) {
289                    closing.wait(250);
290                }
291            } catch (InterruptedException e) {
292                Thread.currentThread().interrupt();
293                return;
294            }
295            activeCount = closeSessions();
296        }
297    }
298
299
300    protected int closeSessions() {
301        sessionLock.lock();
302        try {
303            for (ServerSessionImpl ss : activeSessions) {
304                try {
305                    ActiveMQSession session = (ActiveMQSession) ss.getSession();
306                    if (!session.isClosed()) {
307                        session.close();
308                    }
309                } catch (JMSException ignored) {
310                    if (LOG.isDebugEnabled()) {
311                        LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored);
312                    }
313                }
314            }
315            for (ServerSessionImpl ss : idleSessions) {
316                ss.close();
317            }
318            idleSessions.clear();
319            return activeSessions.size();
320        } finally {
321            sessionLock.unlock();
322        }
323    }
324
325    /**
326     * @return Returns the closing.
327     */
328    public boolean isClosing() {
329        return closing.get();
330    }
331
332    /**
333     * @param closing The closing to set.
334     */
335    public void setClosing(boolean closing) {
336        this.closing.set(closing);
337    }
338
339}