View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import org.apache.river.jeri.internal.runtime.HexDumpEncoder;
22  import org.apache.river.thread.Executor;
23  import org.apache.river.thread.GetThreadPoolAction;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.OutputStream;
27  import java.nio.ByteBuffer;
28  import java.nio.CharBuffer;
29  import java.nio.channels.SocketChannel;
30  import java.nio.charset.CharacterCodingException;
31  import java.nio.charset.Charset;
32  import java.nio.charset.CharsetDecoder;
33  import java.nio.charset.CharsetEncoder;
34  import java.security.AccessController;
35  import java.util.BitSet;
36  import java.util.Deque;
37  import java.util.LinkedList;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  /**
42   * Mux is the abstract superclass of both client-side and server-side
43   * multiplexed connections.
44   *
45   * @author Sun Microsystems, Inc.
46   **/
47  abstract class Mux {
48  
49      static final int CLIENT = 0;
50      static final int SERVER = 1;
51  
52      static final int MAX_SESSION_ID = 0x7F;
53      public static final int MAX_REQUESTS = MAX_SESSION_ID + 1;
54  
55      static final int NoOperation	= 0x00;	// 00000000
56      static final int Shutdown		= 0x02; // 00000010
57      static final int Ping		= 0x04; // 00000100
58      static final int PingAck		= 0x06; // 00000110
59      static final int Error		= 0x08; // 00001000
60      static final int IncrementRation	= 0x10; // 0001***0
61      static final int Abort		= 0x20; // 001000*0
62      static final int Close		= 0x30; // 00110000
63      static final int Acknowledgment	= 0x40; // 01000000
64      static final int Data		= 0x80; // 100****0
65  
66      static final int IncrementRation_shift	= 0x0E;
67      static final int Abort_partial		= 0x02;
68      static final int Data_open			= 0x10;
69      static final int Data_close			= 0x08;
70      static final int Data_eof			= 0x04;
71      static final int Data_ackRequired		= 0x02;
72  
73      static final int ClientConnectionHeader_negotiate	= 0x01;
74  
75      private static final byte[] magic = {
76  	(byte) 'J', (byte) 'm', (byte) 'u', (byte) 'x'	// 0x4A6D7578
77      };
78  
79      private static final int VERSION = 0x01;
80  
81      /**
82       * pool of threads for executing tasks in system thread group:
83       * used for shutting down sessions when a connection goes down
84       */
85      private static final Executor systemThreadPool =
86  	AccessController.doPrivileged(
87  	    new GetThreadPoolAction(false));
88  
89      /** session shutdown tasks to be executed asynchronously */
90      private static final Deque<Runnable> sessionShutdownQueue = new LinkedList<Runnable>();
91  
92      private static class SessionShutdownTask implements Runnable {
93  	private final Session[] sessions;
94  	private final String message;
95  	private final Throwable cause;
96  
97  	SessionShutdownTask(Session[] sessions,
98  			    String message,
99  			    Throwable cause)
100 	{
101 	    this.sessions = sessions;
102 	    this.message = message;
103 	    this.cause = cause;
104 	}
105 
106 	public void run() {
107 	    for (int i = 0, l = sessions.length; i < l; i++) {
108 		if (sessions[i] != null)
109 		sessions[i].setDown(message, cause);
110 	    }
111 	}
112     }
113 
114     /** mux logger */
115     private static final Logger logger =
116 	Logger.getLogger("net.jini.jeri.connection.mux");
117 
118     final int role;
119     final int initialInboundRation;
120     final int maxFragmentSize;
121 
122     private final ConnectionIO connectionIO;
123     private final boolean directBuffersUseful;
124 
125     /** lock guarding all mutable instance state (below) */
126     final Object muxLock = new Object();
127 
128     int initialOutboundRation;		// set from remote connection
129     // volatile reads, sync writes on muxLock
130     private volatile boolean clientConnectionReady = false; // server header received
131     boolean serverConnectionReady = false;	   // server header sent
132 
133     // volatile reads, sync writes on muxLock
134     volatile boolean muxDown = false;
135     String muxDownMessage;
136     Throwable muxDownCause;
137 
138     final BitSet busySessions = new BitSet(MAX_SESSION_ID + 1);
139     final Session [] sessions = new Session[MAX_SESSION_ID + 1];
140 
141     private int expectedPingCookie = -1;
142     
143     /** ONLY USED BY CLIENT */
144     private final long startTimeout; // milliseconds
145 
146     /**
147      * Constructs a new Mux instance for a connection accessible through
148      * standard (blocking) I/O streams.
149      */
150     Mux(OutputStream out, InputStream in, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout)
151 	throws IOException
152     {
153 	this.role = role;
154 	if ((initialInboundRation & ~0x00FFFF00) != 0) {
155 	    throw new IllegalArgumentException(
156 		"illegal initial inbound ration: " +
157 		toHexString(initialInboundRation));
158 	}
159 	this.initialInboundRation = initialInboundRation;
160 	this.maxFragmentSize = maxFragmentSize;
161 
162 	this.connectionIO = new StreamConnectionIO(this, out, in);
163 	directBuffersUseful = false;
164         startTimeout = handshakeTimeout;
165     }
166 
167     Mux(SocketChannel channel, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout)
168 	throws IOException
169     {
170 	this.role = role;
171 	if ((initialInboundRation & ~0x00FFFF00) != 0) {
172 	    throw new IllegalArgumentException(
173 		"illegal initial inbound ration: " +
174 		toHexString(initialInboundRation));
175 	}
176 	this.initialInboundRation = initialInboundRation;
177 	this.maxFragmentSize = maxFragmentSize;
178 
179 	this.connectionIO = new SocketChannelConnectionIO(this, channel);
180 	directBuffersUseful = true;
181         startTimeout = handshakeTimeout;
182     }
183     
184     /**
185      * Time in milliseconds for client-side connections to wait for the server
186      * to acknowledge an opening handshake. The default value is 15000
187      * milliseconds (15 seconds).
188      * 
189      * <p>
190      * This method is not thread-safe. It is expected to be called immediately
191      * after a constructor.
192      * 
193      * @param timeout
194      *            positive value in milliseconds
195      */
196 //    public void setStartTimeout(long timeout) {
197 //	if (timeout <= 0)
198 //	    throw new IllegalArgumentException("start timeout must be a positive number of milliseconds");
199 //	this.startTimeout  = timeout;
200 //    }
201 
202     /**
203      * Starts I/O processing.
204      *
205      * This method should be invoked only after this instance has
206      * been completely initialized, so that subclasses will not
207      * see uninitialized state.
208      */
209     public void start() throws IOException {
210         if (role == CLIENT) {
211             readState = READ_SERVER_CONNECTION_HEADER;
212         } else {
213             assert role == SERVER;
214             readState = READ_CLIENT_CONNECTION_HEADER;
215         }
216 
217         try {
218             connectionIO.start();
219         } catch (IOException e) {
220             setDown("I/O error starting connection", e);
221             throw e;
222         }
223 
224         if (role == CLIENT) {
225             asyncSendClientConnectionHeader();
226             long now = System.currentTimeMillis();
227             long endTime = now + this.startTimeout;
228             while (!muxDown && !clientConnectionReady) {
229                 try {
230                     synchronized (muxLock){
231                         muxLock.wait(endTime - now);
232                         if (clientConnectionReady) return;
233                         if (muxDown) throw new IOException(muxDownMessage, muxDownCause);
234                     }
235                     now = System.currentTimeMillis();
236                     if (now < endTime) continue;
237                     String message = "timeout waiting for server to respond to handshake";
238                     setDown(message, null);
239                     throw new IOException(message, null);
240                 } catch (InterruptedException e) {
241                     String message = "interrupt waiting for connection header";
242                     setDown(message, e);
243                     throw new IOException(message, e);
244                 }
245             }
246 	}
247     }
248 
249     /**
250      * Handles indication that this multiplexed connection has
251      * gone down, either through normal operation or failure.
252      *
253      * This method should be overridden by subclasses that want to
254      * implement custom behavior when this connection has gone down.
255      */
256     protected void handleDown() {
257     }
258 
259     /**
260      * This method is invoked internally and is intended to be
261      * overridden by subclasses.
262      */
263     void handleOpen(int sessionID) throws ProtocolException {
264 	throw new ProtocolException(
265 	    "remote endpoint attempted to open session");
266     }
267 
268     /**
269      *
270      * This method is intended to be invoked by subclasses only.
271      *
272      * This method must ONLY be invoked while synchronized on muxLock
273      * and while muxDown is false.
274      */
275     final void addSession(int sessionID, Session session) {
276 	assert Thread.holdsLock(muxLock);
277 	assert !muxDown;
278 	assert !busySessions.get(sessionID);
279 	assert sessions[sessionID] == null;
280 
281 	busySessions.set(sessionID);
282 	sessions[sessionID] = session;
283     }
284 
285     /**
286      *
287      * This method is intended to be invoked by this class and
288      * subclasses only.
289      *
290      * This method MAY be invoked while synchronized on muxLock if failure
291      * occurs during start up.
292      */
293     final void setDown(final String message, final Throwable cause) {
294 	SessionShutdownTask sst;
295         if (muxDown) return;
296 	synchronized (muxLock) {
297 	    muxDown = true;
298 	    muxDownMessage = message;
299 	    muxDownCause = cause;
300 	    sst = new SessionShutdownTask(sessions.clone(), message, cause);
301 	    muxLock.notifyAll();
302 	}
303 
304 	/*
305 	 * The following should be safe because we just left the
306 	 * synchonized block, and after setting the muxDown latch
307 	 * therein, no other thread should ever touch the "sessions"
308 	 * data structure.
309 	 *
310 	 * Sessions are shut down asynchronously in a separate thread
311 	 * to avoid deadlock, in case our caller holds muxLock,
312 	 * because individual session locks must never be acquired
313 	 * while holding muxLock.
314 	 */
315 	synchronized (sessionShutdownQueue) {
316 	    sessionShutdownQueue.add(sst);
317 	}
318 	try {
319 	    systemThreadPool.execute(new Runnable() {
320 		public void run() {
321 		    while (true) {
322 			Runnable task;
323 			synchronized (sessionShutdownQueue) {
324 			    if (sessionShutdownQueue.isEmpty()) break;
325 			    task = sessionShutdownQueue.removeFirst();
326 			}
327 			task.run();
328 		    }
329 		}
330 	    }, "mux session shutdown");
331 	} catch (OutOfMemoryError e) {	// assume out of threads
332 	    try {
333 		logger.log(Level.WARNING,
334 		    "could not create thread for session shutdown", e);
335 	    } catch (Throwable t) {
336 	    }
337 	    // absorb exception to proceed with connection shutdown;
338 	    // session shutdown task will remain on queue for later
339 	} finally {
340 	    handleDown();
341 	}
342     }
343 
344     /**
345      * Removes the identified session from the session table.
346      *
347      * This method is intended to be invoked by the associated Session
348      * object only.
349      */
350     final void removeSession(int sessionID) {
351 	synchronized (muxLock) {
352 	    if (muxDown) {
353 		return;
354 	    }
355 	    assert busySessions.get(sessionID);
356 	    busySessions.clear(sessionID);
357 	    sessions[sessionID] = null;
358 	}
359     }
360 
361     /**
362      * Returns true if it would be useful to pass direct buffers to
363      * this instance's *Send* methods (because the underlying I/O
364      * implementation will pass such buffers directly to channel write
365      * operations); returns false otherwise.
366      */
367     final boolean directBuffersUseful() {
368 	return directBuffersUseful;
369     }
370 
371     /**
372      * Sends the ClientConnectionHeader message for this connection.
373      */
374     final void asyncSendClientConnectionHeader() {
375 	assert role == CLIENT;
376 
377 	ByteBuffer header = ByteBuffer.allocate(8);
378 	header.put(magic)
379 	      .put((byte) VERSION)
380 	      .putShort((short) (initialInboundRation >> 8))
381 	      .put((byte) 0)
382 	      .flip();
383 	connectionIO.asyncSend(header);
384     }
385 
386     /**
387      * Sends the ServerConnectionHeader message for this connection.
388      */
389     final void asyncSendServerConnectionHeader() {
390 	assert role == SERVER;
391 
392 	ByteBuffer header = ByteBuffer.allocate(8);
393 	header.put(magic)
394 	      .put((byte) VERSION)
395 	      .putShort((short) (initialInboundRation >> 8))
396 	      .put((byte) 0)
397 	      .flip();
398 	connectionIO.asyncSend(header);
399     }
400 
401     /**
402      * Sends a NoOperation message with the contents of the supplied buffer
403      * as the data.
404      *
405      * The "length" of the NoOperation message will be the number of bytes
406      * remaining in the buffer, and the data sent will be the contents
407      * of the buffer between its current position and its limit.  Or if
408      * the buffer argument is null, "length" will simply be zero...
409      * REMIND: split into two methods instead?
410      *
411      * The actual writing to the underlying connection, including access to
412      * the buffer's content and other state, is asynchronous with the
413      * invocation of this method; therefore, the supplied buffer must not
414      * be mutated even after this method has returned.
415      */
416     final void asyncSendNoOperation(ByteBuffer buffer) {
417 	ByteBuffer header = ByteBuffer.allocate(4);
418 	header.put((byte) NoOperation)
419 	      .put((byte) 0);
420 
421 	if (buffer != null) {
422 	    assert buffer.remaining() <= 0xFFFF;
423 	    header.putShort((short) buffer.remaining())
424 		  .flip();
425 	    connectionIO.asyncSend(header, buffer);
426 	} else {
427 	    header.putShort((short) 0)
428 		  .flip();
429 	    connectionIO.asyncSend(header);
430 	}
431     }
432 
433     /**
434      * Sends a Shutdown message with the UTF-8 encoding of the supplied
435      * message as the data.  If message is null, then zero bytes of data
436      * will be sent with the message header.
437      */
438     final void asyncSendShutdown(String message) {
439 	ByteBuffer data = (message != null ?
440 			   getUTF8BufferFromString(message) : null);
441 
442 	ByteBuffer header = ByteBuffer.allocate(4);
443 	header.put((byte) Shutdown)
444 	      .put((byte) 0);
445 
446 	if (data != null) {
447 	    assert data.remaining() <= 0xFFFF;
448 	    header.putShort((short) data.remaining())
449 		  .flip();
450 	    connectionIO.asyncSend(header, data);
451 	} else {
452 	    header.putShort((short) 0)
453 		  .flip();
454 	    connectionIO.asyncSend(header);
455 	}
456     }
457 
458     /**
459      * Sends a Ping message with the specified "cookie".
460      */
461     final void asyncSendPing(int cookie) {
462 	assert cookie >= 0 && cookie <= 0xFFFF;
463 
464 	ByteBuffer header = ByteBuffer.allocate(4);
465 	header.put((byte) Ping)
466               .put((byte) 0)
467 	      .putShort((short) cookie)
468 	      .flip();
469 	connectionIO.asyncSend(header);
470     }
471 
472     /**
473      * Sends a PingAck message with the specified "cookie".
474      */
475     final void asyncSendPingAck(int cookie) {
476 	assert cookie >= 0 && cookie <= 0xFFFF;
477 
478 	ByteBuffer header = ByteBuffer.allocate(4);
479 	header.put((byte) PingAck)
480 	      .put((byte) 0)
481 	      .putShort((short) cookie)
482 	      .flip();
483 	connectionIO.asyncSend(header);
484     }
485 
486     /**
487      * Sends an Error message with the UTF-8 encoding of the supplied
488      * message as the data.  If message is null, then zero bytes of data
489      * will be sent with the message header.
490      */
491     final void asyncSendError(String message) {
492 	ByteBuffer data = (message != null ?
493 			   getUTF8BufferFromString(message) : null);
494 
495 	ByteBuffer header = ByteBuffer.allocate(4);
496 	header.put((byte) Error)
497 	      .put((byte) 0);
498 
499 	if (data != null) {
500 	    assert data.remaining() <= 0xFFFF;
501 	    header.putShort((short) data.remaining())
502 		  .flip();
503 	    connectionIO.asyncSend(header, data);
504 	} else {
505 	    header.putShort((short) 0)
506 		  .flip();
507 	    connectionIO.asyncSend(header);
508 	}
509     }
510 
511     /**
512      * Sends an Error message with the UTF-8 encoding of the supplied
513      * message as the data.  If message is null, then zero bytes of data
514      * will be sent with the message header.
515      */
516     final IOFuture futureSendError(String message) {
517 	ByteBuffer data = getUTF8BufferFromString(message);
518 
519 	ByteBuffer header = ByteBuffer.allocate(4);
520 	header.put((byte) Error)
521 	      .put((byte) 0);
522 
523 	assert data.remaining() <= 0xFFFF;
524 	header.putShort((short) data.remaining())
525 	      .flip();
526 	return connectionIO.futureSend(header, data);
527     }
528 
529     /**
530      * Sends an IncrementRation message for the specified "sessionID" and
531      * the specified "increment".
532      */
533     final void asyncSendIncrementRation(/*****int op, *****/int sessionID,
534 					int increment)
535     {
536 	final int op = IncrementRation;
537 //	assert (op & 0xF1) == IncrementRation;	// validate operation code
538 //	assert (op & 0xE0) == 0;		// NYI: support use of shift
539 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
540 	assert increment >= 0 && increment <= 0xFFFF;
541 
542 	ByteBuffer header = ByteBuffer.allocate(4);
543 	header.put((byte) op)
544 	      .put((byte) sessionID)
545 	      .putShort((short) increment)
546 	      .flip();
547 	connectionIO.asyncSend(header);
548     }
549 
550     /**
551      * Sends an Abort message for the specified "sessionID" with the contents
552      * of the specified buffer as the data.
553      *
554      * The "length" of the Abort message will be the number of bytes
555      * remaining in the buffer, and the data sent will be the contents
556      * of the buffer between its current position and its limit.  Or if
557      * the buffer argument is null, "length" will simply be zero...
558      * REMIND: split into two methods instead?
559      *
560      * For efficiency, the caller is responsible for pre-computing the first
561      * byte of the message, including any control flags if appropriate.
562      */
563     final void asyncSendAbort(int op, int sessionID, ByteBuffer data) {
564 	assert (op & 0xFD) == Abort;		// validate operation code
565 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
566 
567 	ByteBuffer header = ByteBuffer.allocate(4);
568 	header.put((byte) op)
569 	      .put((byte) sessionID);
570 
571 	if (data != null) {
572 	    assert data.remaining() <= 0xFFFF;
573 	    header.putShort((short) data.remaining())
574 		  .flip();
575 	    connectionIO.asyncSend(header, data);
576 	} else {
577 	    header.putShort((short) 0)
578 		  .flip();
579 	    connectionIO.asyncSend(header);
580 	}
581     }
582 
583     /**
584      * Sends a Close message for the specified "sessionID".
585      */
586     final void asyncSendClose(int sessionID) {
587 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
588 
589 	ByteBuffer header = ByteBuffer.allocate(4);
590 	header.put((byte) Close)
591 	      .put((byte) sessionID)
592 	      .putShort((short) 0)
593 	      .flip();
594 	connectionIO.asyncSend(header);
595     }
596 
597     /**
598      * Sends an Acknowledgment message for the specified "sessionID".
599      */
600     final void asyncSendAcknowledgment(int sessionID) {
601 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
602 
603 	ByteBuffer header = ByteBuffer.allocate(4);
604 	header.put((byte) Acknowledgment)
605 	      .put((byte) sessionID)
606 	      .putShort((short) 0)
607 	      .flip();
608 	connectionIO.asyncSend(header);
609     }
610 
611     /**
612      * Sends a Data message for the specified "sessionID" with the contents
613      * of the supplied buffer as the data.
614      *
615      * The "length" of the Data message will be the number of bytes
616      * remaining in the buffer, and the data sent will be the contents
617      * of the buffer between its current position and its limit.  Or if
618      * the buffer argument is null, "length" will simply be zero...
619      * REMIND: split into two methods instead?
620      *
621      * For efficiency, the caller is responsible for pre-computing the first
622      * byte of the Data message, including any control flags if appropriate.
623      *
624      * The actual writing to the underlying connection, including access to
625      * the buffer's content and other state, is asynchronous with the
626      * invocation of this method; therefore, the supplied buffer must not
627      * be mutated even after this method has returned.
628      */
629     final void asyncSendData(int op, int sessionID, ByteBuffer data) {
630 	assert (op & 0xE1) == Data;	// validate operation code
631 	assert (op & Data_eof) != 0 ||	// close and ackRequired require eof
632 	    (op & Data_close & Data_ackRequired) == 0;
633 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
634 
635 	ByteBuffer header = ByteBuffer.allocate(4);
636 	header.put((byte) op)
637 	      .put((byte) sessionID);
638 
639 	if (data != null) {
640 	    assert data.remaining() <= 0xFFFF;
641 	    header.putShort((short) data.remaining())
642 		  .flip();
643 	    connectionIO.asyncSend(header, data);
644 	} else {
645 	    header.putShort((short) 0)
646 		  .flip();
647 	    connectionIO.asyncSend(header);
648 	}
649     }
650 
651     /**
652      * Sends a Data message for the specified sessionID with the contents
653      * of the supplied buffer as the data.
654      *
655      * The "length" of the Data message will be the number of bytes
656      * remaining in the buffer, and the data sent will be the contents
657      * of the buffer between its current position and its limit.
658      *
659      * For efficiency, the caller is responsible for pre-computing the first
660      * byte of the Data message, including any control flags if appropriate.
661      *
662      * The actual writing to the underlying connection, including access to
663      * the buffer's content and other state, is asynchronous with the
664      * invocation of this method; therefore, the supplied buffer must not
665      * be mutated even after this method has returned, until it is guaranteed
666      * that use of the buffer has completed.
667      *
668      * The returned IOFuture object can be used to wait until the write has
669      * definitely completed (or will definitely not complete due to some
670      * failure).  After the write has completed, the buffer's position will
671      * have been incremented to its limit (which will not have changed), the
672      * position may be obtained by calling @link{IOFuture#getPosition()}.
673      */
674     final IOFuture futureSendData(int op, int sessionID, ByteBuffer data) {
675 	assert (op & 0xE1) == Data;	// verify operation code
676 	assert (op & Data_eof) != 0 ||	// close and ackRequired require eof
677 	    (op & Data_close & Data_ackRequired) == 0;
678 	assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
679 	assert data.remaining() <= 0xFFFF;
680 
681 	ByteBuffer header = ByteBuffer.allocate(4);
682 	header.put((byte) op)
683 	      .put((byte) sessionID)
684 	      .putShort((short) data.remaining())
685 	      .flip();
686 	return connectionIO.futureSend(header, data);
687     }
688 
689     /*
690      * read states
691      */
692     private static final int READ_CLIENT_CONNECTION_HEADER	= 0;
693     private static final int READ_SERVER_CONNECTION_HEADER	= 1;
694     private static final int READ_MESSAGE_HEADER		= 2;
695     private static final int READ_MESSAGE_BODY			= 3;
696 
697     /*
698      * current read state lock and variables
699      */
700     private final Object readStateLock = new Object();
701     private volatile int readState;
702     private int currentOp;
703     private int currentSessionID;
704     private int currentLengthRemaining;
705     private ByteBuffer currentDataBuffer = null;
706 
707     void processIncomingData(ByteBuffer buffer) throws ProtocolException {
708 	buffer.flip();	// process data that has been read into buffer
709 	assert buffer.hasRemaining();
710 
711 	synchronized (readStateLock) {
712 	  stateLoop:
713 	    do {
714 		switch (readState) {
715 		  case READ_CLIENT_CONNECTION_HEADER:
716 		    if (!readClientConnectionHeader(buffer)) break stateLoop;
717 		    break;
718 
719 		  case READ_SERVER_CONNECTION_HEADER:
720 		    if (!readServerConnectionHeader(buffer)) break stateLoop;
721 		    break;
722 
723 		  case READ_MESSAGE_HEADER:
724 		    if (!readMessageHeader(buffer)) break stateLoop;
725 		    break;
726 
727 		  case READ_MESSAGE_BODY:
728 		    if (!readMessageBody(buffer)) break stateLoop;
729 		    break;
730 
731 		  default:
732 		    throw new AssertionError();
733 		}
734 	    } while (buffer.hasRemaining());
735 	}
736 
737 	buffer.compact();
738     }
739 
740     private boolean readClientConnectionHeader(ByteBuffer buffer)
741 	throws ProtocolException
742     {
743 	assert role == SERVER;
744         assert Thread.holdsLock(readStateLock);
745 
746 	validatePartialMagicNumber(buffer);
747 	if (buffer.remaining() < 8) {
748 	    return false;		// wait for complete header to arrive
749 	}
750 	int headerPosition = buffer.position();
751 	buffer.position(headerPosition + 4);	// skip header already checked
752 	int version = (buffer.get() & 0xFF);
753 	int ration = (buffer.getShort() & 0xFFFF) << 8;
754 	int flags = (buffer.get() & 0xFF);
755 	boolean negotiate = (flags & ClientConnectionHeader_negotiate) != 0;
756 
757 	synchronized (muxLock) {
758 	    initialOutboundRation = ration;
759 	    asyncSendServerConnectionHeader();
760 
761 	    if (version == 0) {
762 		throw new ProtocolException(
763 		    "bad protocol version: " + version);
764 	    }
765 	    if (version > VERSION) {
766 		if (!negotiate) {
767 		    setDown("unsupported protocol version: " + version, null);
768 		    throw new ProtocolException(
769 			"unsupported protocol version: " + version);
770 		}
771 	    }
772 
773 	    serverConnectionReady = true;
774 	}
775 
776 	readState = READ_MESSAGE_HEADER;
777 	return true;
778     }
779 
780     private boolean readServerConnectionHeader(ByteBuffer buffer)
781 	throws ProtocolException
782     {
783 	assert role == CLIENT;
784         assert Thread.holdsLock(readStateLock);
785         
786 	validatePartialMagicNumber(buffer);
787 
788 	if (buffer.remaining() < 8) {
789 	    return false;
790 	}
791 	int headerPosition = buffer.position();
792 	buffer.position(headerPosition + 4);	// skip header already checked
793 	int version = (buffer.get() & 0xFF);
794 	int ration = (buffer.getShort() & 0xFFFF) << 8;
795 	int flags = (buffer.get() & 0xFF);  //TODO: Determine flags intended use.
796 
797 	synchronized (muxLock) {
798 	    initialOutboundRation = ration;
799 
800 	    if (version == 0) {
801 		throw new ProtocolException(
802 		    "bad protocol version: " + version);
803 	    }
804 	    if (version > VERSION) {
805 		throw new ProtocolException(
806 		    "unexpected protocol version: " + version);
807 	    }
808 
809 	    clientConnectionReady = true;
810 	    muxLock.notifyAll();
811 	}
812 
813 	readState = READ_MESSAGE_HEADER;
814 	return true;
815     }
816 
817     private void validatePartialMagicNumber(ByteBuffer buffer)
818 	throws ProtocolException
819     {
820 	if (buffer.remaining() > 0) {
821 	    byte[] temp = new byte[Math.min(buffer.remaining(), magic.length)];
822 	    buffer.mark();
823 	    buffer.get(temp);
824 	    buffer.reset();
825 	    for (int i = 0; i < temp.length; i++) {
826 		if (temp[i] != magic[i]) {
827 		    setDown((role == CLIENT ? "server" : "client") +
828 			" sent bad magic number: " + toHexString(temp), null);
829 		    throw new ProtocolException("bad magic number: " +
830 						toHexString(temp));
831 		}
832 	    }
833 	}
834     }
835 
836     private boolean readMessageHeader(ByteBuffer buffer)
837 	throws ProtocolException
838     {
839         assert Thread.holdsLock(readStateLock);
840 	if (buffer.remaining() < 4) {
841 	    return false;		// wait for complete header to arrive
842 	}
843 	int headerPosition = buffer.position();
844 	if (logger.isLoggable(Level.FINEST)) {
845 	    logger.log(Level.FINEST,
846 		       "message header: " +
847 		       toHexString(buffer.getInt(headerPosition)));
848 	}
849 
850 	int op = (buffer.get() & 0xFF);
851 	if ((op & 0xE1) == Data) {
852 	    int sessionID = (buffer.get() & 0xFF);
853 	    if (sessionID > MAX_SESSION_ID) {
854 		throw new ProtocolException("bad message header: " +
855 		    toHexString(buffer.getInt(headerPosition)));
856 	    }
857 	    currentOp = op;
858 	    currentSessionID = sessionID;
859 	    currentLengthRemaining = (buffer.getShort() & 0xFFFF);
860 	    if (currentLengthRemaining > 0) {
861 		currentDataBuffer =
862 		    ByteBuffer.allocate(currentLengthRemaining);
863 		readState = READ_MESSAGE_BODY;
864 	    } else {
865 		dispatchCurrentMessage();
866 	    }
867 	    return true;
868 
869 	} else if ((op & 0xF1) == IncrementRation) {
870 	    int sessionID = (buffer.get() & 0xFF);
871 	    if (sessionID > MAX_SESSION_ID) {
872 		throw new ProtocolException("bad message header: " +
873 		    toHexString(buffer.getInt(headerPosition)));
874 	    }
875 	    int increment = (buffer.getShort() & 0xFFFF);
876 	    int shift = op & IncrementRation_shift;
877 	    increment <<= shift;
878 	    handleIncrementRation(sessionID, increment);
879 	    return true;
880 
881 	} else if ((op & 0xFD) == Abort) {
882 	    int sessionID = (buffer.get() & 0xFF);
883 	    if (sessionID > MAX_SESSION_ID) {
884 		throw new ProtocolException("bad message header: " +
885 		    toHexString(buffer.getInt(headerPosition)));
886 	    }
887 	    currentOp = op;
888 	    currentSessionID = sessionID;
889 	    currentLengthRemaining = (buffer.getShort() & 0xFFFF);
890 	    if (currentLengthRemaining > 0) {
891 		currentDataBuffer =
892 		    ByteBuffer.allocate(currentLengthRemaining);
893 		readState = READ_MESSAGE_BODY;
894 	    } else {
895 		dispatchCurrentMessage();
896 	    }
897 	    return true;
898 
899 	}
900 	switch (op) {
901 	  case NoOperation: {
902 	    if (buffer.get() != 0) {	// ignore sign extension
903 		throw new ProtocolException("bad message header: " +
904 		    toHexString(buffer.getInt(headerPosition)));
905 	    }
906 	    currentOp = op;
907 	    currentLengthRemaining = (buffer.getShort() & 0xFFFF);
908 	    currentDataBuffer = null;	// ignore data for NoOperation
909 	    if (currentLengthRemaining > 0) {
910 		readState = READ_MESSAGE_BODY;
911 	    } else {
912 		dispatchCurrentMessage();
913 	    }
914 	    return true;
915 	  }
916 
917 	  case Shutdown: {
918 	    if (buffer.get() != 0) {	// ignore sign extension
919 		throw new ProtocolException("bad message header: " +
920 		    toHexString(buffer.getInt(headerPosition)));
921 	    }
922 	    currentOp = op;
923 	    currentLengthRemaining = (buffer.getShort() & 0xFFFF);
924 	    if (currentLengthRemaining > 0) {
925 		currentDataBuffer =
926 		    ByteBuffer.allocate(currentLengthRemaining);
927 		readState = READ_MESSAGE_BODY;
928 	    } else {
929 		dispatchCurrentMessage();
930 	    }
931 	    return true;
932 	  }
933 
934 	  case Ping: {
935 	    if (buffer.get() != 0) {	// ignore sign extension
936 		throw new ProtocolException("bad message header: " +
937 		    toHexString(buffer.getInt(headerPosition)));
938 	    }
939 	    int cookie = (buffer.getShort() & 0xFFFF);
940 	    handlePing(cookie);
941 	    return true;
942 	  }
943 
944 	  case PingAck: {
945 	    if (buffer.get() != 0) {	// ignore sign extension
946 		throw new ProtocolException("bad message header: " +
947 		    toHexString(buffer.getInt(headerPosition)));
948 	    }
949 	    int cookie = (buffer.getShort() & 0xFFFF);
950 	    handlePingAck(cookie);
951 	    return true;
952 	  }
953 
954 	  case Error: {
955 	    if (buffer.get() != 0) {	// ignore sign extension
956 		throw new ProtocolException("bad message header: " +
957 		    toHexString(buffer.getInt(headerPosition)));
958 	    }
959 	    currentOp = op;
960 	    currentLengthRemaining = (buffer.getShort() & 0xFFFF);
961 	    if (currentLengthRemaining > 0) {
962 		currentDataBuffer =
963 		    ByteBuffer.allocate(currentLengthRemaining);
964 		readState = READ_MESSAGE_BODY;
965 	    } else {
966 		dispatchCurrentMessage();
967 	    }
968 	    return true;
969 	  }
970 
971 	  case Close: {
972 	    int sessionID = (buffer.get() & 0xFF);
973 	    if (sessionID > MAX_SESSION_ID ||
974 		buffer.getShort() != 0)		// ignore sign extension
975 	    {
976 		throw new ProtocolException("bad message header: " +
977 		    toHexString(buffer.getInt(headerPosition)));
978 	    }
979 	    handleClose(sessionID);
980 	    return true;
981 	  }
982 
983 	  case Acknowledgment: {
984 	    int sessionID = (buffer.get() & 0xFF);
985 	    if (sessionID > MAX_SESSION_ID ||
986 		buffer.getShort() != 0)		// ignore sign extension
987 	    {
988 		throw new ProtocolException("bad message header: " +
989 		    toHexString(buffer.getInt(headerPosition)));
990 	    }
991 	    handleAcknowledgment(sessionID);
992 	    return true;
993 	  }
994 
995 	  default:
996 	    throw new ProtocolException("bad message header: " +
997 		toHexString(buffer.getInt(headerPosition)));
998 	}
999     }
1000 
1001     private boolean readMessageBody(ByteBuffer buffer)
1002 	throws ProtocolException
1003     {
1004         assert Thread.holdsLock(readStateLock);
1005 	assert currentLengthRemaining > 0;
1006 	assert currentDataBuffer == null ||
1007 	    currentDataBuffer.remaining() == currentLengthRemaining;
1008 
1009 	if (buffer.remaining() > currentLengthRemaining) {
1010 	    int origLimit = buffer.limit();
1011 	    buffer.limit(buffer.position() + currentLengthRemaining);
1012 	    if (currentDataBuffer != null) {
1013 		currentDataBuffer.put(buffer);
1014 	    } else {
1015 		buffer.position(buffer.position() + currentLengthRemaining);
1016 	    }
1017 	    currentLengthRemaining = 0;
1018 	    buffer.limit(origLimit);
1019 	} else {
1020 	    currentLengthRemaining -= buffer.remaining();
1021 	    if (currentDataBuffer != null) {
1022 		currentDataBuffer.put(buffer);
1023 	    } else {
1024 		buffer.position(buffer.limit());
1025 	    }
1026 	}
1027 
1028 	if (currentLengthRemaining > 0) {
1029 	    return false;
1030 	} else {
1031 	    currentDataBuffer.flip();
1032 	    dispatchCurrentMessage();
1033 	    currentDataBuffer = null;		// don't let this linger
1034 	    readState = READ_MESSAGE_HEADER;
1035 	    return true;
1036 	}
1037     }
1038 
1039     private void dispatchCurrentMessage() throws ProtocolException {
1040 	assert currentDataBuffer == null || currentDataBuffer.hasRemaining();
1041 
1042 	int op = currentOp;
1043 	if ((op & 0xE1) == Data) {
1044 	    boolean open	= (op & Data_open) != 0;
1045 	    boolean close	= (op & Data_close) != 0;
1046 	    boolean eof		= (op & Data_eof) != 0;
1047 	    boolean ackRequired	= (op & Data_ackRequired) != 0;
1048 	    handleData(currentSessionID, open, close, eof, ackRequired,
1049 		       (currentDataBuffer != null ?
1050 			currentDataBuffer : ByteBuffer.allocate(0)));
1051 	    return;
1052 
1053 	} else if ((op & 0xFD) == Abort) {
1054 	    boolean partial = (op & Abort_partial) != 0;
1055 	    handleAbort(currentSessionID, partial,
1056 			(currentDataBuffer != null ?
1057 			 getStringFromUTF8Buffer(currentDataBuffer) : ""));
1058 	    return;
1059 
1060 	}
1061 	switch (op) {
1062 	  case NoOperation:
1063 	    handleNoOperation();
1064 	    return;
1065 
1066 	  case Shutdown:
1067 	    handleShutdown(currentDataBuffer != null ?
1068 			   getStringFromUTF8Buffer(currentDataBuffer) : "");
1069 	    return;
1070 
1071 	  case Error:
1072 	    handleError(currentDataBuffer != null ?
1073 			getStringFromUTF8Buffer(currentDataBuffer) : "");
1074 	    return;
1075 
1076 	  default:
1077 	    throw new AssertionError(Integer.toHexString((byte) op));
1078 	}
1079     }
1080 
1081     private void handleNoOperation() throws ProtocolException {
1082 	if (logger.isLoggable(Level.FINEST)) {
1083 	    logger.log(Level.FINEST, "NoOperation");
1084 	}
1085 
1086 	// do nothing
1087     }
1088 
1089     private void handleShutdown(String message) throws ProtocolException {
1090 	if (logger.isLoggable(Level.FINEST)) {
1091 	    logger.log(Level.FINEST, "Shutdown");
1092 	}
1093 
1094 	if (role != CLIENT) {
1095 	    throw new ProtocolException("Shutdown sent by client");
1096 	}
1097 	setDown("mux connection shut down gracefully", null);
1098 	throw new ProtocolException("received Shutdown message");
1099     }
1100 
1101     private void handlePing(int cookie) throws ProtocolException {
1102 	if (logger.isLoggable(Level.FINEST)) {
1103 	    logger.log(Level.FINEST, "Ping: cookie=" + cookie);
1104 	}
1105 
1106 	asyncSendPingAck(cookie);
1107     }
1108 
1109     private void handlePingAck(int cookie) throws ProtocolException {
1110 	if (logger.isLoggable(Level.FINEST)) {
1111 	    logger.log(Level.FINEST, "PingAck: cookie=" + cookie);
1112 	}
1113 
1114 	synchronized (muxLock) {
1115 	    if (cookie != expectedPingCookie) {
1116 		throw new ProtocolException(
1117 		    "unexpected ping cookie: " + cookie);
1118 	    } else {
1119 		expectedPingCookie = -1;
1120 		// NYI: rest of ping machinery
1121 	    }
1122 	}
1123     }
1124 
1125     private void handleError(String message) throws ProtocolException {
1126 	if (logger.isLoggable(Level.FINEST)) {
1127 	    logger.log(Level.FINEST, "Error");
1128 	}
1129 
1130 	setDown((role == CLIENT ? "server" : "client") +
1131 		" reported protocol error: " + message, null);
1132 	throw new ProtocolException("received Error message");
1133     }
1134 
1135     private void handleIncrementRation(int sessionID, int increment)
1136 	throws ProtocolException
1137     {
1138 	if (logger.isLoggable(Level.FINEST)) {
1139 	    logger.log(Level.FINEST,
1140 		"IncrementRation: sessionID=" + sessionID +
1141 		",increment=" + increment);
1142 	}
1143 
1144 	getSession(sessionID).handleIncrementRation(increment);
1145     }
1146 
1147     private void handleAbort(int sessionID, boolean partial, String message)
1148 	throws ProtocolException
1149     {
1150 	if (logger.isLoggable(Level.FINEST)) {
1151 	    logger.log(Level.FINEST,
1152 		"Abort: sessionID=" + sessionID +
1153 		",partial=" + partial);
1154 	}
1155 
1156 	getSession(sessionID).handleAbort(partial);
1157     }
1158 
1159     private void handleClose(int sessionID) throws ProtocolException {
1160 	if (logger.isLoggable(Level.FINEST)) {
1161 	    logger.log(Level.FINEST, "Close: sessionID=" + sessionID);
1162 	}
1163 
1164 	getSession(sessionID).handleClose();
1165     }
1166 
1167     private void handleAcknowledgment(int sessionID) throws ProtocolException {
1168 	if (logger.isLoggable(Level.FINEST)) {
1169 	    logger.log(Level.FINEST, "Acknowledgment: sessionID=" + sessionID);
1170 	}
1171 
1172 	getSession(sessionID).handleAcknowledgment();
1173     }
1174 
1175     private void handleData(int sessionID, boolean open, boolean close, boolean eof, boolean ackRequired, ByteBuffer data)
1176 	throws ProtocolException
1177     {
1178 	if (logger.isLoggable(Level.FINEST)) {
1179 	    int length = data.remaining();
1180 	    HexDumpEncoder encoder = new HexDumpEncoder();
1181 	    byte[] bytes = new byte[data.remaining()];
1182 	    data.mark();
1183 	    data.get(bytes);
1184 	    data.reset();
1185 	    logger.log(Level.FINEST,
1186                     "Data: sessionID={0}{1}{2}{3}{4},length={5}{6}",
1187                     new Object[]{sessionID,
1188                         open ? ",open" : "",
1189                         close ? ",close" : "",
1190                         eof ? ",eof" : "",
1191                         ackRequired ? ",ackRequired" : "",
1192                         length, 
1193                         length > 0 ? ",data=\n" + encoder.encode(bytes) : ""});
1194 	}
1195 
1196 	if (!eof && (close || ackRequired)) {
1197 	    throw new ProtocolException("Data: eof=" + eof +
1198 					",close=" + close +
1199 					",ackRequired=" + ackRequired);
1200 	}
1201 
1202 	if (open) {
1203 	    handleOpen(sessionID);
1204 	}
1205 
1206 	getSession(sessionID).handleData(data, eof, close, ackRequired);
1207     }
1208 
1209     private Session getSession(int sessionID) throws ProtocolException {
1210 	synchronized (muxLock) {
1211 	    if (!busySessions.get(sessionID)) {
1212 		throw new ProtocolException(
1213 		    "inactive sessionID: " + sessionID);
1214 	    }
1215 	    return sessions[sessionID];
1216 	}
1217     }
1218 
1219     private static ByteBuffer getUTF8BufferFromString(String s) {
1220 	CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
1221 	try {
1222 	    return encoder.encode(CharBuffer.wrap(s));
1223 	} catch (CharacterCodingException e) {
1224 	    return null;
1225 	}
1226     }
1227 
1228     private static String getStringFromUTF8Buffer(ByteBuffer buffer) {
1229 	CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
1230 	try {
1231 	    return decoder.decode(buffer).toString();
1232 	} catch (CharacterCodingException e) {
1233 	    return "(error decoding UTF-8 message: " + e.toString() + ")";
1234 	}
1235     }
1236 
1237 //    private static String toHexString(byte x) {
1238 //	char[] buf = new char[2];
1239 //	buf[0] = toHexChar((x >> 4) & 0xF);
1240 //	buf[1] = toHexChar(x & 0xF);
1241 //	return new String(buf);
1242 //    }
1243 
1244     private static String toHexString(int x) {
1245 	char[] buf = new char[8];
1246 	for (int i = 0; i < 8; i++) {
1247 	    buf[i] = toHexChar((x >> ((7 - i) * 4)) & 0xF);
1248 	}
1249 	return new String(buf);
1250     }
1251 
1252     private static String toHexString(byte[] b) {
1253 	char[] buf = new char[b.length * 2];
1254 	int j = 0;
1255 	for (int i = 0; i < b.length; i++) {
1256 	    buf[j++] = toHexChar((b[i] >> 4) & 0xF);
1257 	    buf[j++] = toHexChar(b[i] & 0xF);
1258 	}
1259 	return new String(buf);
1260     }
1261 
1262     private static char toHexChar(int x) {
1263 	return x < 10 ? (char) ('0' + x) : (char) ('A' - 10 + x);
1264     }
1265 }