001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.security.cert.X509Certificate; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.activemq.command.ShutdownInfo; 030import org.apache.activemq.thread.Task; 031import org.apache.activemq.thread.TaskRunner; 032import org.apache.activemq.thread.TaskRunnerFactory; 033import org.apache.activemq.transport.FutureResponse; 034import org.apache.activemq.transport.ResponseCallback; 035import org.apache.activemq.transport.Transport; 036import org.apache.activemq.transport.TransportDisposedIOException; 037import org.apache.activemq.transport.TransportListener; 038import org.apache.activemq.wireformat.WireFormat; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * A Transport implementation that uses direct method invocations. 044 */ 045public class VMTransport implements Transport, Task { 046 protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class); 047 048 private static final AtomicLong NEXT_ID = new AtomicLong(0); 049 050 // Transport Configuration 051 protected VMTransport peer; 052 protected TransportListener transportListener; 053 protected boolean marshal; 054 protected boolean async = true; 055 protected int asyncQueueDepth = 2000; 056 protected final URI location; 057 protected final long id; 058 059 // Implementation 060 private volatile LinkedBlockingQueue<Object> messageQueue; 061 private volatile TaskRunnerFactory taskRunnerFactory; 062 private volatile TaskRunner taskRunner; 063 064 // Transport State 065 protected final AtomicBoolean started = new AtomicBoolean(); 066 protected final AtomicBoolean disposed = new AtomicBoolean(); 067 068 private volatile int receiveCounter; 069 070 public VMTransport(URI location) { 071 this.location = location; 072 this.id = NEXT_ID.getAndIncrement(); 073 } 074 075 public void setPeer(VMTransport peer) { 076 this.peer = peer; 077 } 078 079 @Override 080 public void oneway(Object command) throws IOException { 081 082 if (disposed.get()) { 083 throw new TransportDisposedIOException("Transport disposed."); 084 } 085 086 if (peer == null) { 087 throw new IOException("Peer not connected."); 088 } 089 090 try { 091 092 if (peer.disposed.get()) { 093 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 094 } 095 096 if (peer.async) { 097 peer.getMessageQueue().put(command); 098 peer.wakeup(); 099 return; 100 } 101 102 if (!peer.started.get()) { 103 LinkedBlockingQueue<Object> pending = peer.getMessageQueue(); 104 int sleepTimeMillis; 105 boolean accepted = false; 106 do { 107 sleepTimeMillis = 0; 108 // the pending queue is drained on start so we need to ensure we add before 109 // the drain commences, otherwise we never get the command dispatched! 110 synchronized (peer.started) { 111 if (!peer.started.get()) { 112 accepted = pending.offer(command); 113 if (!accepted) { 114 sleepTimeMillis = 500; 115 } 116 } 117 } 118 // give start thread a chance if we will loop 119 TimeUnit.MILLISECONDS.sleep(sleepTimeMillis); 120 121 } while (!accepted && !peer.started.get()); 122 if (accepted) { 123 return; 124 } 125 } 126 } catch (InterruptedException e) { 127 Thread.currentThread().interrupt(); 128 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 129 iioe.initCause(e); 130 throw iioe; 131 } 132 133 dispatch(peer, peer.messageQueue, command); 134 } 135 136 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) { 137 TransportListener transportListener = transport.getTransportListener(); 138 if (transportListener != null) { 139 // Lock here on the target transport's started since we want to wait for its start() 140 // method to finish dispatching out of the queue before we do our own. 141 synchronized (transport.started) { 142 143 // Ensure that no additional commands entered the queue in the small time window 144 // before the start method locks the dispatch lock and the oneway method was in 145 // an put operation. 146 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) { 147 doDispatch(transport, transportListener, pending.poll()); 148 } 149 150 // We are now in sync mode and won't enqueue any more commands to the target 151 // transport so lets clean up its resources. 152 transport.messageQueue = null; 153 154 // Don't dispatch if either end was disposed already. 155 if (command != null && !this.disposed.get() && !transport.isDisposed()) { 156 doDispatch(transport, transportListener, command); 157 } 158 } 159 } 160 } 161 162 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) { 163 transport.receiveCounter++; 164 transportListener.onCommand(command); 165 } 166 167 @Override 168 public void start() throws Exception { 169 170 if (transportListener == null) { 171 throw new IOException("TransportListener not set."); 172 } 173 174 // If we are not in async mode we lock the dispatch lock here and then start to 175 // prevent any sync dispatches from occurring until we dispatch the pending messages 176 // to maintain delivery order. When async this happens automatically so just set 177 // started and wakeup the task runner. 178 if (!async) { 179 synchronized (started) { 180 if (started.compareAndSet(false, true)) { 181 LinkedBlockingQueue<Object> mq = getMessageQueue(); 182 Object command; 183 while ((command = mq.poll()) != null && !disposed.get() ) { 184 receiveCounter++; 185 doDispatch(this, transportListener, command); 186 } 187 } 188 } 189 } else { 190 if (started.compareAndSet(false, true)) { 191 wakeup(); 192 } 193 } 194 } 195 196 @Override 197 public void stop() throws Exception { 198 // Only need to do this once, all future oneway calls will now 199 // fail as will any asnyc jobs in the task runner. 200 if (disposed.compareAndSet(false, true)) { 201 202 TaskRunner tr = taskRunner; 203 LinkedBlockingQueue<Object> mq = this.messageQueue; 204 205 taskRunner = null; 206 messageQueue = null; 207 208 if (mq != null) { 209 mq.clear(); 210 } 211 212 // don't wait for completion 213 if (tr != null) { 214 try { 215 tr.shutdown(1); 216 } catch(Exception e) { 217 } 218 tr = null; 219 } 220 221 if (peer.transportListener != null) { 222 // let the peer know that we are disconnecting after attempting 223 // to cleanly shutdown the async tasks so that this is the last 224 // command it see's. 225 try { 226 peer.transportListener.onCommand(new ShutdownInfo()); 227 } catch (Exception ignore) { 228 } 229 230 // let any requests pending a response see an exception 231 try { 232 peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped.")); 233 } catch (Exception ignore) { 234 } 235 } 236 237 // shutdown task runner factory 238 if (taskRunnerFactory != null) { 239 taskRunnerFactory.shutdownNow(); 240 taskRunnerFactory = null; 241 } 242 } 243 } 244 245 protected void wakeup() { 246 if (async && started.get()) { 247 try { 248 getTaskRunner().wakeup(); 249 } catch (InterruptedException e) { 250 Thread.currentThread().interrupt(); 251 } catch (TransportDisposedIOException e) { 252 } 253 } 254 } 255 256 /** 257 * @see org.apache.activemq.thread.Task#iterate() 258 */ 259 @Override 260 public boolean iterate() { 261 262 final TransportListener tl = transportListener; 263 264 LinkedBlockingQueue<Object> mq; 265 try { 266 mq = getMessageQueue(); 267 } catch (TransportDisposedIOException e) { 268 return false; 269 } 270 271 Object command = mq.poll(); 272 if (command != null && !disposed.get()) { 273 tl.onCommand(command); 274 return !mq.isEmpty() && !disposed.get(); 275 } else { 276 if(disposed.get()) { 277 mq.clear(); 278 } 279 return false; 280 } 281 } 282 283 @Override 284 public void setTransportListener(TransportListener commandListener) { 285 this.transportListener = commandListener; 286 } 287 288 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException { 289 LinkedBlockingQueue<Object> result = messageQueue; 290 if (result == null) { 291 synchronized (this) { 292 result = messageQueue; 293 if (result == null) { 294 if (disposed.get()) { 295 throw new TransportDisposedIOException("The Transport has been disposed"); 296 } 297 298 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 299 } 300 } 301 } 302 return result; 303 } 304 305 protected TaskRunner getTaskRunner() throws TransportDisposedIOException { 306 TaskRunner result = taskRunner; 307 if (result == null) { 308 synchronized (this) { 309 result = taskRunner; 310 if (result == null) { 311 if (disposed.get()) { 312 throw new TransportDisposedIOException("The Transport has been disposed"); 313 } 314 315 String name = "ActiveMQ VMTransport: " + toString(); 316 if (taskRunnerFactory == null) { 317 taskRunnerFactory = new TaskRunnerFactory(name); 318 taskRunnerFactory.init(); 319 } 320 taskRunner = result = taskRunnerFactory.createTaskRunner(this, name); 321 } 322 } 323 } 324 return result; 325 } 326 327 @Override 328 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 329 throw new AssertionError("Unsupported Method"); 330 } 331 332 @Override 333 public Object request(Object command) throws IOException { 334 throw new AssertionError("Unsupported Method"); 335 } 336 337 @Override 338 public Object request(Object command, int timeout) throws IOException { 339 throw new AssertionError("Unsupported Method"); 340 } 341 342 @Override 343 public TransportListener getTransportListener() { 344 return transportListener; 345 } 346 347 @Override 348 public <T> T narrow(Class<T> target) { 349 if (target.isAssignableFrom(getClass())) { 350 return target.cast(this); 351 } 352 return null; 353 } 354 355 public boolean isMarshal() { 356 return marshal; 357 } 358 359 public void setMarshal(boolean marshal) { 360 this.marshal = marshal; 361 } 362 363 @Override 364 public String toString() { 365 return location + "#" + id; 366 } 367 368 @Override 369 public String getRemoteAddress() { 370 if (peer != null) { 371 return peer.toString(); 372 } 373 return null; 374 } 375 376 /** 377 * @return the async 378 */ 379 public boolean isAsync() { 380 return async; 381 } 382 383 /** 384 * @param async the async to set 385 */ 386 public void setAsync(boolean async) { 387 this.async = async; 388 } 389 390 /** 391 * @return the asyncQueueDepth 392 */ 393 public int getAsyncQueueDepth() { 394 return asyncQueueDepth; 395 } 396 397 /** 398 * @param asyncQueueDepth the asyncQueueDepth to set 399 */ 400 public void setAsyncQueueDepth(int asyncQueueDepth) { 401 this.asyncQueueDepth = asyncQueueDepth; 402 } 403 404 @Override 405 public boolean isFaultTolerant() { 406 return false; 407 } 408 409 @Override 410 public boolean isDisposed() { 411 return disposed.get(); 412 } 413 414 @Override 415 public boolean isConnected() { 416 return !disposed.get(); 417 } 418 419 @Override 420 public void reconnect(URI uri) throws IOException { 421 throw new IOException("Transport reconnect is not supported"); 422 } 423 424 @Override 425 public boolean isReconnectSupported() { 426 return false; 427 } 428 429 @Override 430 public boolean isUpdateURIsSupported() { 431 return false; 432 } 433 434 @Override 435 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 436 throw new IOException("URI update feature not supported"); 437 } 438 439 @Override 440 public int getReceiveCounter() { 441 return receiveCounter; 442 } 443 444 @Override 445 public X509Certificate[] getPeerCertificates() { 446 return null; 447 } 448 449 @Override 450 public void setPeerCertificates(X509Certificate[] certificates) { 451 452 } 453 454 @Override 455 public WireFormat getWireFormat() { 456 return null; 457 } 458}