001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.kahadb;
019
020import java.io.File;
021import java.io.IOException;
022import java.util.Date;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025import java.util.concurrent.locks.ReentrantReadWriteLock;
026
027import org.apache.activemq.broker.LockableServiceSupport;
028import org.apache.activemq.broker.Locker;
029import org.apache.activemq.store.SharedFileLocker;
030import org.apache.activemq.store.kahadb.data.KahaEntryType;
031import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
032import org.apache.activemq.store.kahadb.disk.journal.Journal;
033import org.apache.activemq.store.kahadb.disk.journal.Location;
034import org.apache.activemq.store.kahadb.disk.page.PageFile;
035import org.apache.activemq.store.kahadb.disk.page.Transaction;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.util.DataByteArrayInputStream;
038import org.apache.activemq.util.DataByteArrayOutputStream;
039import org.apache.activemq.util.IOHelper;
040import org.apache.activemq.util.ServiceStopper;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public abstract class AbstractKahaDBStore extends LockableServiceSupport {
045
046    static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
047
048    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
049    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
050
051    protected File directory;
052    protected PageFile pageFile;
053    protected Journal journal;
054    protected AtomicLong journalSize = new AtomicLong(0);
055    protected boolean failIfDatabaseIsLocked;
056    protected long checkpointInterval = 5*1000;
057    protected long cleanupInterval = 30*1000;
058    protected boolean checkForCorruptJournalFiles = false;
059    protected boolean checksumJournalFiles = true;
060    protected boolean forceRecoverIndex = false;
061    protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062    protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063    protected boolean archiveCorruptedIndex = false;
064    protected boolean enableIndexWriteAsync = false;
065    protected boolean enableJournalDiskSyncs = false;
066    protected boolean deleteAllJobs = false;
067    protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068    protected boolean useIndexLFRUEviction = false;
069    protected float indexLFUEvictionFactor = 0.2f;
070    protected boolean ignoreMissingJournalfiles = false;
071    protected int indexCacheSize = 1000;
072    protected boolean enableIndexDiskSyncs = true;
073    protected boolean enableIndexRecoveryFile = true;
074    protected boolean enableIndexPageCaching = true;
075    protected boolean archiveDataLogs;
076    protected boolean purgeStoreOnStartup;
077    protected File directoryArchive;
078
079    protected AtomicBoolean opened = new AtomicBoolean();
080    protected Thread checkpointThread;
081    protected final Object checkpointThreadLock = new Object();
082    protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
083    protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
084
085    /**
086     * @return the name to give this store's PageFile instance.
087     */
088    protected abstract String getPageFileName();
089
090    /**
091     * @return the location of the data directory if no set by configuration.
092     */
093    protected abstract File getDefaultDataDirectory();
094
095    /**
096     * Loads the store from disk.
097     *
098     * Based on configuration this method can either load an existing store or it can purge
099     * an existing store and start in a clean state.
100     *
101     * @throws IOException if an error occurs during the load.
102     */
103    public abstract void load() throws IOException;
104
105    /**
106     * Unload the state of the Store to disk and shuts down all resources assigned to this
107     * KahaDB store implementation.
108     *
109     * @throws IOException if an error occurs during the store unload.
110     */
111    public abstract void unload() throws IOException;
112
113    @Override
114    protected void doStart() throws Exception {
115        this.indexLock.writeLock().lock();
116        if (getDirectory() == null) {
117            setDirectory(getDefaultDataDirectory());
118        }
119        IOHelper.mkdirs(getDirectory());
120        try {
121            if (isPurgeStoreOnStartup()) {
122                getJournal().start();
123                getJournal().delete();
124                getJournal().close();
125                journal = null;
126                getPageFile().delete();
127                LOG.info("{} Persistence store purged.", this);
128                setPurgeStoreOnStartup(false);
129            }
130
131            load();
132            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
133        } finally {
134            this.indexLock.writeLock().unlock();
135        }
136    }
137
138    @Override
139    protected void doStop(ServiceStopper stopper) throws Exception {
140        unload();
141    }
142
143    public PageFile getPageFile() {
144        if (pageFile == null) {
145            pageFile = createPageFile();
146        }
147        return pageFile;
148    }
149
150    public Journal getJournal() throws IOException {
151        if (journal == null) {
152            journal = createJournal();
153        }
154        return journal;
155    }
156
157    public File getDirectory() {
158        return directory;
159    }
160
161    public void setDirectory(File directory) {
162        this.directory = directory;
163    }
164
165    public boolean isArchiveCorruptedIndex() {
166        return archiveCorruptedIndex;
167    }
168
169    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
170        this.archiveCorruptedIndex = archiveCorruptedIndex;
171    }
172
173    public boolean isFailIfDatabaseIsLocked() {
174        return failIfDatabaseIsLocked;
175    }
176
177    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
178        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
179    }
180
181    public boolean isCheckForCorruptJournalFiles() {
182        return checkForCorruptJournalFiles;
183    }
184
185    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
186        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
187    }
188
189    public long getCheckpointInterval() {
190        return checkpointInterval;
191    }
192
193    public void setCheckpointInterval(long checkpointInterval) {
194        this.checkpointInterval = checkpointInterval;
195    }
196
197    public long getCleanupInterval() {
198        return cleanupInterval;
199    }
200
201    public void setCleanupInterval(long cleanupInterval) {
202        this.cleanupInterval = cleanupInterval;
203    }
204
205    public boolean isChecksumJournalFiles() {
206        return checksumJournalFiles;
207    }
208
209    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
210        this.checksumJournalFiles = checksumJournalFiles;
211    }
212
213    public boolean isForceRecoverIndex() {
214        return forceRecoverIndex;
215    }
216
217    public void setForceRecoverIndex(boolean forceRecoverIndex) {
218        this.forceRecoverIndex = forceRecoverIndex;
219    }
220
221    public int getJournalMaxFileLength() {
222        return journalMaxFileLength;
223    }
224
225    public void setJournalMaxFileLength(int journalMaxFileLength) {
226        this.journalMaxFileLength = journalMaxFileLength;
227    }
228
229    public int getJournalMaxWriteBatchSize() {
230        return journalMaxWriteBatchSize;
231    }
232
233    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
234        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
235    }
236
237    public boolean isEnableIndexWriteAsync() {
238        return enableIndexWriteAsync;
239    }
240
241    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
242        this.enableIndexWriteAsync = enableIndexWriteAsync;
243    }
244
245    public boolean isEnableJournalDiskSyncs() {
246        return enableJournalDiskSyncs;
247    }
248
249    public void setEnableJournalDiskSyncs(boolean syncWrites) {
250        this.enableJournalDiskSyncs = syncWrites;
251    }
252
253    public boolean isDeleteAllJobs() {
254        return deleteAllJobs;
255    }
256
257    public void setDeleteAllJobs(boolean deleteAllJobs) {
258        this.deleteAllJobs = deleteAllJobs;
259    }
260
261    /**
262     * @return the archiveDataLogs
263     */
264    public boolean isArchiveDataLogs() {
265        return this.archiveDataLogs;
266    }
267
268    /**
269     * @param archiveDataLogs the archiveDataLogs to set
270     */
271    public void setArchiveDataLogs(boolean archiveDataLogs) {
272        this.archiveDataLogs = archiveDataLogs;
273    }
274
275    /**
276     * @return the directoryArchive
277     */
278    public File getDirectoryArchive() {
279        return this.directoryArchive;
280    }
281
282    /**
283     * @param directoryArchive the directoryArchive to set
284     */
285    public void setDirectoryArchive(File directoryArchive) {
286        this.directoryArchive = directoryArchive;
287    }
288
289    public int getIndexCacheSize() {
290        return indexCacheSize;
291    }
292
293    public void setIndexCacheSize(int indexCacheSize) {
294        this.indexCacheSize = indexCacheSize;
295    }
296
297    public int getIndexWriteBatchSize() {
298        return indexWriteBatchSize;
299    }
300
301    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
302        this.indexWriteBatchSize = indexWriteBatchSize;
303    }
304
305    public boolean isUseIndexLFRUEviction() {
306        return useIndexLFRUEviction;
307    }
308
309    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
310        this.useIndexLFRUEviction = useIndexLFRUEviction;
311    }
312
313    public float getIndexLFUEvictionFactor() {
314        return indexLFUEvictionFactor;
315    }
316
317    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
318        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
319    }
320
321    public boolean isEnableIndexDiskSyncs() {
322        return enableIndexDiskSyncs;
323    }
324
325    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
326        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
327    }
328
329    public boolean isEnableIndexRecoveryFile() {
330        return enableIndexRecoveryFile;
331    }
332
333    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
334        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
335    }
336
337    public boolean isEnableIndexPageCaching() {
338        return enableIndexPageCaching;
339    }
340
341    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
342        this.enableIndexPageCaching = enableIndexPageCaching;
343    }
344
345    public boolean isPurgeStoreOnStartup() {
346        return this.purgeStoreOnStartup;
347    }
348
349    public void setPurgeStoreOnStartup(boolean purge) {
350        this.purgeStoreOnStartup = purge;
351    }
352
353    public boolean isIgnoreMissingJournalfiles() {
354        return ignoreMissingJournalfiles;
355    }
356
357    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
358        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
359    }
360
361    public long size() {
362        if (!isStarted()) {
363            return 0;
364        }
365        try {
366            return journalSize.get() + pageFile.getDiskSize();
367        } catch (IOException e) {
368            throw new RuntimeException(e);
369        }
370    }
371
372    @Override
373    public Locker createDefaultLocker() throws IOException {
374        SharedFileLocker locker = new SharedFileLocker();
375        locker.setDirectory(this.getDirectory());
376        return locker;
377    }
378
379    @Override
380    public void init() throws Exception {
381    }
382
383    /**
384     * Store a command in the Journal and process to update the Store index.
385     *
386     * @param command
387     *      The specific JournalCommand to store and process.
388     *
389     * @returns the Location where the data was written in the Journal.
390     *
391     * @throws IOException if an error occurs storing or processing the command.
392     */
393    public Location store(JournalCommand<?> command) throws IOException {
394        return store(command, isEnableIndexDiskSyncs(), null, null, null);
395    }
396
397    /**
398     * Store a command in the Journal and process to update the Store index.
399     *
400     * @param command
401     *      The specific JournalCommand to store and process.
402     * @param sync
403     *      Should the store operation be done synchronously. (ignored if completion passed).
404     *
405     * @returns the Location where the data was written in the Journal.
406     *
407     * @throws IOException if an error occurs storing or processing the command.
408     */
409    public Location store(JournalCommand<?> command, boolean sync) throws IOException {
410        return store(command, sync, null, null, null);
411    }
412
413    /**
414     * Store a command in the Journal and process to update the Store index.
415     *
416     * @param command
417     *      The specific JournalCommand to store and process.
418     * @param onJournalStoreComplete
419     *      The Runnable to call when the Journal write operation completes.
420     *
421     * @returns the Location where the data was written in the Journal.
422     *
423     * @throws IOException if an error occurs storing or processing the command.
424     */
425    public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
426        return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
427    }
428
429    /**
430     * Store a command in the Journal and process to update the Store index.
431     *
432     * @param command
433     *      The specific JournalCommand to store and process.
434     * @param sync
435     *      Should the store operation be done synchronously. (ignored if completion passed).
436     * @param before
437     *      The Runnable instance to execute before performing the store and process operation.
438     * @param after
439     *      The Runnable instance to execute after performing the store and process operation.
440     *
441     * @returns the Location where the data was written in the Journal.
442     *
443     * @throws IOException if an error occurs storing or processing the command.
444     */
445    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
446        return store(command, sync, before, after, null);
447    }
448
449    /**
450     * All updated are are funneled through this method. The updates are converted to a
451     * JournalMessage which is logged to the journal and then the data from the JournalMessage
452     * is used to update the index just like it would be done during a recovery process.
453     *
454     * @param command
455     *      The specific JournalCommand to store and process.
456     * @param sync
457     *      Should the store operation be done synchronously. (ignored if completion passed).
458     * @param before
459     *      The Runnable instance to execute before performing the store and process operation.
460     * @param after
461     *      The Runnable instance to execute after performing the store and process operation.
462     * @param onJournalStoreComplete
463     *      Callback to be run when the journal write operation is complete.
464     *
465     * @returns the Location where the data was written in the Journal.
466     *
467     * @throws IOException if an error occurs storing or processing the command.
468     */
469    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
470        try {
471
472            if (before != null) {
473                before.run();
474            }
475
476            ByteSequence sequence = toByteSequence(command);
477            Location location;
478            checkpointLock.readLock().lock();
479            try {
480
481                long start = System.currentTimeMillis();
482                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
483                                                            journal.write(sequence, onJournalStoreComplete);
484                long start2 = System.currentTimeMillis();
485
486                process(command, location);
487
488                long end = System.currentTimeMillis();
489                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
490                    LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
491                             (start2-start), (end-start2));
492                }
493            } finally {
494                checkpointLock.readLock().unlock();
495            }
496
497            if (after != null) {
498                after.run();
499            }
500
501            if (checkpointThread != null && !checkpointThread.isAlive()) {
502                startCheckpoint();
503            }
504            return location;
505        } catch (IOException ioe) {
506            LOG.error("KahaDB failed to store to Journal", ioe);
507            if (brokerService != null) {
508                brokerService.handleIOException(ioe);
509            }
510            throw ioe;
511        }
512    }
513
514    /**
515     * Loads a previously stored JournalMessage
516     *
517     * @param location
518     *      The location of the journal command to read.
519     *
520     * @return a new un-marshaled JournalCommand instance.
521     *
522     * @throws IOException if an error occurs reading the stored command.
523     */
524    protected JournalCommand<?> load(Location location) throws IOException {
525        ByteSequence data = journal.read(location);
526        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
527        byte readByte = is.readByte();
528        KahaEntryType type = KahaEntryType.valueOf(readByte);
529        if (type == null) {
530            try {
531                is.close();
532            } catch (IOException e) {
533            }
534            throw new IOException("Could not load journal record. Invalid location: " + location);
535        }
536        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
537        message.mergeFramed(is);
538        return message;
539    }
540
541    /**
542     * Process a stored or recovered JournalCommand instance and update the DB Index with the
543     * state changes that this command produces.  This can be called either as a new DB operation
544     * or as a replay during recovery operations.
545     *
546     * @param command
547     *      The JournalCommand to process.
548     * @param location
549     *      The location in the Journal where the command was written or read from.
550     */
551    protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
552
553    /**
554     * Perform a checkpoint operation with optional cleanup.
555     *
556     * Called by the checkpoint background thread periodically to initiate a checkpoint operation
557     * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
558     * longer needed journal log files etc.
559     *
560     * @param cleanup
561     *      Should the method do a simple checkpoint or also perform a journal cleanup.
562     *
563     * @throws IOException if an error occurs during the checkpoint operation.
564     */
565    protected void checkpointUpdate(final boolean cleanup) throws IOException {
566        checkpointLock.writeLock().lock();
567        try {
568            this.indexLock.writeLock().lock();
569            try {
570                pageFile.tx().execute(new Transaction.Closure<IOException>() {
571                    @Override
572                    public void execute(Transaction tx) throws IOException {
573                        checkpointUpdate(tx, cleanup);
574                    }
575                });
576            } finally {
577                this.indexLock.writeLock().unlock();
578            }
579
580        } finally {
581            checkpointLock.writeLock().unlock();
582        }
583    }
584
585    /**
586     * Perform the checkpoint update operation.  If the cleanup flag is true then the
587     * operation should also purge any unused Journal log files.
588     *
589     * This method must always be called with the checkpoint and index write locks held.
590     *
591     * @param tx
592     *      The TX under which to perform the checkpoint update.
593     * @param cleanup
594     *      Should the checkpoint also do unused Journal file cleanup.
595     *
596     * @throws IOException if an error occurs while performing the checkpoint.
597     */
598    protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
599
600    /**
601     * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
602     *
603     * @param command
604     *      The Journal Command that should be marshaled to bytes for writing.
605     *
606     * @return the byte representation of the given journal command.
607     *
608     * @throws IOException if an error occurs while serializing the command.
609     */
610    protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
611        int size = data.serializedSizeFramed();
612        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
613        os.writeByte(data.type().getNumber());
614        data.writeFramed(os);
615        return os.toByteSequence();
616    }
617
618    /**
619     * Create the PageFile instance and configure it using the configuration options
620     * currently set.
621     *
622     * @return the newly created and configured PageFile instance.
623     */
624    protected PageFile createPageFile() {
625        PageFile index = new PageFile(getDirectory(), getPageFileName());
626        index.setEnableWriteThread(isEnableIndexWriteAsync());
627        index.setWriteBatchSize(getIndexWriteBatchSize());
628        index.setPageCacheSize(getIndexCacheSize());
629        index.setUseLFRUEviction(isUseIndexLFRUEviction());
630        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
631        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
632        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
633        index.setEnablePageCaching(isEnableIndexPageCaching());
634        return index;
635    }
636
637    /**
638     * Create a new Journal instance and configure it using the currently set configuration
639     * options.  If an archive directory is configured than this method will attempt to create
640     * that directory if it does not already exist.
641     *
642     * @return the newly created an configured Journal instance.
643     *
644     * @throws IOException if an error occurs while creating the Journal object.
645     */
646    protected Journal createJournal() throws IOException {
647        Journal manager = new Journal();
648        manager.setDirectory(getDirectory());
649        manager.setMaxFileLength(getJournalMaxFileLength());
650        manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
651        manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
652        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
653        manager.setArchiveDataLogs(isArchiveDataLogs());
654        manager.setSizeAccumulator(journalSize);
655        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
656        if (getDirectoryArchive() != null) {
657            IOHelper.mkdirs(getDirectoryArchive());
658            manager.setDirectoryArchive(getDirectoryArchive());
659        }
660        return manager;
661    }
662
663    /**
664     * Starts the checkpoint Thread instance if not already running and not disabled
665     * by configuration.
666     */
667    protected void startCheckpoint() {
668        if (checkpointInterval == 0 && cleanupInterval == 0) {
669            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
670            return;
671        }
672        synchronized (checkpointThreadLock) {
673            boolean start = false;
674            if (checkpointThread == null) {
675                start = true;
676            } else if (!checkpointThread.isAlive()) {
677                start = true;
678                LOG.info("KahaDB: Recovering checkpoint thread after death");
679            }
680            if (start) {
681                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
682                    @Override
683                    public void run() {
684                        try {
685                            long lastCleanup = System.currentTimeMillis();
686                            long lastCheckpoint = System.currentTimeMillis();
687                            // Sleep for a short time so we can periodically check
688                            // to see if we need to exit this thread.
689                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
690                            while (opened.get()) {
691                                Thread.sleep(sleepTime);
692                                long now = System.currentTimeMillis();
693                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
694                                    checkpointCleanup(true);
695                                    lastCleanup = now;
696                                    lastCheckpoint = now;
697                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
698                                    checkpointCleanup(false);
699                                    lastCheckpoint = now;
700                                }
701                            }
702                        } catch (InterruptedException e) {
703                            // Looks like someone really wants us to exit this thread...
704                        } catch (IOException ioe) {
705                            LOG.error("Checkpoint failed", ioe);
706                            brokerService.handleIOException(ioe);
707                        }
708                    }
709                };
710
711                checkpointThread.setDaemon(true);
712                checkpointThread.start();
713            }
714        }
715    }
716
717    /**
718     * Called from the worker thread to start a checkpoint.
719     *
720     * This method ensure that the store is in an opened state and optionaly logs information
721     * related to slow store access times.
722     *
723     * @param cleanup
724     *      Should a cleanup of the journal occur during the checkpoint operation.
725     *
726     * @throws IOException if an error occurs during the checkpoint operation.
727     */
728    protected void checkpointCleanup(final boolean cleanup) throws IOException {
729        long start;
730        this.indexLock.writeLock().lock();
731        try {
732            start = System.currentTimeMillis();
733            if (!opened.get()) {
734                return;
735            }
736        } finally {
737            this.indexLock.writeLock().unlock();
738        }
739        checkpointUpdate(cleanup);
740        long end = System.currentTimeMillis();
741        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
742            LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
743        }
744    }
745}