001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.DataInput;
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024
025import org.apache.activemq.util.ByteArrayInputStream;
026import org.apache.activemq.util.ByteArrayOutputStream;
027import org.apache.activemq.util.ByteSequence;
028import org.apache.activemq.wireformat.WireFormat;
029import org.fusesource.hawtbuf.Buffer;
030import org.fusesource.mqtt.codec.MQTTFrame;
031
032/**
033 * Implements marshalling and unmarsalling the <a
034 * href="http://mqtt.org/">MQTT</a> protocol.
035 */
036public class MQTTWireFormat implements WireFormat {
037
038    static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
039    static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
040
041    private int version = 1;
042
043    private int maxFrameSize = MAX_MESSAGE_LENGTH;
044    private long connectAttemptTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT;
045
046    @Override
047    public ByteSequence marshal(Object command) throws IOException {
048        ByteArrayOutputStream baos = new ByteArrayOutputStream();
049        DataOutputStream dos = new DataOutputStream(baos);
050        marshal(command, dos);
051        dos.close();
052        return baos.toByteSequence();
053    }
054
055    @Override
056    public Object unmarshal(ByteSequence packet) throws IOException {
057        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
058        DataInputStream dis = new DataInputStream(stream);
059        return unmarshal(dis);
060    }
061
062    @Override
063    public void marshal(Object command, DataOutput dataOut) throws IOException {
064        MQTTFrame frame = (MQTTFrame) command;
065        dataOut.write(frame.header());
066
067        int remaining = 0;
068        for (Buffer buffer : frame.buffers) {
069            remaining += buffer.length;
070        }
071        do {
072            byte digit = (byte) (remaining & 0x7F);
073            remaining >>>= 7;
074            if (remaining > 0) {
075                digit |= 0x80;
076            }
077            dataOut.write(digit);
078        } while (remaining > 0);
079        for (Buffer buffer : frame.buffers) {
080            dataOut.write(buffer.data, buffer.offset, buffer.length);
081        }
082    }
083
084    @Override
085    public Object unmarshal(DataInput dataIn) throws IOException {
086        byte header = dataIn.readByte();
087
088        byte digit;
089        int multiplier = 1;
090        int length = 0;
091        do {
092            digit = dataIn.readByte();
093            length += (digit & 0x7F) * multiplier;
094            multiplier <<= 7;
095        }
096        while ((digit & 0x80) != 0);
097
098        if (length >= 0) {
099            if (length > getMaxFrameSize()) {
100                throw new IOException("The maximum message length was exceeded");
101            }
102
103            if (length > 0) {
104                byte[] data = new byte[length];
105                dataIn.readFully(data);
106                Buffer body = new Buffer(data);
107                return new MQTTFrame(body).header(header);
108            } else {
109                return new MQTTFrame().header(header);
110            }
111        }
112        return null;
113    }
114
115    /**
116     * @param the version of the wire format
117     */
118    @Override
119    public void setVersion(int version) {
120        this.version = version;
121    }
122
123    /**
124     * @return the version of the wire format
125     */
126    @Override
127    public int getVersion() {
128        return this.version;
129    }
130
131    /**
132     * @return the maximum number of bytes a single MQTT message frame is allowed to be.
133     */
134    public int getMaxFrameSize() {
135        return maxFrameSize;
136    }
137
138    /**
139     * Sets the maximum frame size for an incoming MQTT frame.  The protocl limit is
140     * 256 megabytes and this value cannot be set higher.
141     *
142     * @param maxFrameSize
143     *        the maximum allowed frame size for a single MQTT frame.
144     */
145    public void setMaxFrameSize(int maxFrameSize) {
146        this.maxFrameSize = Math.min(MAX_MESSAGE_LENGTH, maxFrameSize);
147    }
148
149    /**
150     * @return the timeout value used to fail a connection if no CONNECT frame read.
151     */
152    public long getConnectAttemptTimeout() {
153        return connectAttemptTimeout;
154    }
155
156    /**
157     * Sets the timeout value used to fail a connection if no CONNECT frame is read
158     * in the given interval.
159     *
160     * @param connectTimeout
161     *        the connection frame received timeout value.
162     */
163    public void setConnectAttemptTimeout(long connectTimeout) {
164        this.connectAttemptTimeout = connectTimeout;
165    }
166}