View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import org.apache.river.jeri.internal.runtime.SelectionManager;
22  import org.apache.river.logging.Levels;
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.SocketChannel;
28  import java.util.Deque;
29  import java.util.Iterator;
30  import java.util.LinkedList;
31  import java.util.logging.Level;
32  import java.util.logging.Logger;
33  
34  /**
35   * SocketChannelConnectionIO implements the ConnectionIO abstraction for a
36   * connection accessible through a java.nio.channels.SocketChannel, and thus
37   * supports non-blocking I/O.
38   *
39   * @author Sun Microsystems, Inc.
40   **/
41  final class SocketChannelConnectionIO extends ConnectionIO {
42  
43      private static final int RECEIVE_BUFFER_SIZE = 4096;
44      private static final int IOV_MAX = 16; // max writev iovcnt on Solaris...
45  
46      /** mux logger */
47      private static final Logger logger =
48  	Logger.getLogger("net.jini.jeri.connection.mux");
49  
50      /** selection manager used by this implementation */
51      private static final SelectionManager selectionManager;
52      static {					// REMIND: share more widely?
53  	try {
54  	    selectionManager = new SelectionManager();
55  	} catch (IOException e) {
56  	    throw new ExceptionInInitializerError(e);
57  	}
58      }
59  
60      /*
61       * Work around 4496906: sun.nio.ch.IOVecWrapper.<clinit> requires
62       * permission to read the system property "sun.arch.data.model".
63       */
64      static {
65  	java.security.AccessController.doPrivileged(
66  	new java.security.PrivilegedAction() { public Object run() {
67  	    try {
68  		Class.forName("sun.nio.ch.IOVecWrapper");
69  	    } catch (ClassNotFoundException e) {
70  	    }
71  	    return null;
72  	} });
73      }
74  
75      /** detail message of IOException thrown when 4854354 occurs */
76      private static final String detailMessage4854354 =
77  	"A non-blocking socket operation could not be completed immediately";
78  
79      /** socket channel for underlying connection */
80      private final SocketChannel channel;
81  
82      private final SelectionManager.Key key;
83  
84      /**
85       * queue of buffers of data to be sent over connection
86       */
87      private final Deque sendQueue = new LinkedList();
88  
89      /**
90       * queue of alternating buffers (that are in sendQueue) and IOFuture
91       * objects that need to be notified when those buffers are written
92       */
93      private final Deque notifyQueue = new LinkedList();
94  
95      /** buffer for reading incoming data from connection */
96      private final ByteBuffer inputBuffer =
97  	ByteBuffer.allocateDirect(RECEIVE_BUFFER_SIZE);	// ready for reading
98  
99      private final ByteBuffer[] bufferPair = new ByteBuffer[2];
100 
101     private final ByteBuffer[] preallocBufferArray = new ByteBuffer[IOV_MAX];
102 
103     /**
104      * Creates a new SocketChannelConnectionIO for the connection represented
105      * by the supplied SocketChannel.
106      */
107     SocketChannelConnectionIO(Mux mux, SocketChannel channel)
108 	throws IOException
109     {
110 	super(mux);
111 	channel.configureBlocking(false);
112 	this.channel = channel;
113 	key = selectionManager.register(channel, new Handler());
114     }
115 
116     /**
117      * Starts processing connection data.
118      */
119     void start() throws IOException {
120 	key.renewInterestMask(SelectionKey.OP_READ);
121     }
122 
123     void asyncSend(ByteBuffer buffer) {
124 	synchronized (mux.muxLock) {
125 	    if (mux.muxDown) {
126 		return;
127 	    }
128 	    try {
129 		if (sendQueue.isEmpty()) {
130 		    channel.write(buffer);
131 		}
132 		if (buffer.hasRemaining()) {
133 		    sendQueue.addLast(buffer);
134 		    key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
135 		}
136 	    } catch (IOException e) {
137 		mux.setDown("I/O error writing to mux connection: " +
138 			    e.toString(), e);
139 		try {
140 		    channel.close();
141 		} catch (IOException ignore) {
142 		}
143 	    }
144 	}
145     }
146 
147     @Override
148     void asyncSend(ByteBuffer first, ByteBuffer second) {
149 	synchronized (mux.muxLock) {
150 	    if (mux.muxDown) {
151 		return;
152 	    }
153 	    try {
154 		if (sendQueue.isEmpty()) {
155 		    bufferPair[0] = first;
156 		    bufferPair[1] = second;
157 		    try {
158 			channel.write(bufferPair);
159 		    } catch (IOException e) {
160 			// work around 4854354
161 			String message = e.getMessage();
162 			if (message != null &&
163 			    message.indexOf(detailMessage4854354) != -1)
164 			{
165 			    logger.log(Levels.HANDLED,
166 				       "ignoring to work around 4854354", e);
167 			} else {
168 			    throw e;
169 			}
170 		    }
171 		}
172 		if (!first.hasRemaining()) {
173 		    if (second.hasRemaining()) {
174 			sendQueue.addLast(second);
175 			key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
176 		    }
177 		} else {
178 		    sendQueue.addLast(first);
179 		    sendQueue.addLast(second);
180 		    key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
181 		}
182 	    } catch (IOException e) {
183 		mux.setDown("I/O error writing to mux connection: " +
184 			    e.toString(), e);
185 		try {
186 		    channel.close();
187 		} catch (IOException ignore) {
188 		}
189 	    } finally {
190 		bufferPair[0] = null;
191 		bufferPair[1] = null;
192 	    }
193 	}
194     }
195 
196     @Override
197     IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
198 	synchronized (mux.muxLock) {
199 	    IOFuture future = new IOFuture();
200 	    if (mux.muxDown) {
201 		IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
202 		future.done(ioe);
203 		return future;
204 	    }
205 	    try {
206 		if (sendQueue.isEmpty()) {
207 		    bufferPair[0] = first;
208 		    bufferPair[1] = second;
209 		    try {
210 			channel.write(bufferPair);
211 		    } catch (IOException e) {
212 			// work around 4854354
213 			String message = e.getMessage();
214 			if (message != null &&
215 			    message.indexOf(detailMessage4854354) != -1)
216 			{
217 			    logger.log(Levels.HANDLED,
218 				       "ignoring to work around 4854354", e);
219 			} else {
220 			    throw e;
221 			}
222 		    }
223 		}
224 		if (!first.hasRemaining()) {
225 		    if (second.hasRemaining()) {
226 			sendQueue.addLast(second);
227 			key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
228 			notifyQueue.addLast(second);
229 			notifyQueue.addLast(future);
230 		    } else {
231 			future.done(second.position());
232 		    }
233 		} else {
234 		    sendQueue.addLast(first);
235 		    sendQueue.addLast(second);
236 		    key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
237 		    notifyQueue.addLast(second);
238 		    notifyQueue.addLast(future);
239 		}
240 	    } catch (IOException e) {
241 		mux.setDown("I/O error writing to mux connection: " +
242 			    e.toString(), e);
243 		future.done(e);
244 		try {
245 		    channel.close();
246 		} catch (IOException ignore) {
247 		}
248 	    } finally {
249 		bufferPair[0] = first;
250 		bufferPair[1] = second;
251 	    }
252 	    return future;
253 	}
254 	/*
255 	 * REMIND: Can/should we implement any sort of
256 	 * priority inversion avoidance scheme here?
257 	 */
258     }
259 
260     private void handleWriteReady() {
261 	try {
262 	    synchronized (mux.muxLock) {
263 //		ByteBuffer[] buffers =
264 //		    (ByteBuffer[]) sendQueue.toArray(preallocBufferArray);
265 //		channel.write(buffers);
266 //		while (!sendQueue.isEmpty()) {
267 //		    ByteBuffer bb = (ByteBuffer) sendQueue.getFirst();
268 //		    if (!bb.hasRemaining()) {
269 //			sendQueue.removeFirst();
270 //			if (!notifyQueue.isEmpty() &&
271 //			    bb == notifyQueue.getFirst())
272 //			{
273 //			    notifyQueue.removeFirst();
274 //			    IOFuture future =
275 //				(IOFuture) notifyQueue.removeFirst();
276 //			    future.done();
277 //			}
278 //		    } else {
279 //			key.renewInterestMask(SelectionKey.OP_WRITE);	// ###
280 //			break;
281 //		    }
282 //		}
283 
284 		/*
285 		 * Work around 4481573: must manually break sequence of
286 		 * buffers to write into chunks no larger than IOV_MAX.
287 		 */
288 	      gatherLoop:
289 		while (!sendQueue.isEmpty()) {
290 		    /*
291 		     * Copy up to the first IOV_MAX buffers of the send queue
292 		     * into the preallocated ByteBuffer array.
293 		     */
294 		    ByteBuffer[] bufs = preallocBufferArray; // IOV_MAX length
295 		    int len = sendQueue.size();
296 		    if (len <= bufs.length) {			// optimization
297 			bufs = (ByteBuffer[]) sendQueue.toArray(bufs);
298 		    } else {
299 			Iterator iter = sendQueue.iterator();	// sufficient
300 			len = 0;
301 			while (iter.hasNext() && len < bufs.length) {
302 			    bufs[len++] = (ByteBuffer) iter.next();
303 			}
304 		    }
305 		    try {
306 			channel.write(bufs, 0, len);
307 		    } catch (IOException e) {
308 			// work around 4854354
309 			String message = e.getMessage();
310 			if (message != null &&
311 			    message.indexOf(detailMessage4854354) != -1)
312 			{
313 			    logger.log(Levels.HANDLED,
314 				       "ignoring to work around 4854354", e);
315 			} else {
316 			    throw e;
317 			}
318 		    }        
319 		    for (int i = 0; i < len; i++) {
320 			ByteBuffer bb = bufs[i];
321 			assert bb == sendQueue.getFirst();
322 			if (!bb.hasRemaining()) {
323 			    sendQueue.removeFirst();
324 			    if (!notifyQueue.isEmpty() &&
325 				bb == notifyQueue.getFirst())
326 			    {
327 				notifyQueue.removeFirst();
328 				IOFuture future =
329 				    (IOFuture) notifyQueue.removeFirst();
330 				future.done(bb.position());
331 			    }
332 			} else {
333 			    key.renewInterestMask(SelectionKey.OP_WRITE);// ###
334 			    break gatherLoop;
335 			}
336 		    }
337 		}
338 	    }
339 	} catch (IOException e) {
340 	    try {
341 		logger.log(Levels.HANDLED,
342 			   "mux write handler, I/O error", e);
343 	    } catch (Throwable t) {
344 	    }
345 	    mux.setDown("I/O error writing to mux connection: " +
346 			e.toString(), e);
347 	    drainNotifyQueue();
348 	    try {
349 		channel.close();
350 	    } catch (IOException ignore) {
351 	    }
352 	} catch (Throwable t) {
353 	    try {
354 		logger.log(Level.WARNING,
355 			   "mux write handler, unexpected exception", t);
356 	    } catch (Throwable tt) {
357 	    }
358 	    mux.setDown("unexpected exception in mux write handler: " +
359 			t.toString(), t);
360 	    drainNotifyQueue();
361 	    try {
362 		channel.close();
363 	    } catch (IOException ignore) {
364 	    }
365 	}
366     }
367 
368     private void drainNotifyQueue() {
369 	synchronized (mux.muxLock) {
370 	    assert mux.muxDown;
371 	    while (!notifyQueue.isEmpty()) {
372 		notifyQueue.removeFirst();
373 		IOFuture future = (IOFuture) notifyQueue.removeFirst();
374 		IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
375 		future.done(ioe);
376 	    }
377 	}
378     }
379 
380     private void handleReadReady() {
381 	try {
382 	    int n = channel.read(inputBuffer);
383 	    if (n == -1) {
384 		throw new EOFException();
385 	    }
386 	    if (n > 0) {
387 		mux.processIncomingData(inputBuffer);
388 	    }
389 	    assert inputBuffer.hasRemaining();
390 	    key.renewInterestMask(SelectionKey.OP_READ);
391 	} catch (ProtocolException e) {
392 	    IOFuture future = null;
393 	    synchronized (mux.muxLock) {
394 		/*
395 		 * If mux connection is already down, then we probably got
396 		 * here because of the receipt of a normal protocol-ending
397 		 * message, like Shutdown or Error, or else something else
398 		 * went wrong anyway.  Otherwise, a real protocol violation
399 		 * was detected, so respond with an Error message before
400 		 * taking down the whole mux connection.
401 		 */
402 		if (!mux.muxDown) {
403 		    try {
404 			logger.log(Levels.HANDLED,
405 				   "mux read handler, protocol error", e);
406 		    } catch (Throwable t) {
407 		    }
408 		    future = mux.futureSendError(e.getMessage());
409 		    mux.setDown("protocol violation detected: " +
410 				e.getMessage(), null);
411 		} else {
412 		    try {
413 			logger.log(Level.FINEST,
414 				   "mux read handler: " + e.getMessage());
415 		    } catch (Throwable t) {
416 		    }
417 		}
418 	    }
419 	    if (future != null) {
420 		try {
421 		    future.waitUntilDone();
422 		} catch (IOException ignore) {
423 		} catch (InterruptedException interrupt) {
424 		    Thread.currentThread().interrupt();
425 		}
426 	    }
427 	    try {
428 		channel.close();
429 	    } catch (IOException ignore) {
430 	    }
431 	} catch (IOException e) {
432 	    try {
433 		logger.log(Levels.HANDLED,
434 			   "mux read handler, I/O error", e);
435 	    } catch (Throwable t) {
436 	    }
437 	    mux.setDown("I/O error reading from mux connection: " +
438 			e.toString(), e);
439 	    try {
440 		channel.close();
441 	    } catch (IOException ignore) {
442 	    }
443 	} catch (Throwable t) {
444 	    try {
445 		logger.log(Level.WARNING,
446 			   "mux read handler, unexpected exception", t);
447 	    } catch (Throwable tt) {
448 	    }
449 	    mux.setDown("unexpected exception in mux read handler: " +
450 			t.toString(), t);
451 	    try {
452 		channel.close();
453 	    } catch (IOException ignore) {
454 	    }
455 	}
456     }
457 
458     private class Handler implements SelectionManager.SelectionHandler {
459         @Override
460 	public void handleSelection(int readyMask, SelectionManager.Key key) {
461 	    if ((readyMask & SelectionKey.OP_WRITE) != 0) {
462 		handleWriteReady();
463 	    }
464 	    if ((readyMask & SelectionKey.OP_READ) != 0) {
465 		handleReadReady();
466 	    }
467 	}
468     }
469 }