1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.util.Deque;
25 import java.util.LinkedList;
26
27
28
29
30
31 class MuxInputStream extends InputStream {
32 private final Object sessionLock;
33 private final Session session;
34 private final Mux mux;
35 private final Deque<ByteBuffer> inBufQueue;
36 private IOException sessionDown = null;
37 private int inBufRemaining = 0;
38 private int inBufPos = 0;
39 private boolean inEOF = false;
40 private boolean inClosed = false;
41 private boolean sentAcknowledgment = false;
42
43 MuxInputStream(Mux mux, Session session, Object sessionLock) {
44 this.mux = mux;
45 this.session = session;
46 this.sessionLock = sessionLock;
47 this.inBufQueue = new LinkedList<ByteBuffer>();
48 }
49
50 void down(IOException e) {
51 sessionDown = e;
52 }
53
54 void appendToBufQueue(ByteBuffer data) {
55 inBufQueue.addLast(data);
56 }
57
58 @Override
59 public int read() throws IOException {
60 synchronized (sessionLock) {
61 if (inClosed) {
62 throw new IOException("stream closed");
63 }
64 while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
65 if (session.getInState() == Session.IDLE) {
66 assert session.getOutState() == Session.IDLE;
67 mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
68 session.setOutState(Session.OPEN);
69 session.setInState(Session.OPEN);
70 }
71 if (!session.inRationInfinite && session.getInRation() == 0) {
72 int inc = mux.initialInboundRation;
73 mux.asyncSendIncrementRation(session.sessionID, inc);
74 session.setInRation(session.getInRation() + inc);
75 }
76 try {
77 sessionLock.wait(5000L);
78 } catch (InterruptedException e) {
79 String message = "request I/O interrupted";
80 session.setDown(message, e);
81 throw wrap(message, e);
82 }
83 }
84 if (inClosed) {
85 throw new IOException("stream closed");
86 }
87 if (inBufRemaining == 0) {
88 if (inEOF) {
89 return -1;
90 } else {
91 if (session.getInState() == Session.TERMINATED) {
92 throw new IOException("request aborted by remote endpoint");
93 }
94 assert sessionDown != null;
95 throw sessionDown;
96 }
97 }
98 assert inBufQueue.size() > 0;
99 int result = -1;
100 while (result == -1) {
101 ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
102 if (inBufPos < buf.limit()) {
103 result = (buf.get() & 0xFF);
104 inBufPos++;
105 inBufRemaining--;
106 }
107 if (inBufPos == buf.limit()) {
108 inBufQueue.removeFirst();
109 inBufPos = 0;
110 }
111 }
112 if (!session.inRationInfinite) {
113 checkInboundRation();
114 }
115 return result;
116 }
117 }
118
119 private IOException wrap(String message, Exception e) {
120 Throwable t;
121 if (Session.traceSupression()) {
122 t = e;
123 } else {
124 t = e.fillInStackTrace();
125 }
126 return new IOException(message, t);
127 }
128
129 @Override
130 public int read(byte[] b, int off, int len) throws IOException {
131 if (b == null) {
132 throw new NullPointerException();
133 } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
134 throw new IndexOutOfBoundsException();
135 }
136 synchronized (sessionLock) {
137 if (inClosed) {
138 throw new IOException("stream closed");
139 } else if (len == 0) {
140
141
142
143
144
145 return 0;
146 }
147 while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
148 if (session.getInState() == Session.IDLE) {
149 assert session.getOutState() == Session.IDLE;
150 mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
151 session.setOutState(Session.OPEN);
152 session.setInState(Session.OPEN);
153 }
154 if (!session.inRationInfinite && session.getInRation() == 0) {
155 int inc = mux.initialInboundRation;
156 mux.asyncSendIncrementRation(session.sessionID, inc);
157 session.setInRation(session.getInRation() + inc);
158 }
159 try {
160 sessionLock.wait(5000L);
161 } catch (InterruptedException e) {
162 String message = "request I/O interrupted";
163 session.setDown(message, e);
164 throw wrap(message, e);
165 }
166 }
167 if (inClosed) {
168 throw new IOException("stream closed");
169 }
170 if (inBufRemaining == 0) {
171 if (inEOF) {
172 return -1;
173 } else {
174 if (session.getInState() == Session.TERMINATED) {
175 throw new IOException("request aborted by remote endpoint");
176 }
177 assert sessionDown != null;
178 throw sessionDown;
179 }
180 }
181 assert inBufQueue.size() > 0;
182 int remaining = len;
183 while (remaining > 0 && inBufRemaining > 0) {
184 ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
185 if (inBufPos < buf.limit()) {
186 int toCopy = Math.min(buf.limit() - inBufPos, remaining);
187 buf.get(b, off, toCopy);
188 inBufPos += toCopy;
189 inBufRemaining -= toCopy;
190 off += toCopy;
191 remaining -= toCopy;
192 }
193 if (inBufPos == buf.limit()) {
194 inBufQueue.removeFirst();
195 inBufPos = 0;
196 }
197 }
198 if (!session.inRationInfinite) {
199 checkInboundRation();
200 }
201 return len - remaining;
202 }
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216 private void checkInboundRation() {
217 assert Thread.holdsLock(sessionLock);
218 assert !session.inRationInfinite;
219 if (session.getInState() >= Session.FINISHED) {
220 return;
221 }
222 int mark = mux.initialInboundRation / 2;
223 int used = inBufRemaining + session.getInRation();
224 if (used <= mark) {
225 int inc = mux.initialInboundRation - used;
226 mux.asyncSendIncrementRation(session.sessionID, inc);
227 session.setInRation(session.getInRation() + inc);
228 }
229 }
230
231 @Override
232 public int available() throws IOException {
233 synchronized (sessionLock) {
234 if (inClosed) {
235 throw new IOException("stream closed");
236 }
237
238
239
240
241
242 return inBufRemaining;
243 }
244 }
245
246 @Override
247 public void close() {
248 synchronized (sessionLock) {
249 if (inClosed) {
250 return;
251 }
252 inClosed = true;
253 inBufQueue.clear();
254 if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
255 mux.asyncSendAcknowledgment(session.sessionID);
256 sentAcknowledgment = true;
257
258
259
260
261
262
263 if (session.isRemoveLater()) {
264 session.setOutState(Session.TERMINATED);
265 mux.removeSession(session.sessionID);
266 session.setRemoveLater(false);
267 }
268 }
269 sessionLock.notifyAll();
270 }
271 }
272
273
274
275
276 boolean isSentAcknowledgment() {
277 return sentAcknowledgment;
278 }
279
280
281
282
283 int getBufRemaining() {
284 return inBufRemaining;
285 }
286
287
288
289
290 boolean isClosed() {
291 return inClosed;
292 }
293
294
295
296
297 void setBufRemaining(int inBufRemaining) {
298 this.inBufRemaining = inBufRemaining;
299 }
300
301
302
303
304 void setEOF(boolean inEOF) {
305 this.inEOF = inEOF;
306 }
307
308 }