1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 module capnproto.BufferedInputStreamWrapper;
23 
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.ReadableByteChannel;
27 
28 import capnproto.BufferedInputStream;
29 
/// Buffered reader layered over a ReadableByteChannel.
///
/// Bytes are pulled from the channel into an internal ByteBuffer and handed
/// out from there; requests larger than that buffer's capacity are read
/// straight from the channel into the caller's buffer.
///
/// NOTE(review): the comments below assume the project's ByteBuffer follows
/// java.nio.ByteBuffer semantics (position/limit/remaining/slice) -- confirm
/// against its definition.
final class BufferedInputStreamWrapper : BufferedInputStream
{
public: //Methods.
	/// Wraps `chan` and allocates the internal buffer (`bufferSize` bytes).
	/// The buffer's limit is set to 0 right away.
	this(ReadableByteChannel chan)
	{
		this.inner = chan;
		this.buf = ByteBuffer(new ubyte[](bufferSize));
		this.buf.limit = 0;
	}
	
	/// Transfers `dst.remaining()` bytes into `dst`, taking them from the
	/// internal buffer first and from the channel for whatever is missing.
	///
	/// Returns: the number of bytes accounted for by this call.
	/// Throws: Error("premature EOF") via readAtLeast when the channel
	/// reports a negative read count before enough bytes arrived.
	size_t read(ref ByteBuffer dst)
	{
		auto wanted = dst.remaining();
		auto buffered = this.buf.remaining();
		
		if(wanted < buffered)
		{
			//# Fast path: the internal buffer alone covers the request.
			auto window = this.buf.slice();
			window.limit = wanted;
			dst.put!ByteBuffer(window);
			this.buf.position += wanted;
			return wanted;
		}
		
		//# Hand over everything that is currently buffered.
		{
			auto leftover = this.buf.slice();
			leftover.limit = buffered;
			dst.put!ByteBuffer(leftover);
		}
		
		//# From here on `wanted` counts only the bytes still outstanding.
		wanted -= buffered;
		
		if(wanted > this.buf.capacity())
		{
			//# Too large for the internal buffer: reset it (limit 0) and let
			//# the channel read directly into the destination.
			this.buf.clear();
			this.buf.limit = 0;
			return buffered + readAtLeast(this.inner, dst, wanted);
		}
		
		//# Refill the internal buffer, requiring at least the outstanding bytes.
		this.buf.clear();
		auto filled = readAtLeast(this.inner, this.buf, wanted);
		
		//# Copy the outstanding bytes from the start of the refilled buffer.
		this.buf.rewind();
		auto head = this.buf.slice();
		head.limit = wanted;
		dst.put!ByteBuffer(head);
		
		//# Keep whatever was read beyond the request for later calls.
		this.buf.limit = filled;
		this.buf.position = wanted;
		return buffered + wanted;
	}
	
	/// Returns a pointer to the internal buffer. When it has no remaining
	/// bytes, it is first refilled with at least one byte from the channel.
	///
	/// Throws: Error("premature EOF") via readAtLeast when the channel
	/// reports a negative read count.
	ByteBuffer* getReadBuffer()
	{
		if(this.buf.remaining() != 0)
			return &this.buf;
		
		this.buf.clear();
		auto filled = readAtLeast(this.inner, this.buf, 1);
		this.buf.rewind();
		this.buf.limit = filled;
		return &this.buf;
	}
	
	/// Closes the wrapped channel.
	void close()
	{
		this.inner.close();
	}
	
	/// Reports whether the wrapped channel is open.
	bool isOpen()
	{
		return this.inner.isOpen();
	}
	
	/// Calls `reader.read(buf)` repeatedly until the running total reaches
	/// `minBytes`.
	///
	/// Returns: the total number of bytes read, which may exceed `minBytes`.
	/// Throws: Error("premature EOF") as soon as a read returns a negative count.
	int readAtLeast(ReadableByteChannel reader, ref ByteBuffer buf, size_t minBytes)
	{
		int total = 0;
		while(total < minBytes)
		{
			auto got = reader.read(buf);
			if(got < 0)
			{
				throw new Error("premature EOF");
			}
			total += got;
		}
		return total;
	}

private: //Variables.
	/// Size in bytes of the internal buffer allocated by the constructor.
	enum bufferSize = 8192;
	
	/// Channel that supplies the bytes.
	ReadableByteChannel inner;
	
	/// Internal staging buffer between `inner` and callers.
	ByteBuffer buf;
}