1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import org.apache.river.jeri.internal.runtime.SelectionManager;
22 import org.apache.river.logging.Levels;
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.SocketChannel;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.LinkedList;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34
35
36
37
38
39
40
41 final class SocketChannelConnectionIO extends ConnectionIO {
42
43 private static final int RECEIVE_BUFFER_SIZE = 4096;
44 private static final int IOV_MAX = 16;
45
46
47 private static final Logger logger =
48 Logger.getLogger("net.jini.jeri.connection.mux");
49
50
51 private static final SelectionManager selectionManager;
52 static {
53 try {
54 selectionManager = new SelectionManager();
55 } catch (IOException e) {
56 throw new ExceptionInInitializerError(e);
57 }
58 }
59
60
61
62
63
/*
 * Eagerly loads the JDK-internal class sun.nio.ch.IOVecWrapper from a
 * privileged context when this class is initialized.
 * NOTE(review): the motivation is not visible here; presumably the class
 * would otherwise be loaded later from a less-privileged caller during a
 * gathering write -- confirm against the project history.
 */
static {
    java.security.AccessController.doPrivileged(
        new java.security.PrivilegedAction<Void>() {
            @Override
            public Void run() {
                try {
                    Class.forName("sun.nio.ch.IOVecWrapper");
                } catch (ClassNotFoundException ignored) {
                    // Best effort only: the class is an implementation
                    // detail of the JDK and may not exist on this runtime.
                }
                return null;
            }
        });
}
74
75
76 private static final String detailMessage4854354 =
77 "A non-blocking socket operation could not be completed immediately";
78
79
80 private final SocketChannel channel;
81
82 private final SelectionManager.Key key;
83
84
85
86
87 private final Deque sendQueue = new LinkedList();
88
89
90
91
92
93 private final Deque notifyQueue = new LinkedList();
94
95
96 private final ByteBuffer inputBuffer =
97 ByteBuffer.allocateDirect(RECEIVE_BUFFER_SIZE);
98
99 private final ByteBuffer[] bufferPair = new ByteBuffer[2];
100
101 private final ByteBuffer[] preallocBufferArray = new ByteBuffer[IOV_MAX];
102
103
104
105
106
107 SocketChannelConnectionIO(Mux mux, SocketChannel channel)
108 throws IOException
109 {
110 super(mux);
111 channel.configureBlocking(false);
112 this.channel = channel;
113 key = selectionManager.register(channel, new Handler());
114 }
115
116
117
118
119 void start() throws IOException {
120 key.renewInterestMask(SelectionKey.OP_READ);
121 }
122
123 void asyncSend(ByteBuffer buffer) {
124 synchronized (mux.muxLock) {
125 if (mux.muxDown) {
126 return;
127 }
128 try {
129 if (sendQueue.isEmpty()) {
130 channel.write(buffer);
131 }
132 if (buffer.hasRemaining()) {
133 sendQueue.addLast(buffer);
134 key.renewInterestMask(SelectionKey.OP_WRITE);
135 }
136 } catch (IOException e) {
137 mux.setDown("I/O error writing to mux connection: " +
138 e.toString(), e);
139 try {
140 channel.close();
141 } catch (IOException ignore) {
142 }
143 }
144 }
145 }
146
147 @Override
148 void asyncSend(ByteBuffer first, ByteBuffer second) {
149 synchronized (mux.muxLock) {
150 if (mux.muxDown) {
151 return;
152 }
153 try {
154 if (sendQueue.isEmpty()) {
155 bufferPair[0] = first;
156 bufferPair[1] = second;
157 try {
158 channel.write(bufferPair);
159 } catch (IOException e) {
160
161 String message = e.getMessage();
162 if (message != null &&
163 message.indexOf(detailMessage4854354) != -1)
164 {
165 logger.log(Levels.HANDLED,
166 "ignoring to work around 4854354", e);
167 } else {
168 throw e;
169 }
170 }
171 }
172 if (!first.hasRemaining()) {
173 if (second.hasRemaining()) {
174 sendQueue.addLast(second);
175 key.renewInterestMask(SelectionKey.OP_WRITE);
176 }
177 } else {
178 sendQueue.addLast(first);
179 sendQueue.addLast(second);
180 key.renewInterestMask(SelectionKey.OP_WRITE);
181 }
182 } catch (IOException e) {
183 mux.setDown("I/O error writing to mux connection: " +
184 e.toString(), e);
185 try {
186 channel.close();
187 } catch (IOException ignore) {
188 }
189 } finally {
190 bufferPair[0] = null;
191 bufferPair[1] = null;
192 }
193 }
194 }
195
196 @Override
197 IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
198 synchronized (mux.muxLock) {
199 IOFuture future = new IOFuture();
200 if (mux.muxDown) {
201 IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
202 future.done(ioe);
203 return future;
204 }
205 try {
206 if (sendQueue.isEmpty()) {
207 bufferPair[0] = first;
208 bufferPair[1] = second;
209 try {
210 channel.write(bufferPair);
211 } catch (IOException e) {
212
213 String message = e.getMessage();
214 if (message != null &&
215 message.indexOf(detailMessage4854354) != -1)
216 {
217 logger.log(Levels.HANDLED,
218 "ignoring to work around 4854354", e);
219 } else {
220 throw e;
221 }
222 }
223 }
224 if (!first.hasRemaining()) {
225 if (second.hasRemaining()) {
226 sendQueue.addLast(second);
227 key.renewInterestMask(SelectionKey.OP_WRITE);
228 notifyQueue.addLast(second);
229 notifyQueue.addLast(future);
230 } else {
231 future.done(second.position());
232 }
233 } else {
234 sendQueue.addLast(first);
235 sendQueue.addLast(second);
236 key.renewInterestMask(SelectionKey.OP_WRITE);
237 notifyQueue.addLast(second);
238 notifyQueue.addLast(future);
239 }
240 } catch (IOException e) {
241 mux.setDown("I/O error writing to mux connection: " +
242 e.toString(), e);
243 future.done(e);
244 try {
245 channel.close();
246 } catch (IOException ignore) {
247 }
248 } finally {
249 bufferPair[0] = first;
250 bufferPair[1] = second;
251 }
252 return future;
253 }
254
255
256
257
258 }
259
260 private void handleWriteReady() {
261 try {
262 synchronized (mux.muxLock) {
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288 gatherLoop:
289 while (!sendQueue.isEmpty()) {
290
291
292
293
294 ByteBuffer[] bufs = preallocBufferArray;
295 int len = sendQueue.size();
296 if (len <= bufs.length) {
297 bufs = (ByteBuffer[]) sendQueue.toArray(bufs);
298 } else {
299 Iterator iter = sendQueue.iterator();
300 len = 0;
301 while (iter.hasNext() && len < bufs.length) {
302 bufs[len++] = (ByteBuffer) iter.next();
303 }
304 }
305 try {
306 channel.write(bufs, 0, len);
307 } catch (IOException e) {
308
309 String message = e.getMessage();
310 if (message != null &&
311 message.indexOf(detailMessage4854354) != -1)
312 {
313 logger.log(Levels.HANDLED,
314 "ignoring to work around 4854354", e);
315 } else {
316 throw e;
317 }
318 }
319 for (int i = 0; i < len; i++) {
320 ByteBuffer bb = bufs[i];
321 assert bb == sendQueue.getFirst();
322 if (!bb.hasRemaining()) {
323 sendQueue.removeFirst();
324 if (!notifyQueue.isEmpty() &&
325 bb == notifyQueue.getFirst())
326 {
327 notifyQueue.removeFirst();
328 IOFuture future =
329 (IOFuture) notifyQueue.removeFirst();
330 future.done(bb.position());
331 }
332 } else {
333 key.renewInterestMask(SelectionKey.OP_WRITE);
334 break gatherLoop;
335 }
336 }
337 }
338 }
339 } catch (IOException e) {
340 try {
341 logger.log(Levels.HANDLED,
342 "mux write handler, I/O error", e);
343 } catch (Throwable t) {
344 }
345 mux.setDown("I/O error writing to mux connection: " +
346 e.toString(), e);
347 drainNotifyQueue();
348 try {
349 channel.close();
350 } catch (IOException ignore) {
351 }
352 } catch (Throwable t) {
353 try {
354 logger.log(Level.WARNING,
355 "mux write handler, unexpected exception", t);
356 } catch (Throwable tt) {
357 }
358 mux.setDown("unexpected exception in mux write handler: " +
359 t.toString(), t);
360 drainNotifyQueue();
361 try {
362 channel.close();
363 } catch (IOException ignore) {
364 }
365 }
366 }
367
368 private void drainNotifyQueue() {
369 synchronized (mux.muxLock) {
370 assert mux.muxDown;
371 while (!notifyQueue.isEmpty()) {
372 notifyQueue.removeFirst();
373 IOFuture future = (IOFuture) notifyQueue.removeFirst();
374 IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
375 future.done(ioe);
376 }
377 }
378 }
379
380 private void handleReadReady() {
381 try {
382 int n = channel.read(inputBuffer);
383 if (n == -1) {
384 throw new EOFException();
385 }
386 if (n > 0) {
387 mux.processIncomingData(inputBuffer);
388 }
389 assert inputBuffer.hasRemaining();
390 key.renewInterestMask(SelectionKey.OP_READ);
391 } catch (ProtocolException e) {
392 IOFuture future = null;
393 synchronized (mux.muxLock) {
394
395
396
397
398
399
400
401
402 if (!mux.muxDown) {
403 try {
404 logger.log(Levels.HANDLED,
405 "mux read handler, protocol error", e);
406 } catch (Throwable t) {
407 }
408 future = mux.futureSendError(e.getMessage());
409 mux.setDown("protocol violation detected: " +
410 e.getMessage(), null);
411 } else {
412 try {
413 logger.log(Level.FINEST,
414 "mux read handler: " + e.getMessage());
415 } catch (Throwable t) {
416 }
417 }
418 }
419 if (future != null) {
420 try {
421 future.waitUntilDone();
422 } catch (IOException ignore) {
423 } catch (InterruptedException interrupt) {
424 Thread.currentThread().interrupt();
425 }
426 }
427 try {
428 channel.close();
429 } catch (IOException ignore) {
430 }
431 } catch (IOException e) {
432 try {
433 logger.log(Levels.HANDLED,
434 "mux read handler, I/O error", e);
435 } catch (Throwable t) {
436 }
437 mux.setDown("I/O error reading from mux connection: " +
438 e.toString(), e);
439 try {
440 channel.close();
441 } catch (IOException ignore) {
442 }
443 } catch (Throwable t) {
444 try {
445 logger.log(Level.WARNING,
446 "mux read handler, unexpected exception", t);
447 } catch (Throwable tt) {
448 }
449 mux.setDown("unexpected exception in mux read handler: " +
450 t.toString(), t);
451 try {
452 channel.close();
453 } catch (IOException ignore) {
454 }
455 }
456 }
457
458 private class Handler implements SelectionManager.SelectionHandler {
459 @Override
460 public void handleSelection(int readyMask, SelectionManager.Key key) {
461 if ((readyMask & SelectionKey.OP_WRITE) != 0) {
462 handleWriteReady();
463 }
464 if ((readyMask & SelectionKey.OP_READ) != 0) {
465 handleReadReady();
466 }
467 }
468 }
469 }