001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.FilterInputStream;
020import java.io.IOException;
021import java.io.InputStream;
022
023/**
024 * An optimized buffered input stream for Tcp
025 *
026 *
027 */
028public class TcpBufferedInputStream extends FilterInputStream {
029    private static final int DEFAULT_BUFFER_SIZE = 8192;
030    protected byte internalBuffer[];
031    protected int count;
032    protected int position;
033
034    public TcpBufferedInputStream(InputStream in) {
035        this(in, DEFAULT_BUFFER_SIZE);
036    }
037
038    public TcpBufferedInputStream(InputStream in, int size) {
039        super(in);
040        if (size <= 0) {
041            throw new IllegalArgumentException("Buffer size <= 0");
042        }
043        internalBuffer = new byte[size];
044    }
045
046    protected void fill() throws IOException {
047        byte[] buffer = internalBuffer;
048        count = 0;
049        position = 0;
050        int n = in.read(buffer, position, buffer.length - position);
051        if (n > 0) {
052            count = n + position;
053        }
054    }
055
056    @Override
057    public int read() throws IOException {
058        if (position >= count) {
059            fill();
060            if (position >= count) {
061                return -1;
062            }
063        }
064        return internalBuffer[position++] & 0xff;
065    }
066
067    private int readStream(byte[] b, int off, int len) throws IOException {
068        int avail = count - position;
069        if (avail <= 0) {
070            if (len >= internalBuffer.length) {
071                return in.read(b, off, len);
072            }
073            fill();
074            avail = count - position;
075            if (avail <= 0) {
076                return -1;
077            }
078        }
079        int cnt = (avail < len) ? avail : len;
080        System.arraycopy(internalBuffer, position, b, off, cnt);
081        position += cnt;
082        return cnt;
083    }
084
085    @Override
086    public int read(byte b[], int off, int len) throws IOException {
087        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
088            throw new IndexOutOfBoundsException();
089        } else if (len == 0) {
090            return 0;
091        }
092        int n = 0;
093        for (;;) {
094            int nread = readStream(b, off + n, len - n);
095            if (nread <= 0) {
096                return (n == 0) ? nread : n;
097            }
098            n += nread;
099            if (n >= len) {
100                return n;
101            }
102            // if not closed but no bytes available, return
103            InputStream input = in;
104            if (input != null && input.available() <= 0) {
105                return n;
106            }
107        }
108    }
109
110    @Override
111    public long skip(long n) throws IOException {
112        if (n <= 0) {
113            return 0;
114        }
115        long avail = count - position;
116        if (avail <= 0) {
117            return in.skip(n);
118        }
119        long skipped = (avail < n) ? avail : n;
120        position += skipped;
121        return skipped;
122    }
123
124    @Override
125    public int available() throws IOException {
126        return in.available() + (count - position);
127    }
128
129    @Override
130    public boolean markSupported() {
131        return false;
132    }
133
134    @Override
135    public void close() throws IOException {
136        if (in != null) {
137            in.close();
138        }
139    }
140
141    /**
142     * @param array
143     * @throws IOException
144     */
145    public void unread(byte[] array) throws IOException {
146        int avail = internalBuffer.length - position;
147        if (array.length > avail) {
148            throw new IOException("Buffer is full, can't unread");
149        }
150
151        System.arraycopy(array, position, internalBuffer, 0, array.length);
152        count += array.length;
153    }
154}