001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.broker.scheduler.memory; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Date; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Timer; 028import java.util.TimerTask; 029import java.util.TreeMap; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.ReentrantReadWriteLock; 033 034import javax.jms.MessageFormatException; 035 036import org.apache.activemq.broker.scheduler.CronParser; 037import org.apache.activemq.broker.scheduler.Job; 038import org.apache.activemq.broker.scheduler.JobListener; 039import org.apache.activemq.broker.scheduler.JobScheduler; 040import org.apache.activemq.broker.scheduler.JobSupport; 041import org.apache.activemq.util.ByteSequence; 042import org.apache.activemq.util.IdGenerator; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Implements an in-memory JobScheduler instance. 048 */ 049public class InMemoryJobScheduler implements JobScheduler { 050 051 private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobScheduler.class); 052 053 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 054 055 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 056 private final String name; 057 private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<>(); 058 private final AtomicBoolean started = new AtomicBoolean(false); 059 private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false); 060 private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>(); 061 private final Timer timer = new Timer(); 062 063 public InMemoryJobScheduler(String name) { 064 this.name = name; 065 } 066 067 @Override 068 public String getName() throws Exception { 069 return name; 070 } 071 072 public void start() throws Exception { 073 if (started.compareAndSet(false, true)) { 074 startDispatching(); 075 LOG.trace("JobScheduler[{}] started", name); 076 } 077 } 078 079 public void stop() throws Exception { 080 if (started.compareAndSet(true, false)) { 081 stopDispatching(); 082 timer.cancel(); 083 jobs.clear(); 084 LOG.trace("JobScheduler[{}] stopped", name); 085 } 086 } 087 088 public boolean isStarted() { 089 return started.get(); 090 } 091 092 public boolean isDispatchEnabled() { 093 return dispatchEnabled.get(); 094 } 095 096 @Override 097 public void startDispatching() throws Exception { 098 dispatchEnabled.set(true); 099 } 100 101 @Override 102 public void stopDispatching() throws Exception { 103 dispatchEnabled.set(false); 104 } 105 106 @Override 107 public void addListener(JobListener listener) throws Exception { 108 this.jobListeners.add(listener); 109 } 110 111 @Override 112 public void removeListener(JobListener listener) throws Exception { 113 this.jobListeners.remove(listener); 114 } 115 116 @Override 117 public void schedule(String jobId, ByteSequence payload, long delay) throws Exception { 118 doSchedule(jobId, payload, "", 0, delay, 0); 119 } 120 121 @Override 122 public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception { 123 doSchedule(jobId, payload, cronEntry, 0, 0, 0); 124 } 125 126 @Override 127 public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception { 128 doSchedule(jobId, payload, cronEntry, delay, period, repeat); 129 } 130 131 @Override 132 public void remove(long time) throws Exception { 133 doRemoveRange(time, time); 134 } 135 136 @Override 137 public void remove(String jobId) throws Exception { 138 doRemoveJob(jobId); 139 } 140 141 @Override 142 public void removeAllJobs() throws Exception { 143 doRemoveRange(0, Long.MAX_VALUE); 144 } 145 146 @Override 147 public void removeAllJobs(long start, long finish) throws Exception { 148 doRemoveRange(start, finish); 149 } 150 151 @Override 152 public long getNextScheduleTime() throws Exception { 153 long nextExecutionTime = -1L; 154 155 lock.readLock().lock(); 156 try { 157 if (!jobs.isEmpty()) { 158 nextExecutionTime = jobs.entrySet().iterator().next().getKey(); 159 } 160 } finally { 161 lock.readLock().unlock(); 162 } 163 return nextExecutionTime; 164 } 165 166 @Override 167 public List<Job> getNextScheduleJobs() throws Exception { 168 List<Job> result = new ArrayList<>(); 169 lock.readLock().lock(); 170 try { 171 if (!jobs.isEmpty()) { 172 result.addAll(jobs.entrySet().iterator().next().getValue().getAllJobs()); 173 } 174 } finally { 175 lock.readLock().unlock(); 176 } 177 return result; 178 } 179 180 @Override 181 public List<Job> getAllJobs() throws Exception { 182 final List<Job> result = new ArrayList<>(); 183 this.lock.readLock().lock(); 184 try { 185 for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) { 186 result.addAll(entry.getValue().getAllJobs()); 187 } 188 } finally { 189 this.lock.readLock().unlock(); 190 } 191 192 return result; 193 } 194 195 @Override 196 public List<Job> getAllJobs(long start, long finish) throws Exception { 197 final List<Job> result = new ArrayList<>(); 198 this.lock.readLock().lock(); 199 try { 200 for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) { 201 long jobTime = entry.getKey(); 202 if (start <= jobTime && jobTime <= finish) { 203 result.addAll(entry.getValue().getAllJobs()); 204 } 205 } 206 } finally { 207 this.lock.readLock().unlock(); 208 } 209 return result; 210 } 211 212 @Override 213 public int hashCode() { 214 return name.hashCode(); 215 } 216 217 @Override 218 public String toString() { 219 return "JobScheduler: " + name; 220 } 221 222 private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException { 223 long startTime = System.currentTimeMillis(); 224 long executionTime = 0; 225 // round startTime - so we can schedule more jobs at the same time 226 startTime = ((startTime + 500) / 500) * 500; 227 228 if (cronEntry != null && cronEntry.length() > 0) { 229 try { 230 executionTime = CronParser.getNextScheduledTime(cronEntry, startTime); 231 } catch (MessageFormatException e) { 232 throw new IOException(e.getMessage()); 233 } 234 } 235 236 if (executionTime == 0) { 237 // start time not set by CRON - so it it to the current time 238 executionTime = startTime; 239 } 240 241 if (delay > 0) { 242 executionTime += delay; 243 } else { 244 executionTime += period; 245 } 246 247 InMemoryJob newJob = new InMemoryJob(jobId); 248 newJob.setStart(startTime); 249 newJob.setCronEntry(cronEntry); 250 newJob.setDelay(delay); 251 newJob.setPeriod(period); 252 newJob.setRepeat(repeat); 253 newJob.setNextTime(executionTime); 254 newJob.setPayload(payload.getData()); 255 256 LOG.trace("JobScheduler adding job[{}] to fire at: {}", jobId, JobSupport.getDateTime(executionTime)); 257 258 lock.writeLock().lock(); 259 try { 260 ScheduledTask task = jobs.get(executionTime); 261 if (task == null) { 262 task = new ScheduledTask(executionTime); 263 task.add(newJob); 264 jobs.put(task.getExecutionTime(), task); 265 timer.schedule(task, new Date(newJob.getNextTime())); 266 } else { 267 task.add(newJob); 268 } 269 } finally { 270 lock.writeLock().unlock(); 271 } 272 } 273 274 private void doReschedule(InMemoryJob job, long nextExecutionTime) { 275 job.setNextTime(nextExecutionTime); 276 job.incrementExecutionCount(); 277 if (!job.isCron()) { 278 job.decrementRepeatCount(); 279 } 280 281 LOG.trace("JobScheduler rescheduling job[{}] to fire at: {}", job.getJobId(), JobSupport.getDateTime(nextExecutionTime)); 282 283 lock.writeLock().lock(); 284 try { 285 ScheduledTask task = jobs.get(nextExecutionTime); 286 if (task == null) { 287 task = new ScheduledTask(nextExecutionTime); 288 task.add(job); 289 jobs.put(task.getExecutionTime(), task); 290 timer.schedule(task, new Date(task.getExecutionTime())); 291 } else { 292 task.add(job); 293 } 294 } finally { 295 lock.writeLock().unlock(); 296 } 297 } 298 299 private void doRemoveJob(String jobId) throws IOException { 300 this.lock.writeLock().lock(); 301 try { 302 Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator(); 303 while (scheduled.hasNext()) { 304 Map.Entry<Long, ScheduledTask> entry = scheduled.next(); 305 ScheduledTask task = entry.getValue(); 306 if (task.remove(jobId)) { 307 LOG.trace("JobScheduler removing job[{}]", jobId); 308 if (task.isEmpty()) { 309 task.cancel(); 310 scheduled.remove(); 311 } 312 return; 313 } 314 } 315 } finally { 316 this.lock.writeLock().unlock(); 317 } 318 } 319 320 private void doRemoveRange(long start, long end) throws IOException { 321 this.lock.writeLock().lock(); 322 try { 323 Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator(); 324 while (scheduled.hasNext()) { 325 Map.Entry<Long, ScheduledTask> entry = scheduled.next(); 326 long executionTime = entry.getKey(); 327 if (start <= executionTime && executionTime <= end) { 328 ScheduledTask task = entry.getValue(); 329 task.cancel(); 330 scheduled.remove(); 331 } 332 333 // Don't look beyond the end range. 334 if (end < executionTime) { 335 break; 336 } 337 } 338 } finally { 339 this.lock.writeLock().unlock(); 340 } 341 } 342 343 private boolean canDispatch() { 344 return isStarted() && isDispatchEnabled(); 345 } 346 347 private long calculateNextExecutionTime(InMemoryJob job, long currentTime, int repeat) throws MessageFormatException { 348 long result = currentTime; 349 String cron = job.getCronEntry(); 350 if (cron != null && cron.length() > 0) { 351 result = CronParser.getNextScheduledTime(cron, result); 352 } else if (job.getRepeat() != 0) { 353 result += job.getPeriod(); 354 } 355 return result; 356 } 357 358 private void dispatch(InMemoryJob job) throws IllegalStateException, IOException { 359 if (canDispatch()) { 360 LOG.debug("Firing: {}", job); 361 for (JobListener l : jobListeners) { 362 l.scheduledJob(job.getJobId(), new ByteSequence(job.getPayload())); 363 } 364 } 365 } 366 367 /* 368 * A TimerTask instance that can aggregate the execution of a number 369 * scheduled Jobs and handle rescheduling the jobs that require it. 370 */ 371 private class ScheduledTask extends TimerTask { 372 373 private final Map<String, InMemoryJob> jobs = new TreeMap<>(); 374 private final long executionTime; 375 376 public ScheduledTask(long executionTime) { 377 this.executionTime = executionTime; 378 } 379 380 public long getExecutionTime() { 381 return executionTime; 382 } 383 384 /** 385 * @return a Collection containing all the managed jobs for this task. 386 */ 387 public Collection<InMemoryJob> getAllJobs() { 388 return new ArrayList<>(jobs.values()); 389 } 390 391 /** 392 * @return true if the internal list of jobs has become empty. 393 */ 394 public boolean isEmpty() { 395 return jobs.isEmpty(); 396 } 397 398 /** 399 * Adds the job to the internal list of scheduled Jobs managed by this task. 400 * 401 * @param newJob 402 * the new job to add to the list of Jobs. 403 */ 404 public void add(InMemoryJob newJob) { 405 this.jobs.put(newJob.getJobId(), newJob); 406 } 407 408 /** 409 * Removes the job from the internal list of scheduled Jobs managed by this task. 410 * 411 * @param jobId 412 * the job ID to remove from the list of Jobs. 413 * 414 * @return true if the job was removed from the list of managed jobs. 415 */ 416 public boolean remove(String jobId) { 417 return jobs.remove(jobId) != null; 418 } 419 420 @Override 421 public void run() { 422 if (!isStarted()) { 423 return; 424 } 425 426 try { 427 long currentTime = System.currentTimeMillis(); 428 lock.writeLock().lock(); 429 try { 430 // Remove this entry as it will now fire any scheduled jobs, if new 431 // jobs or rescheduled jobs land in the same time slot we want them 432 // to go into a new ScheduledTask in the Timer instance. 433 InMemoryJobScheduler.this.jobs.remove(executionTime); 434 } finally { 435 lock.writeLock().unlock(); 436 } 437 438 long nextExecutionTime = 0; 439 440 for (InMemoryJob job : jobs.values()) { 441 442 if (!isStarted()) { 443 break; 444 } 445 446 int repeat = job.getRepeat(); 447 nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); 448 if (!job.isCron()) { 449 dispatch(job); 450 if (repeat != 0) { 451 // Reschedule for the next time, the scheduler will take care of 452 // updating the repeat counter on the update. 453 doReschedule(job, nextExecutionTime); 454 } 455 } else { 456 if (repeat == 0) { 457 // This is a non-repeating Cron entry so we can fire and forget it. 458 dispatch(job); 459 } 460 461 if (nextExecutionTime > currentTime) { 462 // Reschedule the cron job as a new event, if the cron entry signals 463 // a repeat then it will be stored separately and fired as a normal 464 // event with decrementing repeat. 465 doReschedule(job, nextExecutionTime); 466 467 if (repeat != 0) { 468 // we have a separate schedule to run at this time 469 // so the cron job is used to set of a separate schedule 470 // hence we won't fire the original cron job to the 471 // listeners but we do need to start a separate schedule 472 String jobId = ID_GENERATOR.generateId(); 473 ByteSequence payload = new ByteSequence(job.getPayload()); 474 schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); 475 } 476 } 477 } 478 } 479 } catch (Throwable e) { 480 LOG.error("Error while processing scheduled job(s).", e); 481 } 482 } 483 } 484}