1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import org.apache.river.jeri.internal.runtime.HexDumpEncoder;
22 import org.apache.river.thread.Executor;
23 import org.apache.river.thread.GetThreadPoolAction;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.nio.ByteBuffer;
28 import java.nio.CharBuffer;
29 import java.nio.channels.SocketChannel;
30 import java.nio.charset.CharacterCodingException;
31 import java.nio.charset.Charset;
32 import java.nio.charset.CharsetDecoder;
33 import java.nio.charset.CharsetEncoder;
34 import java.security.AccessController;
35 import java.util.BitSet;
36 import java.util.Deque;
37 import java.util.LinkedList;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40
41
42
43
44
45
46
47 abstract class Mux {
48
// Endpoint roles. The role field holds one of these; start() and the
// connection-header readers branch or assert on it.
49 static final int CLIENT = 0;
50 static final int SERVER = 1;
51
// Largest session ID accepted in a message header; MAX_SESSION_ID + 1 is
// the size of the busySessions bit set and the sessions array.
52 static final int MAX_SESSION_ID = 0x7F;
53 public static final int MAX_REQUESTS = MAX_SESSION_ID + 1;
54
// Operation codes written to / read from the first byte of the 4-byte
// message header (see the asyncSend* methods and readMessageHeader()).
55 static final int NoOperation = 0x00;
56 static final int Shutdown = 0x02;
57 static final int Ping = 0x04;
58 static final int PingAck = 0x06;
59 static final int Error = 0x08;
60 static final int IncrementRation = 0x10;
61 static final int Abort = 0x20;
62 static final int Close = 0x30;
63 static final int Acknowledgment = 0x40;
64 static final int Data = 0x80;
65
// Bit masks applied to the operation byte of the corresponding message
// type (tested in readMessageHeader() and dispatchCurrentMessage()).
66 static final int IncrementRation_shift = 0x0E;
67 static final int Abort_partial = 0x02;
68 static final int Data_open = 0x10;
69 static final int Data_close = 0x08;
70 static final int Data_eof = 0x04;
71 static final int Data_ackRequired = 0x02;
72
// Bit tested in the flags byte of the client connection header by
// readClientConnectionHeader().
73 static final int ClientConnectionHeader_negotiate = 0x01;
74
// The four bytes "Jmux" that begin every connection header; written by the
// asyncSend*ConnectionHeader methods and checked by
// validatePartialMagicNumber().
75 private static final byte[] magic = {
76 (byte) 'J', (byte) 'm', (byte) 'u', (byte) 'x'
77 };
78
// Protocol version written into outgoing connection headers and compared
// against the version received from the peer.
79 private static final int VERSION = 0x01;
80
81
82
83
84
// Executor obtained from GetThreadPoolAction(false) inside a privileged
// action; setDown() submits the session-shutdown drain task to it.
85 private static final Executor systemThreadPool =
86 AccessController.doPrivileged(
87 new GetThreadPoolAction(false));
88
89
// Queue of pending session-shutdown tasks shared by all Mux instances.
// Every access in this file synchronizes on the queue object itself.
90 private static final Deque<Runnable> sessionShutdownQueue = new LinkedList<Runnable>();
91
92 private static class SessionShutdownTask implements Runnable {
93 private final Session[] sessions;
94 private final String message;
95 private final Throwable cause;
96
97 SessionShutdownTask(Session[] sessions,
98 String message,
99 Throwable cause)
100 {
101 this.sessions = sessions;
102 this.message = message;
103 this.cause = cause;
104 }
105
106 public void run() {
107 for (int i = 0, l = sessions.length; i < l; i++) {
108 if (sessions[i] != null)
109 sessions[i].setDown(message, cause);
110 }
111 }
112 }
113
114
// Logger used for FINEST-level protocol tracing and shutdown warnings.
115 private static final Logger logger =
116 Logger.getLogger("net.jini.jeri.connection.mux");
117
// CLIENT or SERVER, fixed at construction.
118 final int role;
// Value advertised (shifted right by 8) in the outgoing connection header;
// the constructors reject values with bits outside 0x00FFFF00.
119 final int initialInboundRation;
// Stored at construction; not read anywhere in this class.
120 final int maxFragmentSize;
121
// Transport used for all sends: StreamConnectionIO or
// SocketChannelConnectionIO depending on the constructor used.
122 private final ConnectionIO connectionIO;
// false for the stream constructor, true for the SocketChannel constructor.
123 private final boolean directBuffersUseful;
124
125
// Lock held while reading or writing the mux state below (ready/down
// flags, ration, session table, ping cookie); also the monitor that
// start() waits on.
126 final Object muxLock = new Object();
127
// Set from the ration field of the peer's connection header.
128 int initialOutboundRation;
129
// Set (under muxLock) by readServerConnectionHeader(); start() waits for it.
130 private volatile boolean clientConnectionReady = false;
// Set (under muxLock) by readClientConnectionHeader().
131 boolean serverConnectionReady = false;
132
133
// Down state, written by setDown() under muxLock.
134 volatile boolean muxDown = false;
135 String muxDownMessage;
136 Throwable muxDownCause;
137
// Session table indexed by session ID: bit set of IDs in use and the
// corresponding Session objects.
138 final BitSet busySessions = new BitSet(MAX_SESSION_ID + 1);
139 final Session [] sessions = new Session[MAX_SESSION_ID + 1];
140
// Cookie that handlePingAck() compares against; this class only ever
// assigns -1 to it.
141 private int expectedPingCookie = -1;
142
143
// Handshake timeout passed to the constructor; start() adds it to
// System.currentTimeMillis() to compute the wait deadline.
144 private final long startTimeout;
145
146
147
148
149
150 Mux(OutputStream out, InputStream in, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout)
151 throws IOException
152 {
153 this.role = role;
154 if ((initialInboundRation & ~0x00FFFF00) != 0) {
155 throw new IllegalArgumentException(
156 "illegal initial inbound ration: " +
157 toHexString(initialInboundRation));
158 }
159 this.initialInboundRation = initialInboundRation;
160 this.maxFragmentSize = maxFragmentSize;
161
162 this.connectionIO = new StreamConnectionIO(this, out, in);
163 directBuffersUseful = false;
164 startTimeout = handshakeTimeout;
165 }
166
167 Mux(SocketChannel channel, int role, int initialInboundRation, int maxFragmentSize, long handshakeTimeout)
168 throws IOException
169 {
170 this.role = role;
171 if ((initialInboundRation & ~0x00FFFF00) != 0) {
172 throw new IllegalArgumentException(
173 "illegal initial inbound ration: " +
174 toHexString(initialInboundRation));
175 }
176 this.initialInboundRation = initialInboundRation;
177 this.maxFragmentSize = maxFragmentSize;
178
179 this.connectionIO = new SocketChannelConnectionIO(this, channel);
180 directBuffersUseful = true;
181 startTimeout = handshakeTimeout;
182 }
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 public void start() throws IOException {
210 if (role == CLIENT) {
211 readState = READ_SERVER_CONNECTION_HEADER;
212 } else {
213 assert role == SERVER;
214 readState = READ_CLIENT_CONNECTION_HEADER;
215 }
216
217 try {
218 connectionIO.start();
219 } catch (IOException e) {
220 setDown("I/O error starting connection", e);
221 throw e;
222 }
223
224 if (role == CLIENT) {
225 asyncSendClientConnectionHeader();
226 long now = System.currentTimeMillis();
227 long endTime = now + this.startTimeout;
228 while (!muxDown && !clientConnectionReady) {
229 try {
230 synchronized (muxLock){
231 muxLock.wait(endTime - now);
232 if (clientConnectionReady) return;
233 if (muxDown) throw new IOException(muxDownMessage, muxDownCause);
234 }
235 now = System.currentTimeMillis();
236 if (now < endTime) continue;
237 String message = "timeout waiting for server to respond to handshake";
238 setDown(message, null);
239 throw new IOException(message, null);
240 } catch (InterruptedException e) {
241 String message = "interrupt waiting for connection header";
242 setDown(message, e);
243 throw new IOException(message, e);
244 }
245 }
246 }
247 }
248
249
250
251
252
253
254
255
/**
 * Hook invoked by setDown() (from its finally clause, after muxLock has
 * been released) once the mux has been marked down. The implementation
 * here does nothing; subclasses may override it.
 */
256 protected void handleDown() {
257 }
258
259
260
261
262
/**
 * Called by handleData() when a received Data message has its open flag
 * set. This implementation always rejects the attempt.
 *
 * @param sessionID session ID from the message header
 * @throws ProtocolException always, in this implementation
 */
263 void handleOpen(int sessionID) throws ProtocolException {
264 throw new ProtocolException(
265 "remote endpoint attempted to open session");
266 }
267
268
269
270
271
272
273
274
275 final void addSession(int sessionID, Session session) {
276 assert Thread.holdsLock(muxLock);
277 assert !muxDown;
278 assert !busySessions.get(sessionID);
279 assert sessions[sessionID] == null;
280
281 busySessions.set(sessionID);
282 sessions[sessionID] = session;
283 }
284
285
286
287
288
289
290
291
292
293 final void setDown(final String message, final Throwable cause) {
294 SessionShutdownTask sst;
295 if (muxDown) return;
296 synchronized (muxLock) {
297 muxDown = true;
298 muxDownMessage = message;
299 muxDownCause = cause;
300 sst = new SessionShutdownTask(sessions.clone(), message, cause);
301 muxLock.notifyAll();
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315 synchronized (sessionShutdownQueue) {
316 sessionShutdownQueue.add(sst);
317 }
318 try {
319 systemThreadPool.execute(new Runnable() {
320 public void run() {
321 while (true) {
322 Runnable task;
323 synchronized (sessionShutdownQueue) {
324 if (sessionShutdownQueue.isEmpty()) break;
325 task = sessionShutdownQueue.removeFirst();
326 }
327 task.run();
328 }
329 }
330 }, "mux session shutdown");
331 } catch (OutOfMemoryError e) {
332 try {
333 logger.log(Level.WARNING,
334 "could not create thread for session shutdown", e);
335 } catch (Throwable t) {
336 }
337
338
339 } finally {
340 handleDown();
341 }
342 }
343
344
345
346
347
348
349
350 final void removeSession(int sessionID) {
351 synchronized (muxLock) {
352 if (muxDown) {
353 return;
354 }
355 assert busySessions.get(sessionID);
356 busySessions.clear(sessionID);
357 sessions[sessionID] = null;
358 }
359 }
360
361
362
363
364
365
366
/**
 * Returns the directBuffersUseful flag chosen at construction: true for
 * a mux created over a SocketChannel, false for one created over
 * streams.
 */
367 final boolean directBuffersUseful() {
368 return directBuffersUseful;
369 }
370
371
372
373
374 final void asyncSendClientConnectionHeader() {
375 assert role == CLIENT;
376
377 ByteBuffer header = ByteBuffer.allocate(8);
378 header.put(magic)
379 .put((byte) VERSION)
380 .putShort((short) (initialInboundRation >> 8))
381 .put((byte) 0)
382 .flip();
383 connectionIO.asyncSend(header);
384 }
385
386
387
388
389 final void asyncSendServerConnectionHeader() {
390 assert role == SERVER;
391
392 ByteBuffer header = ByteBuffer.allocate(8);
393 header.put(magic)
394 .put((byte) VERSION)
395 .putShort((short) (initialInboundRation >> 8))
396 .put((byte) 0)
397 .flip();
398 connectionIO.asyncSend(header);
399 }
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416 final void asyncSendNoOperation(ByteBuffer buffer) {
417 ByteBuffer header = ByteBuffer.allocate(4);
418 header.put((byte) NoOperation)
419 .put((byte) 0);
420
421 if (buffer != null) {
422 assert buffer.remaining() <= 0xFFFF;
423 header.putShort((short) buffer.remaining())
424 .flip();
425 connectionIO.asyncSend(header, buffer);
426 } else {
427 header.putShort((short) 0)
428 .flip();
429 connectionIO.asyncSend(header);
430 }
431 }
432
433
434
435
436
437
438 final void asyncSendShutdown(String message) {
439 ByteBuffer data = (message != null ?
440 getUTF8BufferFromString(message) : null);
441
442 ByteBuffer header = ByteBuffer.allocate(4);
443 header.put((byte) Shutdown)
444 .put((byte) 0);
445
446 if (data != null) {
447 assert data.remaining() <= 0xFFFF;
448 header.putShort((short) data.remaining())
449 .flip();
450 connectionIO.asyncSend(header, data);
451 } else {
452 header.putShort((short) 0)
453 .flip();
454 connectionIO.asyncSend(header);
455 }
456 }
457
458
459
460
461 final void asyncSendPing(int cookie) {
462 assert cookie >= 0 && cookie <= 0xFFFF;
463
464 ByteBuffer header = ByteBuffer.allocate(4);
465 header.put((byte) Ping)
466 .put((byte) 0)
467 .putShort((short) cookie)
468 .flip();
469 connectionIO.asyncSend(header);
470 }
471
472
473
474
475 final void asyncSendPingAck(int cookie) {
476 assert cookie >= 0 && cookie <= 0xFFFF;
477
478 ByteBuffer header = ByteBuffer.allocate(4);
479 header.put((byte) PingAck)
480 .put((byte) 0)
481 .putShort((short) cookie)
482 .flip();
483 connectionIO.asyncSend(header);
484 }
485
486
487
488
489
490
491 final void asyncSendError(String message) {
492 ByteBuffer data = (message != null ?
493 getUTF8BufferFromString(message) : null);
494
495 ByteBuffer header = ByteBuffer.allocate(4);
496 header.put((byte) Error)
497 .put((byte) 0);
498
499 if (data != null) {
500 assert data.remaining() <= 0xFFFF;
501 header.putShort((short) data.remaining())
502 .flip();
503 connectionIO.asyncSend(header, data);
504 } else {
505 header.putShort((short) 0)
506 .flip();
507 connectionIO.asyncSend(header);
508 }
509 }
510
511
512
513
514
515
516 final IOFuture futureSendError(String message) {
517 ByteBuffer data = getUTF8BufferFromString(message);
518
519 ByteBuffer header = ByteBuffer.allocate(4);
520 header.put((byte) Error)
521 .put((byte) 0);
522
523 assert data.remaining() <= 0xFFFF;
524 header.putShort((short) data.remaining())
525 .flip();
526 return connectionIO.futureSend(header, data);
527 }
528
529
530
531
532
533 final void asyncSendIncrementRation(int sessionID,
534 int increment)
535 {
536 final int op = IncrementRation;
537
538
539 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
540 assert increment >= 0 && increment <= 0xFFFF;
541
542 ByteBuffer header = ByteBuffer.allocate(4);
543 header.put((byte) op)
544 .put((byte) sessionID)
545 .putShort((short) increment)
546 .flip();
547 connectionIO.asyncSend(header);
548 }
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563 final void asyncSendAbort(int op, int sessionID, ByteBuffer data) {
564 assert (op & 0xFD) == Abort;
565 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
566
567 ByteBuffer header = ByteBuffer.allocate(4);
568 header.put((byte) op)
569 .put((byte) sessionID);
570
571 if (data != null) {
572 assert data.remaining() <= 0xFFFF;
573 header.putShort((short) data.remaining())
574 .flip();
575 connectionIO.asyncSend(header, data);
576 } else {
577 header.putShort((short) 0)
578 .flip();
579 connectionIO.asyncSend(header);
580 }
581 }
582
583
584
585
586 final void asyncSendClose(int sessionID) {
587 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
588
589 ByteBuffer header = ByteBuffer.allocate(4);
590 header.put((byte) Close)
591 .put((byte) sessionID)
592 .putShort((short) 0)
593 .flip();
594 connectionIO.asyncSend(header);
595 }
596
597
598
599
600 final void asyncSendAcknowledgment(int sessionID) {
601 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
602
603 ByteBuffer header = ByteBuffer.allocate(4);
604 header.put((byte) Acknowledgment)
605 .put((byte) sessionID)
606 .putShort((short) 0)
607 .flip();
608 connectionIO.asyncSend(header);
609 }
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629 final void asyncSendData(int op, int sessionID, ByteBuffer data) {
630 assert (op & 0xE1) == Data;
631 assert (op & Data_eof) != 0 ||
632 (op & Data_close & Data_ackRequired) == 0;
633 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
634
635 ByteBuffer header = ByteBuffer.allocate(4);
636 header.put((byte) op)
637 .put((byte) sessionID);
638
639 if (data != null) {
640 assert data.remaining() <= 0xFFFF;
641 header.putShort((short) data.remaining())
642 .flip();
643 connectionIO.asyncSend(header, data);
644 } else {
645 header.putShort((short) 0)
646 .flip();
647 connectionIO.asyncSend(header);
648 }
649 }
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674 final IOFuture futureSendData(int op, int sessionID, ByteBuffer data) {
675 assert (op & 0xE1) == Data;
676 assert (op & Data_eof) != 0 ||
677 (op & Data_close & Data_ackRequired) == 0;
678 assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
679 assert data.remaining() <= 0xFFFF;
680
681 ByteBuffer header = ByteBuffer.allocate(4);
682 header.put((byte) op)
683 .put((byte) sessionID)
684 .putShort((short) data.remaining())
685 .flip();
686 return connectionIO.futureSend(header, data);
687 }
688
689
690
691
// Values of readState: which piece of the incoming byte stream
// processIncomingData() expects next.
692 private static final int READ_CLIENT_CONNECTION_HEADER = 0;
693 private static final int READ_SERVER_CONNECTION_HEADER = 1;
694 private static final int READ_MESSAGE_HEADER = 2;
695 private static final int READ_MESSAGE_BODY = 3;
696
697
698
699
// Lock held by processIncomingData() while it runs the read state
// machine; the read* methods assert that it is held.
700 private final Object readStateLock = new Object();
// Current parser state; initialized by start() according to the role.
701 private volatile int readState;
// Header fields of the message currently being received.
702 private int currentOp;
703 private int currentSessionID;
// Body bytes still to be consumed for the current message.
704 private int currentLengthRemaining;
// Accumulates the body of the current message; null when there is no body
// to retain (always the case for NoOperation).
705 private ByteBuffer currentDataBuffer = null;
706
707 void processIncomingData(ByteBuffer buffer) throws ProtocolException {
708 buffer.flip();
709 assert buffer.hasRemaining();
710
711 synchronized (readStateLock) {
712 stateLoop:
713 do {
714 switch (readState) {
715 case READ_CLIENT_CONNECTION_HEADER:
716 if (!readClientConnectionHeader(buffer)) break stateLoop;
717 break;
718
719 case READ_SERVER_CONNECTION_HEADER:
720 if (!readServerConnectionHeader(buffer)) break stateLoop;
721 break;
722
723 case READ_MESSAGE_HEADER:
724 if (!readMessageHeader(buffer)) break stateLoop;
725 break;
726
727 case READ_MESSAGE_BODY:
728 if (!readMessageBody(buffer)) break stateLoop;
729 break;
730
731 default:
732 throw new AssertionError();
733 }
734 } while (buffer.hasRemaining());
735 }
736
737 buffer.compact();
738 }
739
740 private boolean readClientConnectionHeader(ByteBuffer buffer)
741 throws ProtocolException
742 {
743 assert role == SERVER;
744 assert Thread.holdsLock(readStateLock);
745
746 validatePartialMagicNumber(buffer);
747 if (buffer.remaining() < 8) {
748 return false;
749 }
750 int headerPosition = buffer.position();
751 buffer.position(headerPosition + 4);
752 int version = (buffer.get() & 0xFF);
753 int ration = (buffer.getShort() & 0xFFFF) << 8;
754 int flags = (buffer.get() & 0xFF);
755 boolean negotiate = (flags & ClientConnectionHeader_negotiate) != 0;
756
757 synchronized (muxLock) {
758 initialOutboundRation = ration;
759 asyncSendServerConnectionHeader();
760
761 if (version == 0) {
762 throw new ProtocolException(
763 "bad protocol version: " + version);
764 }
765 if (version > VERSION) {
766 if (!negotiate) {
767 setDown("unsupported protocol version: " + version, null);
768 throw new ProtocolException(
769 "unsupported protocol version: " + version);
770 }
771 }
772
773 serverConnectionReady = true;
774 }
775
776 readState = READ_MESSAGE_HEADER;
777 return true;
778 }
779
780 private boolean readServerConnectionHeader(ByteBuffer buffer)
781 throws ProtocolException
782 {
783 assert role == CLIENT;
784 assert Thread.holdsLock(readStateLock);
785
786 validatePartialMagicNumber(buffer);
787
788 if (buffer.remaining() < 8) {
789 return false;
790 }
791 int headerPosition = buffer.position();
792 buffer.position(headerPosition + 4);
793 int version = (buffer.get() & 0xFF);
794 int ration = (buffer.getShort() & 0xFFFF) << 8;
795 int flags = (buffer.get() & 0xFF);
796
797 synchronized (muxLock) {
798 initialOutboundRation = ration;
799
800 if (version == 0) {
801 throw new ProtocolException(
802 "bad protocol version: " + version);
803 }
804 if (version > VERSION) {
805 throw new ProtocolException(
806 "unexpected protocol version: " + version);
807 }
808
809 clientConnectionReady = true;
810 muxLock.notifyAll();
811 }
812
813 readState = READ_MESSAGE_HEADER;
814 return true;
815 }
816
817 private void validatePartialMagicNumber(ByteBuffer buffer)
818 throws ProtocolException
819 {
820 if (buffer.remaining() > 0) {
821 byte[] temp = new byte[Math.min(buffer.remaining(), magic.length)];
822 buffer.mark();
823 buffer.get(temp);
824 buffer.reset();
825 for (int i = 0; i < temp.length; i++) {
826 if (temp[i] != magic[i]) {
827 setDown((role == CLIENT ? "server" : "client") +
828 " sent bad magic number: " + toHexString(temp), null);
829 throw new ProtocolException("bad magic number: " +
830 toHexString(temp));
831 }
832 }
833 }
834 }
835
836 private boolean readMessageHeader(ByteBuffer buffer)
837 throws ProtocolException
838 {
839 assert Thread.holdsLock(readStateLock);
840 if (buffer.remaining() < 4) {
841 return false;
842 }
843 int headerPosition = buffer.position();
844 if (logger.isLoggable(Level.FINEST)) {
845 logger.log(Level.FINEST,
846 "message header: " +
847 toHexString(buffer.getInt(headerPosition)));
848 }
849
850 int op = (buffer.get() & 0xFF);
851 if ((op & 0xE1) == Data) {
852 int sessionID = (buffer.get() & 0xFF);
853 if (sessionID > MAX_SESSION_ID) {
854 throw new ProtocolException("bad message header: " +
855 toHexString(buffer.getInt(headerPosition)));
856 }
857 currentOp = op;
858 currentSessionID = sessionID;
859 currentLengthRemaining = (buffer.getShort() & 0xFFFF);
860 if (currentLengthRemaining > 0) {
861 currentDataBuffer =
862 ByteBuffer.allocate(currentLengthRemaining);
863 readState = READ_MESSAGE_BODY;
864 } else {
865 dispatchCurrentMessage();
866 }
867 return true;
868
869 } else if ((op & 0xF1) == IncrementRation) {
870 int sessionID = (buffer.get() & 0xFF);
871 if (sessionID > MAX_SESSION_ID) {
872 throw new ProtocolException("bad message header: " +
873 toHexString(buffer.getInt(headerPosition)));
874 }
875 int increment = (buffer.getShort() & 0xFFFF);
876 int shift = op & IncrementRation_shift;
877 increment <<= shift;
878 handleIncrementRation(sessionID, increment);
879 return true;
880
881 } else if ((op & 0xFD) == Abort) {
882 int sessionID = (buffer.get() & 0xFF);
883 if (sessionID > MAX_SESSION_ID) {
884 throw new ProtocolException("bad message header: " +
885 toHexString(buffer.getInt(headerPosition)));
886 }
887 currentOp = op;
888 currentSessionID = sessionID;
889 currentLengthRemaining = (buffer.getShort() & 0xFFFF);
890 if (currentLengthRemaining > 0) {
891 currentDataBuffer =
892 ByteBuffer.allocate(currentLengthRemaining);
893 readState = READ_MESSAGE_BODY;
894 } else {
895 dispatchCurrentMessage();
896 }
897 return true;
898
899 }
900 switch (op) {
901 case NoOperation: {
902 if (buffer.get() != 0) {
903 throw new ProtocolException("bad message header: " +
904 toHexString(buffer.getInt(headerPosition)));
905 }
906 currentOp = op;
907 currentLengthRemaining = (buffer.getShort() & 0xFFFF);
908 currentDataBuffer = null;
909 if (currentLengthRemaining > 0) {
910 readState = READ_MESSAGE_BODY;
911 } else {
912 dispatchCurrentMessage();
913 }
914 return true;
915 }
916
917 case Shutdown: {
918 if (buffer.get() != 0) {
919 throw new ProtocolException("bad message header: " +
920 toHexString(buffer.getInt(headerPosition)));
921 }
922 currentOp = op;
923 currentLengthRemaining = (buffer.getShort() & 0xFFFF);
924 if (currentLengthRemaining > 0) {
925 currentDataBuffer =
926 ByteBuffer.allocate(currentLengthRemaining);
927 readState = READ_MESSAGE_BODY;
928 } else {
929 dispatchCurrentMessage();
930 }
931 return true;
932 }
933
934 case Ping: {
935 if (buffer.get() != 0) {
936 throw new ProtocolException("bad message header: " +
937 toHexString(buffer.getInt(headerPosition)));
938 }
939 int cookie = (buffer.getShort() & 0xFFFF);
940 handlePing(cookie);
941 return true;
942 }
943
944 case PingAck: {
945 if (buffer.get() != 0) {
946 throw new ProtocolException("bad message header: " +
947 toHexString(buffer.getInt(headerPosition)));
948 }
949 int cookie = (buffer.getShort() & 0xFFFF);
950 handlePingAck(cookie);
951 return true;
952 }
953
954 case Error: {
955 if (buffer.get() != 0) {
956 throw new ProtocolException("bad message header: " +
957 toHexString(buffer.getInt(headerPosition)));
958 }
959 currentOp = op;
960 currentLengthRemaining = (buffer.getShort() & 0xFFFF);
961 if (currentLengthRemaining > 0) {
962 currentDataBuffer =
963 ByteBuffer.allocate(currentLengthRemaining);
964 readState = READ_MESSAGE_BODY;
965 } else {
966 dispatchCurrentMessage();
967 }
968 return true;
969 }
970
971 case Close: {
972 int sessionID = (buffer.get() & 0xFF);
973 if (sessionID > MAX_SESSION_ID ||
974 buffer.getShort() != 0)
975 {
976 throw new ProtocolException("bad message header: " +
977 toHexString(buffer.getInt(headerPosition)));
978 }
979 handleClose(sessionID);
980 return true;
981 }
982
983 case Acknowledgment: {
984 int sessionID = (buffer.get() & 0xFF);
985 if (sessionID > MAX_SESSION_ID ||
986 buffer.getShort() != 0)
987 {
988 throw new ProtocolException("bad message header: " +
989 toHexString(buffer.getInt(headerPosition)));
990 }
991 handleAcknowledgment(sessionID);
992 return true;
993 }
994
995 default:
996 throw new ProtocolException("bad message header: " +
997 toHexString(buffer.getInt(headerPosition)));
998 }
999 }
1000
1001 private boolean readMessageBody(ByteBuffer buffer)
1002 throws ProtocolException
1003 {
1004 assert Thread.holdsLock(readStateLock);
1005 assert currentLengthRemaining > 0;
1006 assert currentDataBuffer == null ||
1007 currentDataBuffer.remaining() == currentLengthRemaining;
1008
1009 if (buffer.remaining() > currentLengthRemaining) {
1010 int origLimit = buffer.limit();
1011 buffer.limit(buffer.position() + currentLengthRemaining);
1012 if (currentDataBuffer != null) {
1013 currentDataBuffer.put(buffer);
1014 } else {
1015 buffer.position(buffer.position() + currentLengthRemaining);
1016 }
1017 currentLengthRemaining = 0;
1018 buffer.limit(origLimit);
1019 } else {
1020 currentLengthRemaining -= buffer.remaining();
1021 if (currentDataBuffer != null) {
1022 currentDataBuffer.put(buffer);
1023 } else {
1024 buffer.position(buffer.limit());
1025 }
1026 }
1027
1028 if (currentLengthRemaining > 0) {
1029 return false;
1030 } else {
1031 currentDataBuffer.flip();
1032 dispatchCurrentMessage();
1033 currentDataBuffer = null;
1034 readState = READ_MESSAGE_HEADER;
1035 return true;
1036 }
1037 }
1038
1039 private void dispatchCurrentMessage() throws ProtocolException {
1040 assert currentDataBuffer == null || currentDataBuffer.hasRemaining();
1041
1042 int op = currentOp;
1043 if ((op & 0xE1) == Data) {
1044 boolean open = (op & Data_open) != 0;
1045 boolean close = (op & Data_close) != 0;
1046 boolean eof = (op & Data_eof) != 0;
1047 boolean ackRequired = (op & Data_ackRequired) != 0;
1048 handleData(currentSessionID, open, close, eof, ackRequired,
1049 (currentDataBuffer != null ?
1050 currentDataBuffer : ByteBuffer.allocate(0)));
1051 return;
1052
1053 } else if ((op & 0xFD) == Abort) {
1054 boolean partial = (op & Abort_partial) != 0;
1055 handleAbort(currentSessionID, partial,
1056 (currentDataBuffer != null ?
1057 getStringFromUTF8Buffer(currentDataBuffer) : ""));
1058 return;
1059
1060 }
1061 switch (op) {
1062 case NoOperation:
1063 handleNoOperation();
1064 return;
1065
1066 case Shutdown:
1067 handleShutdown(currentDataBuffer != null ?
1068 getStringFromUTF8Buffer(currentDataBuffer) : "");
1069 return;
1070
1071 case Error:
1072 handleError(currentDataBuffer != null ?
1073 getStringFromUTF8Buffer(currentDataBuffer) : "");
1074 return;
1075
1076 default:
1077 throw new AssertionError(Integer.toHexString((byte) op));
1078 }
1079 }
1080
/**
 * Handles a received NoOperation message: traces it at FINEST and does
 * nothing else.
 */
1081 private void handleNoOperation() throws ProtocolException {
1082 if (logger.isLoggable(Level.FINEST)) {
1083 logger.log(Level.FINEST, "NoOperation");
1084 }
1085
1086
1087 }
1088
1089 private void handleShutdown(String message) throws ProtocolException {
1090 if (logger.isLoggable(Level.FINEST)) {
1091 logger.log(Level.FINEST, "Shutdown");
1092 }
1093
1094 if (role != CLIENT) {
1095 throw new ProtocolException("Shutdown sent by client");
1096 }
1097 setDown("mux connection shut down gracefully", null);
1098 throw new ProtocolException("received Shutdown message");
1099 }
1100
1101 private void handlePing(int cookie) throws ProtocolException {
1102 if (logger.isLoggable(Level.FINEST)) {
1103 logger.log(Level.FINEST, "Ping: cookie=" + cookie);
1104 }
1105
1106 asyncSendPingAck(cookie);
1107 }
1108
1109 private void handlePingAck(int cookie) throws ProtocolException {
1110 if (logger.isLoggable(Level.FINEST)) {
1111 logger.log(Level.FINEST, "PingAck: cookie=" + cookie);
1112 }
1113
1114 synchronized (muxLock) {
1115 if (cookie != expectedPingCookie) {
1116 throw new ProtocolException(
1117 "unexpected ping cookie: " + cookie);
1118 } else {
1119 expectedPingCookie = -1;
1120
1121 }
1122 }
1123 }
1124
1125 private void handleError(String message) throws ProtocolException {
1126 if (logger.isLoggable(Level.FINEST)) {
1127 logger.log(Level.FINEST, "Error");
1128 }
1129
1130 setDown((role == CLIENT ? "server" : "client") +
1131 " reported protocol error: " + message, null);
1132 throw new ProtocolException("received Error message");
1133 }
1134
1135 private void handleIncrementRation(int sessionID, int increment)
1136 throws ProtocolException
1137 {
1138 if (logger.isLoggable(Level.FINEST)) {
1139 logger.log(Level.FINEST,
1140 "IncrementRation: sessionID=" + sessionID +
1141 ",increment=" + increment);
1142 }
1143
1144 getSession(sessionID).handleIncrementRation(increment);
1145 }
1146
1147 private void handleAbort(int sessionID, boolean partial, String message)
1148 throws ProtocolException
1149 {
1150 if (logger.isLoggable(Level.FINEST)) {
1151 logger.log(Level.FINEST,
1152 "Abort: sessionID=" + sessionID +
1153 ",partial=" + partial);
1154 }
1155
1156 getSession(sessionID).handleAbort(partial);
1157 }
1158
1159 private void handleClose(int sessionID) throws ProtocolException {
1160 if (logger.isLoggable(Level.FINEST)) {
1161 logger.log(Level.FINEST, "Close: sessionID=" + sessionID);
1162 }
1163
1164 getSession(sessionID).handleClose();
1165 }
1166
1167 private void handleAcknowledgment(int sessionID) throws ProtocolException {
1168 if (logger.isLoggable(Level.FINEST)) {
1169 logger.log(Level.FINEST, "Acknowledgment: sessionID=" + sessionID);
1170 }
1171
1172 getSession(sessionID).handleAcknowledgment();
1173 }
1174
1175 private void handleData(int sessionID, boolean open, boolean close, boolean eof, boolean ackRequired, ByteBuffer data)
1176 throws ProtocolException
1177 {
1178 if (logger.isLoggable(Level.FINEST)) {
1179 int length = data.remaining();
1180 HexDumpEncoder encoder = new HexDumpEncoder();
1181 byte[] bytes = new byte[data.remaining()];
1182 data.mark();
1183 data.get(bytes);
1184 data.reset();
1185 logger.log(Level.FINEST,
1186 "Data: sessionID={0}{1}{2}{3}{4},length={5}{6}",
1187 new Object[]{sessionID,
1188 open ? ",open" : "",
1189 close ? ",close" : "",
1190 eof ? ",eof" : "",
1191 ackRequired ? ",ackRequired" : "",
1192 length,
1193 length > 0 ? ",data=\n" + encoder.encode(bytes) : ""});
1194 }
1195
1196 if (!eof && (close || ackRequired)) {
1197 throw new ProtocolException("Data: eof=" + eof +
1198 ",close=" + close +
1199 ",ackRequired=" + ackRequired);
1200 }
1201
1202 if (open) {
1203 handleOpen(sessionID);
1204 }
1205
1206 getSession(sessionID).handleData(data, eof, close, ackRequired);
1207 }
1208
1209 private Session getSession(int sessionID) throws ProtocolException {
1210 synchronized (muxLock) {
1211 if (!busySessions.get(sessionID)) {
1212 throw new ProtocolException(
1213 "inactive sessionID: " + sessionID);
1214 }
1215 return sessions[sessionID];
1216 }
1217 }
1218
1219 private static ByteBuffer getUTF8BufferFromString(String s) {
1220 CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
1221 try {
1222 return encoder.encode(CharBuffer.wrap(s));
1223 } catch (CharacterCodingException e) {
1224 return null;
1225 }
1226 }
1227
1228 private static String getStringFromUTF8Buffer(ByteBuffer buffer) {
1229 CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
1230 try {
1231 return decoder.decode(buffer).toString();
1232 } catch (CharacterCodingException e) {
1233 return "(error decoding UTF-8 message: " + e.toString() + ")";
1234 }
1235 }
1236
1237
1238
1239
1240
1241
1242
1243
1244 private static String toHexString(int x) {
1245 char[] buf = new char[8];
1246 for (int i = 0; i < 8; i++) {
1247 buf[i] = toHexChar((x >> ((7 - i) * 4)) & 0xF);
1248 }
1249 return new String(buf);
1250 }
1251
1252 private static String toHexString(byte[] b) {
1253 char[] buf = new char[b.length * 2];
1254 int j = 0;
1255 for (int i = 0; i < b.length; i++) {
1256 buf[j++] = toHexChar((b[i] >> 4) & 0xF);
1257 buf[j++] = toHexChar(b[i] & 0xF);
1258 }
1259 return new String(buf);
1260 }
1261
1262 private static char toHexChar(int x) {
1263 return x < 10 ? (char) ('0' + x) : (char) ('A' - 10 + x);
1264 }
1265 }