View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.util.Deque;
25  import java.util.LinkedList;
26  
27  /**
28   * Output stream returned by OutboundRequests and InboundRequests for
29   * a session of a multiplexed connection.
30   */
31  class MuxInputStream extends InputStream {
32      private final Object sessionLock;
33      private final Session session;
34      private final Mux mux;
35      private final Deque<ByteBuffer> inBufQueue;
36      private IOException sessionDown = null;
37      private int inBufRemaining = 0;
38      private int inBufPos = 0;
39      private boolean inEOF = false;
40      private boolean inClosed = false;
41      private boolean sentAcknowledgment = false;
42  
43      MuxInputStream(Mux mux, Session session, Object sessionLock) {
44          this.mux = mux;
45          this.session = session;
46          this.sessionLock = sessionLock;
47          this.inBufQueue = new LinkedList<ByteBuffer>();
48      }
49  
50      void down(IOException e) {
51          sessionDown = e;
52      }
53  
54      void appendToBufQueue(ByteBuffer data) {
55          inBufQueue.addLast(data);
56      }
57  
58      @Override
59      public int read() throws IOException {
60          synchronized (sessionLock) {
61              if (inClosed) {
62                  throw new IOException("stream closed");
63              }
64              while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
65                  if (session.getInState() == Session.IDLE) {
66                      assert session.getOutState() == Session.IDLE;
67                      mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
68                      session.setOutState(Session.OPEN);
69                      session.setInState(Session.OPEN);
70                  }
71                  if (!session.inRationInfinite && session.getInRation() == 0) {
72                      int inc = mux.initialInboundRation;
73                      mux.asyncSendIncrementRation(session.sessionID, inc);
74                      session.setInRation(session.getInRation() + inc);
75                  }
76                  try {
77                      sessionLock.wait(5000L); // REMIND: timeout?
78                  } catch (InterruptedException e) {
79                      String message = "request I/O interrupted";
80                      session.setDown(message, e);
81                      throw wrap(message, e);
82                  }
83              }
84              if (inClosed) {
85                  throw new IOException("stream closed");
86              }
87              if (inBufRemaining == 0) {
88                  if (inEOF) {
89                      return -1;
90                  } else {
91                      if (session.getInState() == Session.TERMINATED) {
92                          throw new IOException("request aborted by remote endpoint");
93                      }
94                      assert sessionDown != null;
95                      throw sessionDown;
96                  }
97              }
98              assert inBufQueue.size() > 0;
99              int result = -1;
100             while (result == -1) {
101                 ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
102                 if (inBufPos < buf.limit()) {
103                     result = (buf.get() & 0xFF);
104                     inBufPos++;
105                     inBufRemaining--;
106                 }
107                 if (inBufPos == buf.limit()) {
108                     inBufQueue.removeFirst();
109                     inBufPos = 0;
110                 }
111             }
112             if (!session.inRationInfinite) {
113                 checkInboundRation();
114             }
115             return result;
116         }
117     }
118 
119     private IOException wrap(String message, Exception e) {
120         Throwable t;
121         if (Session.traceSupression()) {
122             t = e;
123         } else {
124             t = e.fillInStackTrace();
125         }
126         return new IOException(message, t);
127     }
128 
129     @Override
130     public int read(byte[] b, int off, int len) throws IOException {
131         if (b == null) {
132             throw new NullPointerException();
133         } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
134             throw new IndexOutOfBoundsException();
135         }
136         synchronized (sessionLock) {
137             if (inClosed) {
138                 throw new IOException("stream closed");
139             } else if (len == 0) {
140                 /*
141                  * REMIND: What if
142                  *     - stream is at EOF?
143                  *     - session was aborted?
144                  */
145                 return 0;
146             }
147             while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
148                 if (session.getInState() == Session.IDLE) {
149                     assert session.getOutState() == Session.IDLE;
150                     mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
151                     session.setOutState(Session.OPEN);
152                     session.setInState(Session.OPEN);
153                 }
154                 if (!session.inRationInfinite && session.getInRation() == 0) {
155                     int inc = mux.initialInboundRation;
156                     mux.asyncSendIncrementRation(session.sessionID, inc);
157                     session.setInRation(session.getInRation() + inc);
158                 }
159                 try {
160                     sessionLock.wait(5000L); // REMIND: timeout?
161                 } catch (InterruptedException e) {
162                     String message = "request I/O interrupted";
163                     session.setDown(message, e);
164                     throw wrap(message, e);
165                 }
166             }
167             if (inClosed) {
168                 throw new IOException("stream closed");
169             }
170             if (inBufRemaining == 0) {
171                 if (inEOF) {
172                     return -1;
173                 } else {
174                     if (session.getInState() == Session.TERMINATED) {
175                         throw new IOException("request aborted by remote endpoint");
176                     }
177                     assert sessionDown != null;
178                     throw sessionDown;
179                 }
180             }
181             assert inBufQueue.size() > 0;
182             int remaining = len;
183             while (remaining > 0 && inBufRemaining > 0) {
184                 ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
185                 if (inBufPos < buf.limit()) {
186                     int toCopy = Math.min(buf.limit() - inBufPos, remaining);
187                     buf.get(b, off, toCopy);
188                     inBufPos += toCopy;
189                     inBufRemaining -= toCopy;
190                     off += toCopy;
191                     remaining -= toCopy;
192                 }
193                 if (inBufPos == buf.limit()) {
194                     inBufQueue.removeFirst();
195                     inBufPos = 0;
196                 }
197             }
198             if (!session.inRationInfinite) {
199                 checkInboundRation();
200             }
201             return len - remaining;
202         }
203     }
204 
205     /**
206      * Sends ration increment, if read drained buffers below
207      * a certain mark.
208      *
209      * This method must NOT be invoked if the inbound ration in
210      * infinite, and it must ONLY be invoked while synchronized on
211      * this session's lock.
212      *
213      * REMIND: The implementation of this action will be a
214      * significant area for performance tuning.
215      */
216     private void checkInboundRation() {
217         assert Thread.holdsLock(sessionLock);
218         assert !session.inRationInfinite;
219         if (session.getInState() >= Session.FINISHED) {
220             return;
221         }
222         int mark = mux.initialInboundRation / 2;
223         int used = inBufRemaining + session.getInRation();
224         if (used <= mark) {
225             int inc = mux.initialInboundRation - used;
226             mux.asyncSendIncrementRation(session.sessionID, inc);
227             session.setInRation(session.getInRation() + inc);
228         }
229     }
230 
231     @Override
232     public int available() throws IOException {
233         synchronized (sessionLock) {
234             if (inClosed) {
235                 throw new IOException("stream closed");
236             }
237             /*
238              * REMIND: What if
239              *     - stream is at EOF?
240              *     - session was aborted?
241              */
242             return inBufRemaining;
243         }
244     }
245 
246     @Override
247     public void close() {
248         synchronized (sessionLock) {
249             if (inClosed) {
250                 return;
251             }
252             inClosed = true;
253             inBufQueue.clear(); // allow GC of unread data
254             if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
255                 mux.asyncSendAcknowledgment(session.sessionID);
256                 sentAcknowledgment = true;
257                 /*
258                  * If removing this session from the connection's
259                  * table was delayed in order to be able to send
260                  * an Acknowledgment message, then take care of
261                  * removing it now.
262                  */
263                 if (session.isRemoveLater()) {
264                     session.setOutState(Session.TERMINATED);
265                     mux.removeSession(session.sessionID);
266                     session.setRemoveLater(false);
267                 }
268             }
269             sessionLock.notifyAll();
270         }
271     }
272 
273     /**
274      * @return the sentAcknowledgment
275      */
276     boolean isSentAcknowledgment() {
277         return sentAcknowledgment;
278     }
279 
280     /**
281      * @return the inBufRemaining
282      */
283     int getBufRemaining() {
284         return inBufRemaining;
285     }
286 
287     /**
288      * @return the inClosed
289      */
290     boolean isClosed() {
291         return inClosed;
292     }
293 
294     /**
295      * @param inBufRemaining the inBufRemaining to set
296      */
297     void setBufRemaining(int inBufRemaining) {
298         this.inBufRemaining = inBufRemaining;
299     }
300 
301     /**
302      * @param inEOF the inEOF to set
303      */
304     void setEOF(boolean inEOF) {
305         this.inEOF = inEOF;
306     }
307     
308 }