1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.runtime;
20
21 import org.apache.river.logging.Levels;
22 import org.apache.river.thread.Executor;
23 import org.apache.river.thread.GetThreadPoolAction;
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.CancelledKeyException;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.IllegalBlockingModeException;
29 import java.nio.channels.Pipe;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.security.AccessController;
34 import java.util.Collections;
35 import java.util.Iterator;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.WeakHashMap;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 public final class SelectionManager {
81
82
83 private static final int concurrency = 1;
84
85 private static final Logger logger = Logger.getLogger(
86 "org.apache.river.jeri.internal.runtime.SelectionManager");
87
88
89 private static final Executor systemThreadPool = (Executor)
90 AccessController.doPrivileged(new GetThreadPoolAction(false));
91
92
93 private final Selector selector;
94
95
96 private final Pipe.SinkChannel wakeupPipeSink;
97 private final Pipe.SourceChannel wakeupPipeSource;
98 private final SelectionKey wakeupPipeKey;
99 private final ByteBuffer wakeupBuffer = ByteBuffer.allocate(2);
100
101
102 private final Map registeredChannels =
103 Collections.synchronizedMap(new WeakHashMap());
104
105
106
107
108
109 private final Object lock = new Object();
110
111
112 private Thread selectingThread = null;
113
114
115 private boolean wakeupPending = false;
116
117
118
119
120
121
122
123
124
125
126
127
128 private Key renewQueue = null;
129
130
131 private Key readyQueue = null;
132
133
134 private final int[] renewMaskRef = new int[1];
135
136
137
138
139
140
141
142 public SelectionManager() throws IOException {
143
144
145
146 selector = Selector.open();
147
148 Pipe pipe = Pipe.open();
149 wakeupPipeSink = pipe.sink();
150 wakeupPipeSource = pipe.source();
151 wakeupPipeSource.configureBlocking(false);
152 wakeupPipeKey = wakeupPipeSource.register(selector,
153 SelectionKey.OP_READ);
154
155 for (int i = 0; i < concurrency; i++) {
156 systemThreadPool.execute(new SelectLoop(),
157 "I/O SelectionManager-" + i);
158 }
159
160
161
162 }
163
164
165
166
167
168
169 public Key register(SelectableChannel channel, SelectionHandler handler) {
170 if (registeredChannels.containsKey(channel)) {
171 throw new IllegalStateException("channel already registered");
172 }
173 Key key = new Key(channel, handler);
174 registeredChannels.put(channel, null);
175 return key;
176 }
177
178
179
180
181
182
183 public interface SelectionHandler {
184 void handleSelection(int readyMask, Key key);
185 }
186
187
188
189
190
191
192
193 public final class Key {
194
195
196 final SelectableChannel channel;
197
198 final SelectionHandler handler;
199
200
201
202
203
204
205
206 SelectionKey selectionKey = null;
207
208
209 int interestMask = 0;
210
211 boolean onRenewQueue = false;
212 Key renewQueueNext = null;
213 int renewMask = 0;
214
215 boolean onReadyQueue = false;
216 Key readyQueueNext = null;
217 int readyMask = 0;
218
219
220
221
222
223
224
225
226
227 Key(SelectableChannel channel, SelectionHandler handler) {
228 this.channel = channel;
229 this.handler = handler;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 public void renewInterestMask(int mask)
249 throws ClosedChannelException
250 {
251 if (!channel.isOpen()) {
252 throw new ClosedChannelException();
253 }
254 if ((mask & ~channel.validOps()) != 0) {
255 throw new IllegalArgumentException(
256 "invalid mask " + mask +
257 " (valid mask " + channel.validOps() + ")");
258 }
259 if (channel.isBlocking()) {
260 throw new IllegalBlockingModeException();
261 }
262 synchronized (lock) {
263 int delta = mask & ~(renewMask | interestMask | readyMask);
264 if (delta != 0) {
265 addOrUpdateRenewQueue(this, delta);
266 if (selectingThread != null && !wakeupPending) {
267 wakeupSelector();
268 wakeupPending = true;
269 }
270 }
271 }
272 }
273 }
274
275
276
277
278 private class SelectLoop implements Runnable {
279
280 private long lastExceptionTime = 0L;
281 private int recentExceptionCount;
282
283 public void run() {
284 int[] readyMaskRef = new int[1];
285 while (true) {
286 try {
287 Key readyKey = waitForReadyKey(readyMaskRef);
288 readyKey.handler.handleSelection(readyMaskRef[0],
289 readyKey);
290 } catch (Throwable t) {
291 try {
292 logger.log(Level.WARNING, "select loop throws", t);
293 } catch (Throwable tt) {
294 }
295 throttleLoopOnException();
296 }
297 }
298 }
299
300
301
302
303
304
305 private void throttleLoopOnException() {
306 long now = System.currentTimeMillis();
307 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
308
309 lastExceptionTime = now;
310 recentExceptionCount = 0;
311 } else {
312
313 if (++recentExceptionCount >= 10) {
314 try {
315 Thread.sleep(10000);
316 } catch (InterruptedException ignore) {
317 }
318 }
319 }
320 }
321 }
322
323
324
325
326
327
328
329
330
331
332
333
334
335 private Key waitForReadyKey(int[] readyMaskOut)
336 throws InterruptedException
337 {
338 assert !Thread.holdsLock(lock);
339 assert readyMaskOut != null && readyMaskOut.length == 1;
340
341 boolean needToClearSelectingThread = false;
342 Set selectedKeys = selector.selectedKeys();
343
344 try {
345 synchronized (lock) {
346 while (isReadyQueueEmpty() && selectingThread != null) {
347 lock.wait();
348 }
349 if (!isReadyQueueEmpty()) {
350 Key readyKey = removeFromReadyQueue(readyMaskOut);
351 lock.notify();
352 return readyKey;
353 }
354
355 assert selectingThread == null;
356 selectingThread = Thread.currentThread();
357 needToClearSelectingThread = true;
358
359 processRenewQueue();
360 }
361
362 while (true) {
363 try {
364 int n = selector.select();
365 if (Thread.interrupted()) {
366 throw new InterruptedException();
367 }
368 } catch (Error e) {
369 String message = e.getMessage();
370 if (message != null && message.startsWith("POLLNVAL")) {
371
372 Thread.sleep(100L);
373 continue;
374 } else {
375 throw e;
376 }
377 } catch (CancelledKeyException e) {
378 continue;
379 } catch (NullPointerException e) {
380 continue;
381 } catch (IOException e) {
382 logger.log(Levels.HANDLED,
383 "thrown by select, continuing", e);
384 continue;
385 }
386
387 synchronized (lock) {
388 if (wakeupPending &&
389 selectedKeys.contains(wakeupPipeKey))
390 {
391 drainWakeupPipe();
392 wakeupPending = false;
393 selectedKeys.remove(wakeupPipeKey);
394 }
395 if (selectedKeys.isEmpty()) {
396 processRenewQueue();
397 continue;
398 }
399
400 selectingThread = null;
401 needToClearSelectingThread = false;
402 lock.notify();
403
404 Iterator iter = selectedKeys.iterator();
405 assert iter.hasNext();
406 while (iter.hasNext()) {
407 SelectionKey selectionKey = (SelectionKey) iter.next();
408 Key key = (Key) selectionKey.attachment();
409
410 int readyMask = 0;
411 try {
412 readyMask = selectionKey.readyOps();
413 assert readyMask != 0;
414 assert (key.interestMask & readyMask) == readyMask;
415
416
417
418
419
420
421 int newInterestMask =
422 key.interestMask & ~readyMask;
423 assert key.interestMask ==
424 selectionKey.interestOps();
425 key.selectionKey.interestOps(newInterestMask);
426 key.interestMask = newInterestMask;
427 } catch (CancelledKeyException e) {
428
429
430
431
432 readyMask |= key.interestMask;
433 key.interestMask = 0;
434 }
435 addOrUpdateReadyQueue(key, readyMask);
436
437 iter.remove();
438 }
439
440 return removeFromReadyQueue(readyMaskOut);
441 }
442 }
443 } finally {
444 if (needToClearSelectingThread) {
445 synchronized (lock) {
446 if (wakeupPending &&
447 selectedKeys.contains(wakeupPipeKey))
448 {
449 drainWakeupPipe();
450 wakeupPending = false;
451 selectedKeys.remove(wakeupPipeKey);
452 }
453 selectingThread = null;
454 needToClearSelectingThread = false;
455 lock.notify();
456 }
457 }
458 }
459 }
460
461 private void wakeupSelector() {
462 assert Thread.holdsLock(lock);
463 assert wakeupPending == false;
464
465 wakeupBuffer.clear().limit(1);
466 try {
467 wakeupPipeSink.write(wakeupBuffer);
468 } catch (IOException e) {
469
470 throw new AssertionError("unexpected I/O exception", e);
471 }
472 }
473
474 private void drainWakeupPipe() {
475 assert Thread.holdsLock(lock);
476 assert selectingThread != null;
477
478 do {
479 wakeupBuffer.clear();
480 try {
481 wakeupPipeSource.read(wakeupBuffer);
482 } catch (IOException e) {
483
484 throw new AssertionError("unexpected I/O exception", e);
485 }
486 } while (!wakeupBuffer.hasRemaining());
487 }
488
489
490
491
492
493
494
495
496
497
498
499 private void processRenewQueue() {
500 assert Thread.holdsLock(lock);
501 assert selectingThread != null;
502
503 while (!isRenewQueueEmpty()) {
504 Key key = removeFromRenewQueue(renewMaskRef);
505 int renewMask = renewMaskRef[0];
506 assert renewMask != 0;
507
508 if (key.selectionKey == null) {
509 assert key.interestMask == 0 && key.readyMask == 0;
510
511 try {
512 key.selectionKey = key.channel.register(selector,
513 renewMask);
514 key.selectionKey.attach(key);
515 key.interestMask = renewMask;
516 } catch (ClosedChannelException e) {
517 addOrUpdateReadyQueue(key, renewMask);
518 } catch (IllegalBlockingModeException e) {
519 addOrUpdateReadyQueue(key, renewMask);
520 }
521 } else {
522 assert (key.interestMask & renewMask) == 0;
523
524 int newInterestMask = key.interestMask | renewMask;
525 try {
526 assert key.interestMask == key.selectionKey.interestOps();
527 key.selectionKey.interestOps(newInterestMask);
528 key.interestMask = newInterestMask;
529 } catch (CancelledKeyException e) {
530 addOrUpdateReadyQueue(key, newInterestMask);
531 key.interestMask = 0;
532 }
533
534 assert (key.interestMask & key.readyMask) == 0;
535 }
536 }
537 }
538
539
540
541
542
543 private boolean isRenewQueueEmpty() {
544 assert Thread.holdsLock(lock);
545 return renewQueue == null;
546 }
547
548 private Key removeFromRenewQueue(int[] renewMaskOut) {
549 assert renewMaskOut != null && renewMaskOut.length == 1;
550 assert Thread.holdsLock(lock);
551
552 Key key = renewQueue;
553 assert key != null;
554
555 assert key.onRenewQueue;
556 assert key.renewMask != 0;
557 renewMaskOut[0] = key.renewMask;
558 key.renewMask = 0;
559 renewQueue = key.renewQueueNext;
560 key.renewQueueNext = null;
561 key.onRenewQueue = false;
562 return key;
563 }
564
565 private void addOrUpdateRenewQueue(Key key, int newRenewMask) {
566 assert newRenewMask != 0;
567 assert Thread.holdsLock(lock);
568
569 if (!key.onRenewQueue) {
570 assert key.renewMask == 0;
571 assert key.renewQueueNext == null;
572 key.renewMask = newRenewMask;
573 key.renewQueueNext = renewQueue;
574 renewQueue = key;
575 key.onRenewQueue = true;
576 } else {
577 assert key.renewMask != 0;
578 assert (key.renewMask & newRenewMask) == 0;
579 key.renewMask |= newRenewMask;
580 }
581 }
582
583 private boolean isReadyQueueEmpty() {
584 assert Thread.holdsLock(lock);
585 return readyQueue == null;
586 }
587
588 private Key removeFromReadyQueue(int[] readyMaskOut) {
589 assert readyMaskOut != null && readyMaskOut.length == 1;
590 assert Thread.holdsLock(lock);
591
592 Key key = readyQueue;
593 assert key != null;
594
595 assert key.onReadyQueue;
596 assert key.readyMask != 0;
597 readyMaskOut[0] = key.readyMask;
598 key.readyMask = 0;
599 readyQueue = key.readyQueueNext;
600 key.readyQueueNext = null;
601 key.onReadyQueue = false;
602 return key;
603 }
604
605 private void addOrUpdateReadyQueue(Key key, int newReadyMask) {
606 assert newReadyMask != 0;
607 assert Thread.holdsLock(lock);
608
609 if (!key.onReadyQueue) {
610 assert key.readyMask == 0;
611 assert key.readyQueueNext == null;
612 key.readyMask = newReadyMask;
613 key.readyQueueNext = readyQueue;
614 readyQueue = key;
615 key.onReadyQueue = true;
616 } else {
617 assert key.readyMask != 0;
618 assert (key.readyMask & newReadyMask) == 0;
619 key.readyMask |= newReadyMask;
620 }
621 }
622 }