View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import org.apache.river.thread.Executor;
22  import org.apache.river.thread.GetThreadPoolAction;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.nio.ByteBuffer;
27  import java.security.AccessController;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.logging.Level;
32  import java.util.logging.Logger;
33  import net.jini.core.constraint.InvocationConstraints;
34  import net.jini.io.UnsupportedConstraintException;
35  import net.jini.io.context.AcknowledgmentSource;
36  import net.jini.jeri.InboundRequest;
37  import net.jini.jeri.OutboundRequest;
38  
39  /**
40   * A Session represents a single session of a multiplexed connection,
41   * for either client-side and server-side perspective.  The particular
42   * role (CLIENT or SERVER) is indicated at construction time.
43   *
44   * @author Sun Microsystems, Inc.
45   **/
46  final class Session {
47  
48      static final int CLIENT = 0;
49      static final int SERVER = 1;
50  
51      static final int IDLE	= 0;
52      static final int OPEN	= 1;
53      static final int FINISHED	= 2;
54      static final int TERMINATED	= 3;
55      private static final String[] stateNames = {
56  	"idle", "open", "finished", "terminated"
57      };
58     
59      /** 
60       * This method prevents a SecurityException from being thrown for
61       * a client proxy that doesn't have permission to read the property.
62       * When this is the case, the secure trace supression option
63       * is chosen.
64       * This is not optimised, because exception conditions
65       * are exceptional.
66       */
67      static boolean traceSupression(){
68          try {
69              return AccessController.doPrivileged(
70                  new PrivilegedAction<Boolean>() 
71                  {
72                      @Override
73                      public Boolean run() {
74                          return Boolean.getBoolean("org.apache.river.jeri.server.suppressStackTraces");
75                      }
76                  }
77              );
78          } catch (SecurityException e) {
79              return true;
80          }
81      }
82  
83      /**
84       * pool of threads for executing tasks in system thread group: used for
85       * I/O (reader and writer) threads and other asynchronous tasks
86       **/
87      private static final Executor systemThreadPool =
88  	(Executor) AccessController.doPrivileged(
89  	    new GetThreadPoolAction(false));
90  
91      /** mux logger */
92      static final Logger logger =
93  	Logger.getLogger("net.jini.jeri.connection.mux");
94  
95      private final Mux mux;
96      final int sessionID;
97      final int role;
98  
99      private final MuxOutputStream out;
100     private final MuxInputStream in;
101 
102     /** lock guarding all mutable instance state (below) */
103     private final Object sessionLock;
104     private boolean sessionDown;
105 
106     private int outState;
107     private int outRation;
108     final boolean outRationInfinite;
109     private boolean partialDeliveryStatus;
110 
111     private int inState;
112     private int inRation;
113     final boolean inRationInfinite;
114 		
115     private boolean removeLater;		// REMIND
116 
117     private boolean receivedAckRequired;
118 
119     private final Collection<AcknowledgmentSource.Listener> ackListeners;
120     private boolean sentAckRequired;
121     private boolean receivedAcknowledgment;
122 
123     /**
124      *
125      */
126     Session(Mux mux, int sessionID, int role) {
127         this.receivedAcknowledgment = false;
128         this.sentAckRequired = false;
129         this.receivedAckRequired = false;
130         this.removeLater = false;
131         this.partialDeliveryStatus = false;
132         this.sessionDown = false;
133         this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(3);
134         this.sessionLock = new Object();
135 	this.mux = mux;
136 	this.sessionID = sessionID;
137 	this.role = role;
138         
139 	outState = (role == CLIENT ? IDLE : OPEN);
140 	outRation = mux.initialOutboundRation;
141 	outRationInfinite = (outRation == 0);
142 
143 	inState = (role == CLIENT ? IDLE : OPEN);
144 	inRation = mux.initialInboundRation;
145 	inRationInfinite = (inRation == 0);
146         out = new MuxOutputStream(mux, this, sessionLock);
147 	in = new MuxInputStream(mux, this, sessionLock);
148     }
149 
150     /**
151      *
152      */
153     OutboundRequest getOutboundRequest() {
154 	assert role == CLIENT;
155 	return new OutboundRequest() {
156             @Override
157 	    public void populateContext(Collection context) {
158 		((MuxClient) mux).populateContext(context);
159 	    }
160             @Override
161 	    public InvocationConstraints getUnfulfilledConstraints() {
162 		/*
163 		 * NYI: We currently have no request-specific hook
164 		 * back to the transport implementation, so we must
165 		 * depend on OutboundRequest wrapping for this method.
166 		 */
167 		throw new AssertionError();
168 	    }
169             @Override
170 	    public OutputStream getRequestOutputStream() { return out; }
171             @Override
172 	    public InputStream getResponseInputStream() { return in; }
173             @Override
174 	    public boolean getDeliveryStatus() {
175 		synchronized (sessionLock) {
176 		    return partialDeliveryStatus;
177 		}
178 	    }
179             @Override
180 	    public void abort() { Session.this.abort(); }
181 	};
182     }
183 
184     /**
185      *
186      */
187     InboundRequest getInboundRequest() {
188 	assert role == SERVER;
189 	return new InboundRequest() {
190             @Override
191 	    public void checkPermissions() {
192 		((MuxServer) mux).checkPermissions();
193 	    }
194             @Override
195 	    public InvocationConstraints
196 		checkConstraints(InvocationConstraints constraints)
197 		throws UnsupportedConstraintException
198 	    {
199 		return ((MuxServer) mux).checkConstraints(constraints);
200 	    }
201             @Override
202 	    public void populateContext(Collection context) {
203 		context.add(new AcknowledgmentSource() {
204                     @Override
205 		    public boolean addAcknowledgmentListener(
206 			AcknowledgmentSource.Listener listener)
207 		    {
208 			if (listener == null) {
209 			    throw new NullPointerException();
210 			}
211 			synchronized (sessionLock) {
212 			    if (getOutState() < FINISHED) {
213 				ackListeners.add(listener);
214 				return true;
215 			    } else {
216 				return false;
217 			    }
218 			}
219 		    }
220 		});
221 		((MuxServer) mux).populateContext(context);
222 	    }
223             @Override
224 	    public InputStream getRequestInputStream() { return in; }
225             @Override
226 	    public OutputStream getResponseOutputStream() { return out; }
227             @Override
228 	    public void abort() { Session.this.abort(); }
229 	};
230     }
231 
232     /**
233      *
234      */
235     void abort() {
236 	synchronized (sessionLock) {
237 	    if (!sessionDown) {
238 		if (logger.isLoggable(Level.FINEST)) {
239 		    logger.log(Level.FINEST,
240                             "outState={0},inState={1},role={2}",
241                             new Object[]{stateNames[getOutState()],
242                                 stateNames[inState],
243                                 role == CLIENT ? "CLIENT" : "SERVER"});
244 		}
245 
246 		if (getOutState() == IDLE) {
247 		    mux.removeSession(sessionID);
248 		} else if (getOutState() < TERMINATED) {
249 		    if (role == SERVER && getOutState() == FINISHED) {
250 			/*
251 			 * In this case, send Close rather than Abort, so that
252 			 * a client that still hasn't finished writing will not
253 			 * get an unnecessary failure and will be able to read
254 			 * the complete response as intended (still permitting
255 			 * server-side defensive abort() invocation).
256 			 */
257 			mux.asyncSendClose(sessionID);
258 		    } else {
259 			mux.asyncSendAbort(Mux.Abort | (role == SERVER ?
260 							Mux.Abort_partial : 0),
261 					   sessionID, null);
262 		    }
263 		    setOutState(TERMINATED);
264 		}
265 
266 		setDown("request aborted", null);
267 	    }
268 	    /*
269 	     * After the application has invoked abort() on the request, we
270 	     * must no longer try to "fake" an OK session.
271 	     */
272             out.abort();
273 
274 	    /*
275 	     * If removing this session from the connection's table
276 	     * was delayed in order to be able to send an
277 	     * Acknowledgment message, then we remove it on local
278 	     * abort in order to clean up resources.  Also make sure
279 	     * that our state is considered terminated so that no
280 	     * future Acknowledgment message will be sent.
281 	     */
282 	    if (removeLater) {
283 		if (getOutState() < TERMINATED) {
284 		    setOutState(TERMINATED);
285 		}
286 		mux.removeSession(sessionID);
287 		removeLater = false;
288 	    }
289 	}
290     }
291 
292     /**
293      *
294      */
295     void setDown(String message, Throwable cause) {
296 	synchronized (sessionLock) {
297 	    if (!sessionDown) {
298 		sessionDown = true;
299                 IOException ex = new IOException(message, cause);
300                 out.down(ex);
301                 in.down(ex);
302 		sessionLock.notifyAll();
303 	    }
304 	}
305     }
306 
307     /**
308      *
309      */
310     void handleIncrementRation(int increment) throws ProtocolException {
311 	synchronized (sessionLock) {
312 	    if (inState == IDLE || inState == TERMINATED) {
313 		throw new ProtocolException("IncrementRation on " +
314 		    stateNames[inState] + " session: " + sessionID);
315 	    }
316 	    if (!outRationInfinite) {
317 		if (outRation + increment < outRation) {
318 		    throw new ProtocolException("ration overflow");
319 		}
320 		if (getOutState() == OPEN) {
321 		    if (increment > 0) {
322 			if (outRation == 0) {
323 			    sessionLock.notifyAll();
324 			}
325 			outRation += increment;
326 		    }
327 		}
328 	    } // ignore message if outbound ration is infinite
329 	}
330     }
331 
332     /**
333      *
334      */
335     void handleAbort(boolean partial) throws ProtocolException {
336 	synchronized (sessionLock) {
337 	    if (inState == IDLE || inState == TERMINATED) {
338 		throw new ProtocolException("Abort on " +
339 		    stateNames[inState] + " session: " + sessionID);
340 	    }
341 
342 	    setInState(TERMINATED);
343 	    partialDeliveryStatus = partial;
344 
345 	    /*
346 	     * Respond with an abort of this side of the session, if it's
347 	     * still open.
348 	     */
349 	    /*
350 	     * REMIND: Technically, the client should not have to send
351 	     * an Abort message here if it is already in the finished
352 	     * state, although the spec would seem to suggest that it
353 	     * should do so regardless.  A particular reason that we
354 	     * send it here in that case, though, is that it should be
355 	     * a cheap way to avoid 4827402 for that case-- to ensure
356 	     * that no late Acknowledgment message gets sent after the
357 	     * session has been removed.
358 	     */
359 	    if (getOutState() < TERMINATED) {
360 		mux.asyncSendAbort(Mux.Abort | (role == SERVER ?
361 						Mux.Abort_partial : 0),
362 				   sessionID, null);
363 		setOutState(TERMINATED);
364 	    }
365 
366 	    setDown("request aborted by remote endpoint", null);
367 
368 	    if (sentAckRequired && !receivedAcknowledgment) {
369 		notifyAcknowledgmentListeners(false);
370 	    }	// REMIND: what about other dangling acknowledgments?
371 
372 	    mux.removeSession(sessionID);
373 	}
374     }
375 
376     /**
377      *
378      */
379     void handleClose() throws ProtocolException {
380 	if (role != CLIENT) {
381 	    throw new ProtocolException("Close sent by client");
382 	}
383 
384 	synchronized (sessionLock) {
385 	    if (inState != FINISHED) {
386 		throw new ProtocolException("Close on " +
387 		    stateNames[inState] + " session: " + sessionID);
388 	    }
389 	    if (getOutState() < FINISHED) {
390 		/*
391 		 * From a protocol perspective, we need to terminate the
392 		 * session at this point (because we're not finished, but
393 		 * we don't want to hold on to it unnecessarily).  But we
394 		 * also don't want the session to appear failed while the
395 		 * client is still writing-- instead, we want the client
396 		 * to be able to successfully read the complete response
397 		 * that was received-- so this flag is set to
398 		 * (temporarily) fake that the session is still in OK
399 		 * shape (but not send any more data for it).
400 		 */
401                 out.handleClose();
402 		mux.asyncSendAbort(Mux.Abort, sessionID, null);
403 		setOutState(TERMINATED);
404 		/*
405 		 * REMIND: This approach causes a premature negative
406 		 * acknowledgment to the server.  It seems that
407 		 * ideally, if receivedAckRequired is true, we should
408 		 * delay sending the Abort message until the response
409 		 * input stream is closed and an Acknowledgment has
410 		 * been sent-- although that would be somewhat at odds
411 		 * with the "timely fashion" prescription of the Close
412 		 * message specification.
413 		 */
414 	    }
415 
416 	    setInState(TERMINATED);
417 
418 	    setDown("request closed by server", null);
419 
420 	    /*
421 	     * If we still (might) need to send an Acknowledgment,
422 	     * then we must delay removing this session from the
423 	     * connection's table now, to prevent the sessionID being
424 	     * reused before the Acknowledgment message is sent.
425 	     */
426 	    if (getOutState() == TERMINATED ||
427 		!receivedAckRequired || in.isSentAcknowledgment())
428 	    {
429 		mux.removeSession(sessionID);
430 	    } else {
431 		removeLater = true;
432 	    }
433 	}
434     }
435 
436     /**
437      *
438      */
439     void handleAcknowledgment() throws ProtocolException {
440 	if (role != SERVER) {
441 	    throw new ProtocolException("Acknowledgment sent by server");
442 	}
443 
444 	synchronized (sessionLock) {
445 	    if (inState == IDLE || inState == TERMINATED) {
446 		throw new ProtocolException("Acknowledgment on " +
447 		    stateNames[inState] + " session: " + sessionID);
448 	    }
449 	    if (getOutState() < FINISHED) {
450 		throw new ProtocolException(
451 		    "acknowledgment received before EOF sent");
452 	    }
453 	    if (!sentAckRequired) {
454 		throw new ProtocolException("acknowledgment not requested");
455 	    }
456 	    if (receivedAcknowledgment) {
457 		throw new ProtocolException("duplicate acknowledgment");
458 	    }
459 	    receivedAcknowledgment = true;
460 
461 	    notifyAcknowledgmentListeners(true);
462 	}
463     }
464 
465     /**
466      *
467      */
468     void handleData(ByteBuffer data,
469 		    boolean eof, boolean close, boolean ackRequired)
470 	throws ProtocolException
471     {
472 	assert eof || (!close && !ackRequired);
473 
474 	if (ackRequired && role != CLIENT) {
475 	    throw new ProtocolException("Data/ackRequired sent by client");
476 	}
477 
478 	synchronized (sessionLock) {
479 	    boolean notified = close;	// close always causes notification
480 
481 	    if (inState != OPEN) {
482 		throw new ProtocolException("Data on " +
483 		    stateNames[inState] + " session: " + sessionID);
484 	    }
485 	    int length = data.remaining();
486 	    if (!inRationInfinite && length > inRation) {
487 		throw new ProtocolException("input ration exceeded");
488 	    }
489 	    if (!in.isClosed() && getOutState() < TERMINATED) {
490 		if (length > 0) {
491 		    if (in.getBufRemaining() == 0) {
492 			sessionLock.notifyAll();
493 			notified = true;
494 		    }
495 		    in.appendToBufQueue(data);
496 		    in.setBufRemaining(in.getBufRemaining() + length);
497 		    if (!inRationInfinite) {
498 			inRation -= length;
499 		    }
500 		}
501 	    }
502 
503 	    if (eof) {
504 		in.setEOF(true);
505 		setInState(FINISHED);
506 		if (!notified) {
507 		    sessionLock.notifyAll();
508 		}
509 
510 		if (ackRequired) {
511 		    receivedAckRequired = true;
512 		    // send acknowledgment if input stream already closed?
513 		}
514 
515 		if (close) {
516 		    handleClose();
517 		}
518 		// REMIND: send Close if appropriate?
519 	    }
520 	}
521     }
522 
523     /**
524      *
525      */
526     void handleOpen() throws ProtocolException {
527 	assert role == SERVER;
528 	synchronized (sessionLock) {
529 	    if (inState < FINISHED || getOutState() < TERMINATED) {
530 		throw new ProtocolException(
531                     inState < FINISHED ?
532 		    ("Data/open on " +
533 		     stateNames[inState] + " session: " + sessionID) :
534 		    ("Data/open before previous session terminated"));
535 	    }
536 
537 	    setInState(TERMINATED);
538 	    // REMIND: process dangling acknowledgments here?
539 
540 	    setDown("old request", null);	// extraneous?
541 	    sessionLock.notifyAll();
542 
543 	    mux.removeSession(sessionID);
544 	}
545     }
546 
547     /**
548      *
549      */
550     void setOutState(int newState) {
551         assert Thread.holdsLock(sessionLock);
552 	assert newState > outState;
553 	outState = newState;
554     }
555 
556     /**
557      *
558      */
559     void setInState(int newState) {
560         assert Thread.holdsLock(sessionLock);
561 	assert newState > inState;
562 	inState = newState;
563     }
564     
565     boolean ackListeners(){
566         assert Thread.holdsLock(sessionLock);
567         return !ackListeners.isEmpty();
568     }
569 
570     private void notifyAcknowledgmentListeners(final boolean received) {
571 	if (!ackListeners.isEmpty()) {
572 	    systemThreadPool.execute(
573                     new NotifyAcknowledgementListeners(ackListeners, received),
574                     "Mux ack notifier");
575 	}
576     }
577     
578     private static class NotifyAcknowledgementListeners implements Runnable {
579         final Collection<AcknowledgmentSource.Listener> ackListeners;
580         final boolean received;
581         
582         NotifyAcknowledgementListeners(
583                 Collection<AcknowledgmentSource.Listener> ackListeners,
584                 boolean received)
585         {
586             this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(ackListeners);
587             this.received = received;
588         }
589         
590         @Override
591         public void run() {
592             for (AcknowledgmentSource.Listener listener : ackListeners) {
593                 listener.acknowledgmentReceived(received);
594             }
595         }
596     }
597 
598     /**
599      * @return the outState
600      */
601     int getOutState() {
602         assert Thread.holdsLock(sessionLock);
603         return outState;
604     }
605 
606     /**
607      * @return the outRation
608      */
609     int getOutRation() {
610         assert Thread.holdsLock(sessionLock);
611         return outRation;
612     }
613 
614     /**
615      * @param outRation the outRation to set
616      */
617     void setOutRation(int outRation) {
618         assert Thread.holdsLock(sessionLock);
619         this.outRation = outRation;
620     }
621 
622     /**
623      * @return the inState
624      */
625     int getInState() {
626         assert Thread.holdsLock(sessionLock);
627         return inState;
628     }
629 
630     /**
631      * @param partialDeliveryStatus the partialDeliveryStatus to set
632      */
633     void setPartialDeliveryStatus(boolean partialDeliveryStatus) {
634         assert Thread.holdsLock(sessionLock);
635         this.partialDeliveryStatus = partialDeliveryStatus;
636     }
637 
638     /**
639      * @return the sentAckRequired
640      */
641     boolean isSentAckRequired() {
642         assert Thread.holdsLock(sessionLock);
643         return sentAckRequired;
644     }
645 
646     /**
647      * @param sentAckRequired the sentAckRequired to set
648      */
649     void setSentAckRequired(boolean sentAckRequired) {
650         assert Thread.holdsLock(sessionLock);
651         this.sentAckRequired = sentAckRequired;
652     }
653 
654     /**
655      * @return the inRation
656      */
657     int getInRation() {
658         assert Thread.holdsLock(sessionLock);
659         return inRation;
660     }
661 
662     /**
663      * @param inRation the inRation to set
664      */
665     void setInRation(int inRation) {
666         assert Thread.holdsLock(sessionLock);
667         this.inRation = inRation;
668     }
669 
670     /**
671      * @return the removeLater
672      */
673     boolean isRemoveLater() {
674         assert Thread.holdsLock(sessionLock);
675         return removeLater;
676     }
677 
678     /**
679      * @param removeLater the removeLater to set
680      */
681     void setRemoveLater(boolean removeLater) {
682         assert Thread.holdsLock(sessionLock);
683         this.removeLater = removeLater;
684     }
685 
686     /**
687      * @return the receivedAckRequired
688      */
689     boolean isReceivedAckRequired() {
690         assert Thread.holdsLock(sessionLock);
691         return receivedAckRequired;
692     }
693 }