001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store;
018
019import java.io.BufferedOutputStream;
020import java.io.File;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.HashMap;
026
027import org.apache.activemq.broker.BrokerFactory;
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQQueue;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.console.command.store.proto.MessagePB;
038import org.apache.activemq.console.command.store.proto.QueueEntryPB;
039import org.apache.activemq.console.command.store.proto.QueuePB;
040import org.apache.activemq.openwire.OpenWireFormat;
041import org.apache.activemq.store.MessageRecoveryListener;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.store.TransactionRecoveryListener;
046import org.fusesource.hawtbuf.AsciiBuffer;
047import org.fusesource.hawtbuf.DataByteArrayOutputStream;
048import org.fusesource.hawtbuf.UTF8Buffer;
049
050import com.fasterxml.jackson.databind.ObjectMapper;
051
052/**
053 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
054 */
055public class StoreExporter {
056
057    static final int OPENWIRE_VERSION = 8;
058    static final boolean TIGHT_ENCODING = false;
059
060    URI config;
061    File file;
062
063    private final ObjectMapper mapper = new ObjectMapper();
064    private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
065    private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
066    private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
067    private final OpenWireFormat wireformat = new OpenWireFormat();
068
069    public StoreExporter() throws URISyntaxException {
070        config = new URI("xbean:activemq.xml");
071        wireformat.setCacheEnabled(false);
072        wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
073        wireformat.setVersion(OPENWIRE_VERSION);
074    }
075
076    public void execute() throws Exception {
077        if (config == null) {
078            throw new Exception("required --config option missing");
079        }
080        if (file == null) {
081            throw new Exception("required --file option missing");
082        }
083        System.out.println("Loading: " + config);
084        BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
085        BrokerService broker = BrokerFactory.createBroker(config);
086        BrokerFactory.resetStartDefault();
087        PersistenceAdapter store = broker.getPersistenceAdapter();
088        System.out.println("Starting: " + store);
089        store.start();
090        try {
091            BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file));
092            try {
093                export(store, fos);
094            } finally {
095                fos.close();
096            }
097        } finally {
098            store.stop();
099        }
100    }
101
102    void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
103
104
105        final long[] messageKeyCounter = new long[]{0};
106        final long[] containerKeyCounter = new long[]{0};
107        final ExportStreamManager manager = new ExportStreamManager(fos, 1);
108
109
110        final int[] preparedTxs = new int[]{0};
111        store.createTransactionStore().recover(new TransactionRecoveryListener() {
112            @Override
113            public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
114                preparedTxs[0] += 1;
115            }
116        });
117
118        if (preparedTxs[0] > 0) {
119            throw new Exception("Cannot export a store with prepared XA transactions.  Please commit or rollback those transactions before attempting to export.");
120        }
121
122        for (ActiveMQDestination odest : store.getDestinations()) {
123            containerKeyCounter[0]++;
124            if (odest instanceof ActiveMQQueue) {
125                ActiveMQQueue dest = (ActiveMQQueue) odest;
126                MessageStore queue = store.createQueueMessageStore(dest);
127
128                QueuePB.Bean destRecord = new QueuePB.Bean();
129                destRecord.setKey(containerKeyCounter[0]);
130                destRecord.setBindingKind(ptp_kind);
131
132                final long[] seqKeyCounter = new long[]{0};
133
134                HashMap<String, Object> jsonMap = new HashMap<String, Object>();
135                jsonMap.put("@class", "queue_destination");
136                jsonMap.put("name", dest.getQueueName());
137                String json = mapper.writeValueAsString(jsonMap);
138                System.out.println(json);
139                destRecord.setBindingData(new UTF8Buffer(json));
140                manager.store_queue(destRecord);
141
142                queue.recover(new MessageRecoveryListener() {
143                    @Override
144                    public boolean hasSpace() {
145                        return true;
146                    }
147
148                    @Override
149                    public boolean recoverMessageReference(MessageId ref) throws Exception {
150                        return true;
151                    }
152
153                    @Override
154                    public boolean isDuplicate(MessageId ref) {
155                        return false;
156                    }
157
158                    @Override
159                    public boolean recoverMessage(Message message) throws IOException {
160                        messageKeyCounter[0]++;
161                        seqKeyCounter[0]++;
162
163                        MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
164                        manager.store_message(messageRecord);
165
166                        QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
167                        manager.store_queue_entry(entryRecord);
168
169                        return true;
170                    }
171                });
172
173            } else if (odest instanceof ActiveMQTopic) {
174                ActiveMQTopic dest = (ActiveMQTopic) odest;
175
176                TopicMessageStore topic = store.createTopicMessageStore(dest);
177                for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
178
179                    QueuePB.Bean destRecord = new QueuePB.Bean();
180                    destRecord.setKey(containerKeyCounter[0]);
181                    destRecord.setBindingKind(ds_kind);
182
183                    // TODO: use a real JSON encoder like jackson.
184                    HashMap<String, Object> jsonMap = new HashMap<String, Object>();
185                    jsonMap.put("@class", "dsub_destination");
186                    jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
187                    HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
188                    jsonTopic.put("name", dest.getTopicName());
189                    jsonMap.put("topics", new Object[]{jsonTopic});
190                    if (sub.getSelector() != null) {
191                        jsonMap.put("selector", sub.getSelector());
192                    }
193                    jsonMap.put("noLocal", sub.isNoLocal());
194                    String json = mapper.writeValueAsString(jsonMap);
195                    System.out.println(json);
196
197                    destRecord.setBindingData(new UTF8Buffer(json));
198                    manager.store_queue(destRecord);
199
200                    final long seqKeyCounter[] = new long[]{0};
201                    topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
202                        @Override
203                        public boolean hasSpace() {
204                            return true;
205                        }
206
207                        @Override
208                        public boolean recoverMessageReference(MessageId ref) throws Exception {
209                            return true;
210                        }
211
212                        @Override
213                        public boolean isDuplicate(MessageId ref) {
214                            return false;
215                        }
216
217                        @Override
218                        public boolean recoverMessage(Message message) throws IOException {
219                            messageKeyCounter[0]++;
220                            seqKeyCounter[0]++;
221
222                            MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
223                            manager.store_message(messageRecord);
224
225                            QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
226                            manager.store_queue_entry(entryRecord);
227                            return true;
228                        }
229                    });
230
231                }
232            }
233        }
234        manager.finish();
235    }
236
237    private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
238        QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
239        entryRecord.setQueueKey(queueKey);
240        entryRecord.setQueueSeq(queueSeq);
241        entryRecord.setMessageKey(messageKey);
242        entryRecord.setSize(message.getSize());
243        if (message.getExpiration() != 0) {
244            entryRecord.setExpiration(message.getExpiration());
245        }
246        if (message.getRedeliveryCounter() != 0) {
247            entryRecord.setRedeliveries(message.getRedeliveryCounter());
248        }
249        return entryRecord;
250    }
251
252    private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException {
253        DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
254        mos.writeBoolean(TIGHT_ENCODING);
255        mos.writeVarInt(OPENWIRE_VERSION);
256        wireformat.marshal(message, mos);
257
258        MessagePB.Bean messageRecord = new MessagePB.Bean();
259        messageRecord.setCodec(codec_id);
260        messageRecord.setMessageKey(messageKey);
261        messageRecord.setSize(message.getSize());
262        messageRecord.setValue(mos.toBuffer());
263        return messageRecord;
264    }
265
266    public File getFile() {
267        return file;
268    }
269
270    public void setFile(String file) {
271        setFile(new File(file));
272    }
273
274    public void setFile(File file) {
275        this.file = file;
276    }
277
278    public URI getConfig() {
279        return config;
280    }
281
282    public void setConfig(URI config) {
283        this.config = config;
284    }
285}