1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import org.apache.river.logging.Levels;
22 import org.apache.river.thread.Executor;
23 import org.apache.river.thread.GetThreadPoolAction;
24 import java.io.EOFException;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.OutputStream;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.ReadableByteChannel;
30 import java.nio.channels.WritableByteChannel;
31 import java.security.AccessController;
32 import java.util.Deque;
33 import java.util.LinkedList;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36
37
38
39
40
41
42
43
44 final class StreamConnectionIO extends ConnectionIO {
45
46 private static final int RECEIVE_BUFFER_SIZE = 2048;
47
48
49
50
51
52 private static final Executor systemThreadPool =
53 (Executor) AccessController.doPrivileged(
54 new GetThreadPoolAction(false));
55
56
57 private static final Logger logger =
58 Logger.getLogger("net.jini.jeri.connection.mux");
59
60
61 private final OutputStream out;
62 private final InputStream in;
63
64
65 private final WritableByteChannel outChannel;
66 private final ReadableByteChannel inChannel;
67
68
69
70
71
72
73
74 private final Deque sendQueue;
75
76
77
78
79
80
81 StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
82 super(mux);
83 this.out = out;
84
85 this.in = in;
86
87 outChannel = newChannel(out);
88 inChannel = newChannel(in);
89 sendQueue = new LinkedList();
90 }
91
92
93
94
95
96 @Override
97 void start() throws IOException {
98 try {
99 systemThreadPool.execute(new Writer(), "mux writer");
100 systemThreadPool.execute(new Reader(), "mux reader");
101 } catch (OutOfMemoryError e) {
102 try {
103 logger.log(Level.WARNING,
104 "could not create thread for request dispatch", e);
105 } catch (Throwable t) {
106 }
107 throw new IOException("could not create I/O threads", e);
108 }
109 }
110
111 @Override
112 void asyncSend(ByteBuffer buffer) {
113 synchronized (mux.muxLock) {
114 if (mux.muxDown) {
115 return;
116 }
117 sendQueue.addLast(buffer);
118 mux.muxLock.notifyAll();
119 }
120 }
121
122 @Override
123 void asyncSend(ByteBuffer first, ByteBuffer second) {
124 synchronized (mux.muxLock) {
125 if (mux.muxDown) {
126 return;
127 }
128 sendQueue.addLast(first);
129 sendQueue.addLast(second);
130 mux.muxLock.notifyAll();
131 }
132 }
133
134 @Override
135 IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
136 synchronized (mux.muxLock) {
137 IOFuture future = new IOFuture();
138 if (mux.muxDown) {
139 IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
140 future.done(ioe);
141 return future;
142 }
143 sendQueue.addLast(first);
144 sendQueue.addLast(second);
145 sendQueue.addLast(future);
146 mux.muxLock.notifyAll();
147 return future;
148 }
149
150
151
152
153 }
154
155 private class Writer implements Runnable {
156 Writer() { }
157
158 @Override
159 public void run() {
160 Deque localQueue = null;
161 try {
162 while (true) {
163 synchronized (mux.muxLock) {
164 while (!mux.muxDown && sendQueue.isEmpty()) {
165
166
167
168
169
170 mux.muxLock.wait();
171
172
173
174
175
176 }
177 if (mux.muxDown && sendQueue.isEmpty()) {
178 logger.log(Level.FINEST,
179 "mux writer thread dying, connection " +
180 "down and nothing more to send");
181 break;
182 }
183
184 localQueue = new LinkedList(sendQueue);
185 sendQueue.clear();
186 }
187
188 boolean needToFlush = false;
189 ByteBuffer last = null;
190 int lastIndex = Integer.MIN_VALUE;
191 for ( int i = 0; !localQueue.isEmpty(); i++) {
192 Object next = localQueue.getFirst();
193 if (next instanceof ByteBuffer) {
194 ByteBuffer buffer = (ByteBuffer) next;
195 outChannel.write((buffer));
196 last = buffer;
197 lastIndex = i;
198 needToFlush = true;
199 } else {
200 assert next instanceof IOFuture;
201 if (needToFlush) {
202 out.flush();
203 needToFlush = false;
204 }
205 if (lastIndex == i - 1 && last != null){
206 ((IOFuture) next).done(last.position());
207 } else {
208 ((IOFuture) next).done();
209 }
210 }
211 localQueue.removeFirst();
212 }
213 if (needToFlush) {
214 out.flush();
215 }
216 }
217 } catch (InterruptedException e) {
218 try {
219 logger.log(Level.WARNING,
220 "mux writer thread dying, interrupted", e);
221 } catch (Throwable t) {
222 }
223 mux.setDown("mux writer thread interrupted", e);
224 } catch (IOException e) {
225 try {
226 logger.log(Levels.HANDLED,
227 "mux writer thread dying, I/O error", e);
228 } catch (Throwable t) {
229 }
230 mux.setDown("I/O error writing to mux connection: " +
231 e.toString(), e);
232 } catch (Throwable t) {
233 try {
234 logger.log(Level.WARNING,
235 "mux writer thread dying, unexpected exception", t);
236 } catch (Throwable tt) {
237 }
238 mux.setDown("unexpected exception in mux writer thread: " +
239 t.toString(), t);
240 } finally {
241 synchronized (mux.muxLock) {
242 assert mux.muxDown;
243 if (localQueue != null) {
244 drainQueue(localQueue);
245 }
246 drainQueue(sendQueue);
247 }
248 try {
249 outChannel.close();
250 } catch (IOException e) {
251 }
252 }
253 }
254 }
255
256 private void drainQueue(Deque queue) {
257 while (!queue.isEmpty()) {
258 Object next = queue.removeFirst();
259 if (next instanceof IOFuture) {
260 IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
261 ((IOFuture) next).done(ioe);
262 }
263 }
264 }
265
266 private class Reader implements Runnable {
267
268 private final ByteBuffer inputBuffer =
269 ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);
270
271 Reader() { }
272
273 public void run() {
274 try {
275 while (true) {
276 int n = inChannel.read(inputBuffer);
277 if (n == -1) {
278 throw new EOFException();
279 }
280 assert n > 0;
281 mux.processIncomingData(inputBuffer);
282 assert inputBuffer.hasRemaining();
283 }
284 } catch (ProtocolException e) {
285 IOFuture future = null;
286 synchronized (mux.muxLock) {
287
288
289
290
291
292
293
294
295 if (!mux.muxDown) {
296 try {
297 logger.log(Levels.HANDLED,
298 "mux reader thread dying, protocol error", e);
299 } catch (Throwable t) {
300 }
301 future = mux.futureSendError(e.getMessage());
302 mux.setDown("protocol violation detected: " +
303 e.getMessage(), null);
304 } else {
305 try {
306 logger.log(Level.FINEST,
307 "mux reader thread dying: " + e.getMessage());
308 } catch (Throwable t) {
309 }
310 }
311 }
312 if (future != null) {
313 try {
314 future.waitUntilDone();
315 } catch (IOException ignore) {
316 } catch (InterruptedException interrupt) {
317 Thread.currentThread().interrupt();
318 }
319 }
320 } catch (IOException e) {
321 try {
322 logger.log(Levels.HANDLED,
323 "mux reader thread dying, I/O error", e);
324 } catch (Throwable t) {
325 }
326 mux.setDown("I/O error reading from mux connection: " +
327 e.toString(), e);
328 } catch (Throwable t) {
329 try {
330 logger.log(Level.WARNING,
331 "mux reader thread dying, unexpected exception", t);
332 } catch (Throwable tt) {
333 }
334 mux.setDown("unexpected exception in mux reader thread: " +
335 t.toString(), t);
336 } finally {
337 try {
338 inChannel.close();
339 } catch (IOException e) {
340 }
341 }
342 }
343 }
344
345
346
347
348
349
350
351
352 public static ReadableByteChannel newChannel(final InputStream in) {
353 return new ReadableByteChannel() {
354 private boolean open = true;
355
356
357 @Override
358 public synchronized int read(ByteBuffer dst) throws IOException {
359 assert dst.hasArray();
360 byte[] array = dst.array();
361 int arrayOffset = dst.arrayOffset();
362
363 int totalRead = 0;
364 int bytesRead = 0;
365 int bytesToRead;
366 while ((bytesToRead = dst.remaining()) > 0) {
367 if ((totalRead > 0) && !(in.available() > 0)) {
368 break;
369 }
370 int pos = dst.position();
371 bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
372 if (bytesRead < 0) {
373 break;
374 } else {
375 dst.position(pos + bytesRead);
376 totalRead += bytesRead;
377 }
378 }
379 if ((bytesRead < 0) && (totalRead == 0)) {
380 return -1;
381 }
382
383 return totalRead;
384 }
385
386 @Override
387 public synchronized boolean isOpen() {
388 return open;
389 }
390
391
392 @Override
393 public synchronized void close() throws IOException {
394 in.close();
395 open = false;
396 }
397 };
398 }
399
400 public static WritableByteChannel newChannel(final OutputStream out) {
401 return new WritableByteChannel() {
402 private volatile boolean open = true;
403
404
405 @Override
406 public synchronized int write(ByteBuffer src) throws IOException {
407 assert src.hasArray();
408
409 int len = src.remaining();
410 if (len > 0) {
411 int pos = src.position();
412 out.write(src.array(), src.arrayOffset() + pos, len);
413 src.position(pos + len);
414 }
415 return len;
416 }
417
418 @Override
419 public boolean isOpen() {
420 return open;
421 }
422
423
424 @Override
425 public synchronized void close() throws IOException {
426 out.close();
427 open = false;
428 }
429 };
430 }
431
432 }