View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.runtime;
20  
21  import org.apache.river.logging.Levels;
22  import org.apache.river.thread.Executor;
23  import org.apache.river.thread.GetThreadPoolAction;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.CancelledKeyException;
27  import java.nio.channels.ClosedChannelException;
28  import java.nio.channels.IllegalBlockingModeException;
29  import java.nio.channels.Pipe;
30  import java.nio.channels.SelectableChannel;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.security.AccessController;
34  import java.util.Collections;
35  import java.util.Iterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.WeakHashMap;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  
42  /**
43   * A SelectionManager provides an event dispatching layer on top of the
44   * java.nio.Selector and java.nio.SelectableChannel abstractions; it manages
45   * one-shot registrations of interest in I/O readiness events and
46   * dispatching notifications of such events to registered callback objects.
47   *
48   * SelectionManager is designed to support multiple select/dispatch threads
49   * to allow for improved I/O concurrency on symmetric multiprocessor systems.
50   *
51   * If interest in a particular I/O event is (re-)registered while there is a
52   * blocking select operation in progress, that select operation must be woken
53   * up so that it can take the new interest into account; this wakeup is
54   * achieved by writing a byte to an internal pipe that is registered with the
55   * underlying selector.
56   *
57   * A current limitation of this API is that it does not allow an executing
58   * callback to yield control in such a way that it will be scheduled to
59   * execute again without another I/O readiness event occurring.  Therefore,
60   * data that gets read during a callback must also get processed, unless such
61   * processing depends on more data that has not yet been read (like the
62   * remainder of a partial message).
63   *
64   * <p>This implementation uses the {@link Logger} named
65   * <code>org.apache.river.jeri.internal.runtime.SelectionManager</code> to
66   * log information at the following levels:
67   *
68   * <p><table summary="Describes what is logged by SelectionManager at
69   * various logging levels" border=1 cellpadding=5>
70   *
71   * <tr> <th> Level <th> Description
72   *
73   * <tr> <td> {@link Levels#HANDLED HANDLED} <td> I/O exception caught
74   * from select operation
75   *
76   * </table>
77   *
78   * @author Sun Microsystems, Inc.
79   **/
80  public final class SelectionManager {
81  
82      /** number of concurrent I/O processing threads */
83      private static final int concurrency = 1;	// REMIND: get from property?
84  
85      private static final Logger logger = Logger.getLogger(
86  	"org.apache.river.jeri.internal.runtime.SelectionManager");
87  
88      /** pool of threads for executing tasks in system thread group */
89      private static final Executor systemThreadPool = (Executor)
90  	AccessController.doPrivileged(new GetThreadPoolAction(false));
91  
92      /** shared Selector used by this SelectionManager */
93      private final Selector selector;
94  
95      /** internal pipe used to wake up a blocked select operation */
96      private final Pipe.SinkChannel wakeupPipeSink;
97      private final Pipe.SourceChannel wakeupPipeSource;
98      private final SelectionKey wakeupPipeKey;
99      private final ByteBuffer wakeupBuffer = ByteBuffer.allocate(2);
100 
101     /** set of registered channels, to detect duplicate registrations */
102     private final Map registeredChannels = 
103 	Collections.synchronizedMap(new WeakHashMap());
104 
105     /**
106      * lock guarding selectingThread, wakeupPending, renewQueue, readyQueue,
107      * renewMaskRef, and mutable state of all Key instances.
108      */
109     private final Object lock = new Object();
110 
111     /** thread with exclusive right to perform a select operation, if any */
112     private Thread selectingThread = null;
113 
114     /** true if a wakeup has been requested but not yet processed */
115     private boolean wakeupPending = false;
116 
117     /*
118      * The following two queues of Key objects are implemented as LIFO
119      * linked lists with internally threaded links for fast addition and
120      * removal of elements (no memory allocation overhead) and so that
121      * multiple entries for the same channel get implicitly combined.
122      * LIFO ordering is OK because the entire queue always gets drained
123      * and processed at once, and in between such processing, addition
124      * order is arbitrary anyway.
125      */
126 
127     /** queue of keys that need to have interest masks updated */
128     private Key renewQueue = null;
129 
130     /** queue of keys that have I/O operations ready to be handled */
131     private Key readyQueue = null;
132 
133     /** holder used for pass-by-reference invocations */
134     private final int[] renewMaskRef = new int[1];
135 
136     /**
137      * Creates a new SelectionManager.
138      *
139      * REMIND: Is this necessary, or should we just provide access to
140      * a singleton instance?
141      */
142     public SelectionManager() throws IOException {
143 
144 	// REMIND: create threads and other resources lazily?
145 
146 	selector = Selector.open();
147 
148 	Pipe pipe = Pipe.open();
149 	wakeupPipeSink = pipe.sink();
150 	wakeupPipeSource = pipe.source();
151 	wakeupPipeSource.configureBlocking(false);
152 	wakeupPipeKey = wakeupPipeSource.register(selector,
153 						  SelectionKey.OP_READ);
154 
155 	for (int i = 0; i < concurrency; i++) {
156 	    systemThreadPool.execute(new SelectLoop(),
157 				     "I/O SelectionManager-" + i);
158 	}
159 
160 	// REMIND: How do these threads and other resources get cleaned up?
161 	// REMIND: Should there be an explicit close method?
162     }
163 
164     /**
165      * Registers the given SelectableChannel with this SelectionManager.
166      * After registration, the returned Key's renewInterestMask method may
167      * be used to register one-shot interest in particular I/O events.
168      */
169     public Key register(SelectableChannel channel, SelectionHandler handler) {
170 	if (registeredChannels.containsKey(channel)) {
171 	    throw new IllegalStateException("channel already registered");
172 	}
173 	Key key = new Key(channel, handler);
174 	registeredChannels.put(channel, null);
175 	return key;
176     }
177 
178     /**
179      * SelectionHandler is the callback interface for an object that will
180      * process an I/O readiness event that has been detected by a
181      * SelectionManager.
182      */
183     public interface SelectionHandler {
184 	void handleSelection(int readyMask, Key key);
185     }
186 
187     /**
188      * A Key represents a given SelectableChannel's registration with this
189      * SelectionManager.  Externally, this object is used to re-register
190      * interest in I/O readiness events that have been previously detected
191      * and dispatched.
192      */
193     public final class Key {
194 
195 	/** the channel that this Key represents a registration for */
196 	final SelectableChannel channel;
197 	/** the supplied callback object for dispatching I/O events */
198 	final SelectionHandler handler;
199 
200 	// mutable instance state guarded by enclosing SelectionManager's lock:
201 
202 	/**
203 	 * the SelectionKey representing this Key's registration with the
204 	 * internal Selector, or null if it hasn't yet been registered
205 	 */
206 	SelectionKey selectionKey = null;
207 
208 	/** the current interest mask established with the SelectionKey */
209 	int interestMask = 0;
210 
211 	boolean onRenewQueue = false;	// invariant: == (renewMask != 0)
212 	Key renewQueueNext = null;	// null if !onRenewQueue
213 	int renewMask = 0;
214 
215 	boolean onReadyQueue = false;	// invariant: == (readyMask != 0)
216 	Key readyQueueNext = null;	// null if !onReadyQueue
217 	int readyMask = 0;
218 
219 	/*
220 	 * other invariants:
221 	 *
222 	 * (renewMask & interestMask) == 0
223 	 * (interestMask & readyMask) == 0
224 	 * (renewMask & readyMask) == 0
225 	 */
226 
227 	Key(SelectableChannel channel, SelectionHandler handler) {
228 	    this.channel = channel;
229 	    this.handler = handler;
230 	}
231 
232 	/**
233 	 * Renews interest in receiving notifications when the I/O operations
234 	 * identified by the specified mask are ready for the associated
235 	 * SelectableChannel.  The specified mask identifies I/O operations
236 	 * with the same bit values as would a java.nio.SelectionKey for the
237 	 * same SelectableChannel.
238 	 *
239 	 * Some time after one of the operations specified in the mask is
240 	 * detected to be ready, the previously-registered SelectionHandler
241 	 * callback object will be invoked to handle the readiness event.
242 	 *
243 	 * An event for each operation specified will only be dispatched to the
244 	 * callback handler once for the invocation of this method; to
245 	 * re-register interest in subsequent readiness of the same operation
246 	 * for the given channel, this method must be invoked again.
247 	 */
248 	public void renewInterestMask(int mask)
249 	    throws ClosedChannelException
250 	{
251 	    if (!channel.isOpen()) {
252 		throw new ClosedChannelException();
253 	    }
254 	    if ((mask & ~channel.validOps()) != 0) {
255 		throw new IllegalArgumentException(
256 		    "invalid mask " + mask +
257 		    " (valid mask " + channel.validOps() + ")");
258 	    }
259 	    if (channel.isBlocking()) {
260 		throw new IllegalBlockingModeException();
261 	    }
262 	    synchronized (lock) {
263 		int delta = mask & ~(renewMask | interestMask | readyMask);
264 		if (delta != 0) {
265 		    addOrUpdateRenewQueue(this, delta);
266 		    if (selectingThread != null && !wakeupPending) {
267 			wakeupSelector();
268 			wakeupPending = true;
269 		    }
270 		}
271 	    }
272 	}
273     }
274 
275     /**
276      * SelectLoop provides the main loop for each I/O processing thread.
277      */
278     private class SelectLoop implements Runnable {
279 
280 	private long lastExceptionTime = 0L;	// local to select thread
281 	private int recentExceptionCount;	// local to select thread
282 
283 	public void run() {
284 	    int[] readyMaskRef = new int[1];
285 	    while (true) {
286 		try {
287 		    Key readyKey = waitForReadyKey(readyMaskRef);
288 		    readyKey.handler.handleSelection(readyMaskRef[0],
289 						     readyKey);
290 		} catch (Throwable t) {
291 		    try {
292 			logger.log(Level.WARNING, "select loop throws", t);
293 		    } catch (Throwable tt) {
294 		    }
295 		    throttleLoopOnException();
296 		}
297 	    }
298 	}
299 
300 	/**
301 	 * Throttles the select loop after an exception has been
302 	 * caught: if a burst of 10 exceptions in 5 seconds occurs,
303 	 * then wait for 10 seconds to curb busy CPU usage.
304 	 **/
305 	private void throttleLoopOnException() {
306 	    long now = System.currentTimeMillis();
307 	    if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
308 		// last exception was long ago (or this is the first)
309 		lastExceptionTime = now;
310 		recentExceptionCount = 0;
311 	    } else {
312 		// exception burst window was started recently
313 		if (++recentExceptionCount >= 10) {
314 		    try {
315 			Thread.sleep(10000);
316 		    } catch (InterruptedException ignore) {
317 		    }
318 		}
319 	    }
320 	}
321     }
322 
323     /**
324      * Waits until one of the registered channels is ready for one or more
325      * I/O operations.  The Key for the ready channel is returned, and the
326      * first element of the supplied array is set to the mask of the
327      * channel's ready operations.
328      *
329      * If there is a ready channel available, then its key is returned.
330      * If another thread is already performing a select operation, then the
331      * current thread waits for that thread to complete and then begins
332      * again.  Otherwise, the current thread assumes the responsibility of
333      * performing the next select operation.
334      */
335     private Key waitForReadyKey(int[] readyMaskOut)
336 	throws InterruptedException
337     {
338 	assert !Thread.holdsLock(lock);
339 	assert readyMaskOut != null && readyMaskOut.length == 1;
340 
341 	boolean needToClearSelectingThread = false;
342 	Set selectedKeys = selector.selectedKeys();
343 
344 	try {
345 	    synchronized (lock) {
346 		while (isReadyQueueEmpty() && selectingThread != null) {
347 		    lock.wait();
348 		}
349 		if (!isReadyQueueEmpty()) {
350 		    Key readyKey = removeFromReadyQueue(readyMaskOut);
351 		    lock.notify();
352 		    return readyKey;
353 		}
354 
355 		assert selectingThread == null;
356 		selectingThread = Thread.currentThread();
357 		needToClearSelectingThread = true;
358 
359 		processRenewQueue();
360 	    }						// wakeup allowed
361 
362 	    while (true) {
363 		try {
364 		    int n = selector.select();
365 		    if (Thread.interrupted()) {
366 			throw new InterruptedException();
367 		    }
368 		} catch (Error e) {
369 		    String message = e.getMessage();
370 		    if (message != null && message.startsWith("POLLNVAL")) {
371 //			Thread.yield();
372                         Thread.sleep(100L);
373 			continue;		// work around 4458268
374 		    } else {
375 			throw e;
376 		    }
377 		} catch (CancelledKeyException e) {
378 		    continue;			// work around 4458268
379 		} catch (NullPointerException e) {
380 		    continue;			// work around 4729342
381 		} catch (IOException e) {
382 		    logger.log(Levels.HANDLED,
383 			       "thrown by select, continuing", e);
384 		    continue;			// work around 4504001
385 		}
386 
387 		synchronized (lock) {
388 		    if (wakeupPending &&
389 			selectedKeys.contains(wakeupPipeKey))
390 		    {
391 			drainWakeupPipe();		// clear wakeup state
392 			wakeupPending = false;
393 			selectedKeys.remove(wakeupPipeKey);
394 		    }
395 		    if (selectedKeys.isEmpty()) {
396 			processRenewQueue();
397 			continue;
398 		    }
399 
400 		    selectingThread = null;
401 		    needToClearSelectingThread = false;
402 		    lock.notify();
403 
404 		    Iterator iter = selectedKeys.iterator();
405 		    assert iter.hasNext();	// there must be at least one
406 		    while (iter.hasNext()) {
407 			SelectionKey selectionKey = (SelectionKey) iter.next();
408 			Key key = (Key) selectionKey.attachment();
409 
410 			int readyMask = 0;
411 			try {
412 			    readyMask = selectionKey.readyOps();
413 			    assert readyMask != 0;
414 			    assert (key.interestMask & readyMask) == readyMask;
415 
416 			    /*
417 			     * Remove interest in I/O events detected to be
418 			     * ready; interest must be renewed after each
419 			     * notification.
420 			     */
421 			    int newInterestMask =
422 				key.interestMask & ~readyMask;
423 			    assert key.interestMask ==
424 				selectionKey.interestOps();
425 			    key.selectionKey.interestOps(newInterestMask);
426 			    key.interestMask = newInterestMask;
427 			} catch (CancelledKeyException e) {
428 			    /*
429 			     * If channel is closed, then all interested events
430 			     * become considered ready immediately.
431 			     */
432 			    readyMask |= key.interestMask;
433 			    key.interestMask = 0;
434 			}
435 			addOrUpdateReadyQueue(key, readyMask);
436 
437 			iter.remove();
438 		    }
439 
440 		    return removeFromReadyQueue(readyMaskOut);
441 		}					// wakeup NOT allowed
442 	    }
443 	} finally {
444 	    if (needToClearSelectingThread) {
445 		synchronized (lock) {
446 		    if (wakeupPending &&
447 			selectedKeys.contains(wakeupPipeKey))
448 		    {
449 			drainWakeupPipe();		// clear wakeup state
450 			wakeupPending = false;
451 			selectedKeys.remove(wakeupPipeKey);
452 		    }
453 		    selectingThread = null;
454 		    needToClearSelectingThread = false;
455 		    lock.notify();
456 		}					// wakeup NOT allowed
457 	    }
458 	}
459     }
460 
461     private void wakeupSelector() {
462 	assert Thread.holdsLock(lock);
463 	assert wakeupPending == false;
464 
465 	wakeupBuffer.clear().limit(1);
466 	try {
467 	    wakeupPipeSink.write(wakeupBuffer);
468 	} catch (IOException e) {
469 	    // REMIND: what if thread was interrupted?
470 	    throw new AssertionError("unexpected I/O exception", e);
471 	}
472     }
473 
474     private void drainWakeupPipe() {
475 	assert Thread.holdsLock(lock);
476 	assert selectingThread != null;
477 
478 	do {
479 	    wakeupBuffer.clear();
480 	    try {
481 		wakeupPipeSource.read(wakeupBuffer);
482 	    } catch (IOException e) {
483 		// REMIND: what if thread was interrupted?
484 		throw new AssertionError("unexpected I/O exception", e);
485 	    }
486 	} while (!wakeupBuffer.hasRemaining());
487     }
488 
489     /**
490      * In preparation for performing a select operation, process all new
491      * and renewed interest registrations so that current SelectionKey
492      * interest masks are up to date.
493      *
494      * This method must not be invoked while there is a select operation in
495      * progress (because otherwise it could block indefinitely); therefore,
496      * it must be invoked only by a thread that has the exclusive right to
497      * perform a select operation.
498      */
499     private void processRenewQueue() {
500 	assert Thread.holdsLock(lock);
501 	assert selectingThread != null;
502 
503 	while (!isRenewQueueEmpty()) {
504 	    Key key = removeFromRenewQueue(renewMaskRef);
505 	    int renewMask = renewMaskRef[0];
506 	    assert renewMask != 0;
507 
508 	    if (key.selectionKey == null) {
509 		assert key.interestMask == 0 && key.readyMask == 0;
510 
511 		try {
512 		    key.selectionKey = key.channel.register(selector,
513 							    renewMask);
514 		    key.selectionKey.attach(key);
515 		    key.interestMask = renewMask;
516 		} catch (ClosedChannelException e) {
517 		    addOrUpdateReadyQueue(key, renewMask);
518 		} catch (IllegalBlockingModeException e) {
519 		    addOrUpdateReadyQueue(key, renewMask);
520 		}
521 	    } else {
522 		assert (key.interestMask & renewMask) == 0;
523 
524 		int newInterestMask = key.interestMask | renewMask;
525 		try {
526 		    assert key.interestMask == key.selectionKey.interestOps();
527 		    key.selectionKey.interestOps(newInterestMask);
528 		    key.interestMask = newInterestMask;
529 		} catch (CancelledKeyException e) {
530 		    addOrUpdateReadyQueue(key, newInterestMask);
531 		    key.interestMask = 0;
532 		}
533 
534 		assert (key.interestMask & key.readyMask) == 0;
535 	    }
536 	}
537     }
538 
539     /*
540      * Queue manipulation utilities:
541      */
542 
543     private boolean isRenewQueueEmpty() {
544 	assert Thread.holdsLock(lock);
545 	return renewQueue == null;
546     }
547 
548     private Key removeFromRenewQueue(int[] renewMaskOut) {
549 	assert renewMaskOut != null && renewMaskOut.length == 1;
550 	assert Thread.holdsLock(lock);
551 
552 	Key key = renewQueue;
553 	assert key != null;
554 
555 	assert key.onRenewQueue;
556 	assert key.renewMask != 0;
557 	renewMaskOut[0] = key.renewMask;
558 	key.renewMask = 0;
559 	renewQueue = key.renewQueueNext;
560 	key.renewQueueNext = null;
561 	key.onRenewQueue = false;
562 	return key;
563     }
564 
565     private void addOrUpdateRenewQueue(Key key, int newRenewMask) {
566 	assert newRenewMask != 0;
567 	assert Thread.holdsLock(lock);
568 
569 	if (!key.onRenewQueue) {
570 	    assert key.renewMask == 0;
571 	    assert key.renewQueueNext == null;
572 	    key.renewMask = newRenewMask;
573 	    key.renewQueueNext = renewQueue;
574 	    renewQueue = key;
575 	    key.onRenewQueue = true;
576 	} else {
577 	    assert key.renewMask != 0;
578 	    assert (key.renewMask & newRenewMask) == 0;
579 	    key.renewMask |= newRenewMask;
580 	}
581     }
582 
583     private boolean isReadyQueueEmpty() {
584 	assert Thread.holdsLock(lock);
585 	return readyQueue == null;
586     }
587 
588     private Key removeFromReadyQueue(int[] readyMaskOut) {
589 	assert readyMaskOut != null && readyMaskOut.length == 1;
590 	assert Thread.holdsLock(lock);
591 
592 	Key key = readyQueue;
593 	assert key != null;
594 
595 	assert key.onReadyQueue;
596 	assert key.readyMask != 0;
597 	readyMaskOut[0] = key.readyMask;
598 	key.readyMask = 0;
599 	readyQueue = key.readyQueueNext;
600 	key.readyQueueNext = null;
601 	key.onReadyQueue = false;
602 	return key;
603     }
604 
605     private void addOrUpdateReadyQueue(Key key, int newReadyMask) {
606 	assert newReadyMask != 0;
607 	assert Thread.holdsLock(lock);
608 
609 	if (!key.onReadyQueue) {
610 	    assert key.readyMask == 0;
611 	    assert key.readyQueueNext == null;
612 	    key.readyMask = newReadyMask;
613 	    key.readyQueueNext = readyQueue;
614 	    readyQueue = key;
615 	    key.onReadyQueue = true;
616 	} else {
617 	    assert key.readyMask != 0;
618 	    assert (key.readyMask & newReadyMask) == 0;
619 	    key.readyMask |= newReadyMask;
620 	}
621     }
622 }