1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import org.apache.river.thread.Executor;
22 import org.apache.river.thread.GetThreadPoolAction;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.nio.ByteBuffer;
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33 import net.jini.core.constraint.InvocationConstraints;
34 import net.jini.io.UnsupportedConstraintException;
35 import net.jini.io.context.AcknowledgmentSource;
36 import net.jini.jeri.InboundRequest;
37 import net.jini.jeri.OutboundRequest;
38
39
40
41
42
43
44
45
46 final class Session {
47
48 static final int CLIENT = 0;
49 static final int SERVER = 1;
50
51 static final int IDLE = 0;
52 static final int OPEN = 1;
53 static final int FINISHED = 2;
54 static final int TERMINATED = 3;
55 private static final String[] stateNames = {
56 "idle", "open", "finished", "terminated"
57 };
58
59
60
61
62
63
64
65
66
67 static boolean traceSupression(){
68 try {
69 return AccessController.doPrivileged(
70 new PrivilegedAction<Boolean>()
71 {
72 @Override
73 public Boolean run() {
74 return Boolean.getBoolean("org.apache.river.jeri.server.suppressStackTraces");
75 }
76 }
77 );
78 } catch (SecurityException e) {
79 return true;
80 }
81 }
82
83
84
85
86
87 private static final Executor systemThreadPool =
88 (Executor) AccessController.doPrivileged(
89 new GetThreadPoolAction(false));
90
91
92 static final Logger logger =
93 Logger.getLogger("net.jini.jeri.connection.mux");
94
95 private final Mux mux;
96 final int sessionID;
97 final int role;
98
99 private final MuxOutputStream out;
100 private final MuxInputStream in;
101
102
103 private final Object sessionLock;
104 private boolean sessionDown;
105
106 private int outState;
107 private int outRation;
108 final boolean outRationInfinite;
109 private boolean partialDeliveryStatus;
110
111 private int inState;
112 private int inRation;
113 final boolean inRationInfinite;
114
115 private boolean removeLater;
116
117 private boolean receivedAckRequired;
118
119 private final Collection<AcknowledgmentSource.Listener> ackListeners;
120 private boolean sentAckRequired;
121 private boolean receivedAcknowledgment;
122
123
124
125
126 Session(Mux mux, int sessionID, int role) {
127 this.receivedAcknowledgment = false;
128 this.sentAckRequired = false;
129 this.receivedAckRequired = false;
130 this.removeLater = false;
131 this.partialDeliveryStatus = false;
132 this.sessionDown = false;
133 this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(3);
134 this.sessionLock = new Object();
135 this.mux = mux;
136 this.sessionID = sessionID;
137 this.role = role;
138
139 outState = (role == CLIENT ? IDLE : OPEN);
140 outRation = mux.initialOutboundRation;
141 outRationInfinite = (outRation == 0);
142
143 inState = (role == CLIENT ? IDLE : OPEN);
144 inRation = mux.initialInboundRation;
145 inRationInfinite = (inRation == 0);
146 out = new MuxOutputStream(mux, this, sessionLock);
147 in = new MuxInputStream(mux, this, sessionLock);
148 }
149
150
151
152
153 OutboundRequest getOutboundRequest() {
154 assert role == CLIENT;
155 return new OutboundRequest() {
156 @Override
157 public void populateContext(Collection context) {
158 ((MuxClient) mux).populateContext(context);
159 }
160 @Override
161 public InvocationConstraints getUnfulfilledConstraints() {
162
163
164
165
166
167 throw new AssertionError();
168 }
169 @Override
170 public OutputStream getRequestOutputStream() { return out; }
171 @Override
172 public InputStream getResponseInputStream() { return in; }
173 @Override
174 public boolean getDeliveryStatus() {
175 synchronized (sessionLock) {
176 return partialDeliveryStatus;
177 }
178 }
179 @Override
180 public void abort() { Session.this.abort(); }
181 };
182 }
183
184
185
186
187 InboundRequest getInboundRequest() {
188 assert role == SERVER;
189 return new InboundRequest() {
190 @Override
191 public void checkPermissions() {
192 ((MuxServer) mux).checkPermissions();
193 }
194 @Override
195 public InvocationConstraints
196 checkConstraints(InvocationConstraints constraints)
197 throws UnsupportedConstraintException
198 {
199 return ((MuxServer) mux).checkConstraints(constraints);
200 }
201 @Override
202 public void populateContext(Collection context) {
203 context.add(new AcknowledgmentSource() {
204 @Override
205 public boolean addAcknowledgmentListener(
206 AcknowledgmentSource.Listener listener)
207 {
208 if (listener == null) {
209 throw new NullPointerException();
210 }
211 synchronized (sessionLock) {
212 if (getOutState() < FINISHED) {
213 ackListeners.add(listener);
214 return true;
215 } else {
216 return false;
217 }
218 }
219 }
220 });
221 ((MuxServer) mux).populateContext(context);
222 }
223 @Override
224 public InputStream getRequestInputStream() { return in; }
225 @Override
226 public OutputStream getResponseOutputStream() { return out; }
227 @Override
228 public void abort() { Session.this.abort(); }
229 };
230 }
231
232
233
234
235 void abort() {
236 synchronized (sessionLock) {
237 if (!sessionDown) {
238 if (logger.isLoggable(Level.FINEST)) {
239 logger.log(Level.FINEST,
240 "outState={0},inState={1},role={2}",
241 new Object[]{stateNames[getOutState()],
242 stateNames[inState],
243 role == CLIENT ? "CLIENT" : "SERVER"});
244 }
245
246 if (getOutState() == IDLE) {
247 mux.removeSession(sessionID);
248 } else if (getOutState() < TERMINATED) {
249 if (role == SERVER && getOutState() == FINISHED) {
250
251
252
253
254
255
256
257 mux.asyncSendClose(sessionID);
258 } else {
259 mux.asyncSendAbort(Mux.Abort | (role == SERVER ?
260 Mux.Abort_partial : 0),
261 sessionID, null);
262 }
263 setOutState(TERMINATED);
264 }
265
266 setDown("request aborted", null);
267 }
268
269
270
271
272 out.abort();
273
274
275
276
277
278
279
280
281
282 if (removeLater) {
283 if (getOutState() < TERMINATED) {
284 setOutState(TERMINATED);
285 }
286 mux.removeSession(sessionID);
287 removeLater = false;
288 }
289 }
290 }
291
292
293
294
295 void setDown(String message, Throwable cause) {
296 synchronized (sessionLock) {
297 if (!sessionDown) {
298 sessionDown = true;
299 IOException ex = new IOException(message, cause);
300 out.down(ex);
301 in.down(ex);
302 sessionLock.notifyAll();
303 }
304 }
305 }
306
307
308
309
310 void handleIncrementRation(int increment) throws ProtocolException {
311 synchronized (sessionLock) {
312 if (inState == IDLE || inState == TERMINATED) {
313 throw new ProtocolException("IncrementRation on " +
314 stateNames[inState] + " session: " + sessionID);
315 }
316 if (!outRationInfinite) {
317 if (outRation + increment < outRation) {
318 throw new ProtocolException("ration overflow");
319 }
320 if (getOutState() == OPEN) {
321 if (increment > 0) {
322 if (outRation == 0) {
323 sessionLock.notifyAll();
324 }
325 outRation += increment;
326 }
327 }
328 }
329 }
330 }
331
332
333
334
335 void handleAbort(boolean partial) throws ProtocolException {
336 synchronized (sessionLock) {
337 if (inState == IDLE || inState == TERMINATED) {
338 throw new ProtocolException("Abort on " +
339 stateNames[inState] + " session: " + sessionID);
340 }
341
342 setInState(TERMINATED);
343 partialDeliveryStatus = partial;
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359 if (getOutState() < TERMINATED) {
360 mux.asyncSendAbort(Mux.Abort | (role == SERVER ?
361 Mux.Abort_partial : 0),
362 sessionID, null);
363 setOutState(TERMINATED);
364 }
365
366 setDown("request aborted by remote endpoint", null);
367
368 if (sentAckRequired && !receivedAcknowledgment) {
369 notifyAcknowledgmentListeners(false);
370 }
371
372 mux.removeSession(sessionID);
373 }
374 }
375
376
377
378
379 void handleClose() throws ProtocolException {
380 if (role != CLIENT) {
381 throw new ProtocolException("Close sent by client");
382 }
383
384 synchronized (sessionLock) {
385 if (inState != FINISHED) {
386 throw new ProtocolException("Close on " +
387 stateNames[inState] + " session: " + sessionID);
388 }
389 if (getOutState() < FINISHED) {
390
391
392
393
394
395
396
397
398
399
400
401 out.handleClose();
402 mux.asyncSendAbort(Mux.Abort, sessionID, null);
403 setOutState(TERMINATED);
404
405
406
407
408
409
410
411
412
413
414 }
415
416 setInState(TERMINATED);
417
418 setDown("request closed by server", null);
419
420
421
422
423
424
425
426 if (getOutState() == TERMINATED ||
427 !receivedAckRequired || in.isSentAcknowledgment())
428 {
429 mux.removeSession(sessionID);
430 } else {
431 removeLater = true;
432 }
433 }
434 }
435
436
437
438
439 void handleAcknowledgment() throws ProtocolException {
440 if (role != SERVER) {
441 throw new ProtocolException("Acknowledgment sent by server");
442 }
443
444 synchronized (sessionLock) {
445 if (inState == IDLE || inState == TERMINATED) {
446 throw new ProtocolException("Acknowledgment on " +
447 stateNames[inState] + " session: " + sessionID);
448 }
449 if (getOutState() < FINISHED) {
450 throw new ProtocolException(
451 "acknowledgment received before EOF sent");
452 }
453 if (!sentAckRequired) {
454 throw new ProtocolException("acknowledgment not requested");
455 }
456 if (receivedAcknowledgment) {
457 throw new ProtocolException("duplicate acknowledgment");
458 }
459 receivedAcknowledgment = true;
460
461 notifyAcknowledgmentListeners(true);
462 }
463 }
464
465
466
467
468 void handleData(ByteBuffer data,
469 boolean eof, boolean close, boolean ackRequired)
470 throws ProtocolException
471 {
472 assert eof || (!close && !ackRequired);
473
474 if (ackRequired && role != CLIENT) {
475 throw new ProtocolException("Data/ackRequired sent by client");
476 }
477
478 synchronized (sessionLock) {
479 boolean notified = close;
480
481 if (inState != OPEN) {
482 throw new ProtocolException("Data on " +
483 stateNames[inState] + " session: " + sessionID);
484 }
485 int length = data.remaining();
486 if (!inRationInfinite && length > inRation) {
487 throw new ProtocolException("input ration exceeded");
488 }
489 if (!in.isClosed() && getOutState() < TERMINATED) {
490 if (length > 0) {
491 if (in.getBufRemaining() == 0) {
492 sessionLock.notifyAll();
493 notified = true;
494 }
495 in.appendToBufQueue(data);
496 in.setBufRemaining(in.getBufRemaining() + length);
497 if (!inRationInfinite) {
498 inRation -= length;
499 }
500 }
501 }
502
503 if (eof) {
504 in.setEOF(true);
505 setInState(FINISHED);
506 if (!notified) {
507 sessionLock.notifyAll();
508 }
509
510 if (ackRequired) {
511 receivedAckRequired = true;
512
513 }
514
515 if (close) {
516 handleClose();
517 }
518
519 }
520 }
521 }
522
523
524
525
526 void handleOpen() throws ProtocolException {
527 assert role == SERVER;
528 synchronized (sessionLock) {
529 if (inState < FINISHED || getOutState() < TERMINATED) {
530 throw new ProtocolException(
531 inState < FINISHED ?
532 ("Data/open on " +
533 stateNames[inState] + " session: " + sessionID) :
534 ("Data/open before previous session terminated"));
535 }
536
537 setInState(TERMINATED);
538
539
540 setDown("old request", null);
541 sessionLock.notifyAll();
542
543 mux.removeSession(sessionID);
544 }
545 }
546
547
548
549
550 void setOutState(int newState) {
551 assert Thread.holdsLock(sessionLock);
552 assert newState > outState;
553 outState = newState;
554 }
555
556
557
558
559 void setInState(int newState) {
560 assert Thread.holdsLock(sessionLock);
561 assert newState > inState;
562 inState = newState;
563 }
564
565 boolean ackListeners(){
566 assert Thread.holdsLock(sessionLock);
567 return !ackListeners.isEmpty();
568 }
569
570 private void notifyAcknowledgmentListeners(final boolean received) {
571 if (!ackListeners.isEmpty()) {
572 systemThreadPool.execute(
573 new NotifyAcknowledgementListeners(ackListeners, received),
574 "Mux ack notifier");
575 }
576 }
577
578 private static class NotifyAcknowledgementListeners implements Runnable {
579 final Collection<AcknowledgmentSource.Listener> ackListeners;
580 final boolean received;
581
582 NotifyAcknowledgementListeners(
583 Collection<AcknowledgmentSource.Listener> ackListeners,
584 boolean received)
585 {
586 this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(ackListeners);
587 this.received = received;
588 }
589
590 @Override
591 public void run() {
592 for (AcknowledgmentSource.Listener listener : ackListeners) {
593 listener.acknowledgmentReceived(received);
594 }
595 }
596 }
597
598
599
600
601 int getOutState() {
602 assert Thread.holdsLock(sessionLock);
603 return outState;
604 }
605
606
607
608
609 int getOutRation() {
610 assert Thread.holdsLock(sessionLock);
611 return outRation;
612 }
613
614
615
616
617 void setOutRation(int outRation) {
618 assert Thread.holdsLock(sessionLock);
619 this.outRation = outRation;
620 }
621
622
623
624
625 int getInState() {
626 assert Thread.holdsLock(sessionLock);
627 return inState;
628 }
629
630
631
632
633 void setPartialDeliveryStatus(boolean partialDeliveryStatus) {
634 assert Thread.holdsLock(sessionLock);
635 this.partialDeliveryStatus = partialDeliveryStatus;
636 }
637
638
639
640
641 boolean isSentAckRequired() {
642 assert Thread.holdsLock(sessionLock);
643 return sentAckRequired;
644 }
645
646
647
648
649 void setSentAckRequired(boolean sentAckRequired) {
650 assert Thread.holdsLock(sessionLock);
651 this.sentAckRequired = sentAckRequired;
652 }
653
654
655
656
657 int getInRation() {
658 assert Thread.holdsLock(sessionLock);
659 return inRation;
660 }
661
662
663
664
665 void setInRation(int inRation) {
666 assert Thread.holdsLock(sessionLock);
667 this.inRation = inRation;
668 }
669
670
671
672
673 boolean isRemoveLater() {
674 assert Thread.holdsLock(sessionLock);
675 return removeLater;
676 }
677
678
679
680
681 void setRemoveLater(boolean removeLater) {
682 assert Thread.holdsLock(sessionLock);
683 this.removeLater = removeLater;
684 }
685
686
687
688
689 boolean isReceivedAckRequired() {
690 assert Thread.holdsLock(sessionLock);
691 return receivedAckRequired;
692 }
693 }