View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  import java.util.logging.Level;
25  
26  /**
27   * Output stream returned by OutboundRequests and InboundRequests for
28   * a session of a multiplexed connection.
29   */
30  class MuxOutputStream extends OutputStream {
31      private final ByteBuffer buffer;
32      private final Object sessionLock;
33      private final Session session;
34      private final Mux mux;
35      private boolean fakeOKtoWrite = false; // REMIND
36      private IOException sessionDown = null;
37  
38      MuxOutputStream(Mux mux, Session session, Object sessionLock) {
39          this.sessionLock = sessionLock;
40          this.session = session;
41          this.mux = mux;
42          this.buffer = mux.directBuffersUseful() 
43                  ? ByteBuffer.allocateDirect(mux.maxFragmentSize) 
44                  : ByteBuffer.allocate(mux.maxFragmentSize);
45      }
46  
47      void abort() {
48          fakeOKtoWrite = false;
49      }
50  
51      void handleClose() {
52          fakeOKtoWrite = true;
53      }
54  
55      void down(IOException e) {
56          sessionDown = e;
57      }
58  
59      @Override
60      public void write(int b) throws IOException {
61          if (!buffer.hasRemaining()) {
62              writeBuffer(false);
63          } else {
64              synchronized (sessionLock) {
65                  // REMIND: necessary?
66                  ensureOpen();
67              }
68          }
69          buffer.put((byte) b);
70      }
71  
72      @Override
73      public void write(byte[] b, int off, int len) throws IOException {
74          if (b == null) {
75              throw new NullPointerException();
76          } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
77              throw new IndexOutOfBoundsException();
78          } else if (len == 0) {
79              synchronized (sessionLock) {
80                  ensureOpen();
81              }
82              return;
83          }
84          while (len > 0) {
85              int avail = buffer.remaining();
86              if (len <= avail) {
87                  synchronized (sessionLock) {
88                      ensureOpen();
89                  }
90                  buffer.put(b, off, len);
91                  return;
92              }
93              buffer.put(b, off, avail);
94              off += avail;
95              len -= avail;
96              writeBuffer(false);
97          }
98      }
99  
100     /** Flush method causes deadlock */
101 //    @Override
102 //    public void flush() throws IOException {
103 //        synchronized (sessionLock) {
104 //            ensureOpen();
105 //        }
106 //        while (buffer.hasRemaining()) {
107 //            writeBuffer(false);
108 //        }
109 //    }
110 
111     @Override
112     public void close() throws IOException {
113         if (Session.logger.isLoggable(Level.FINEST)) {
114             Session.logger.log(Level.FINEST, "STACK TRACE", new Throwable("STACK TRACE"));
115         }
116         synchronized (sessionLock) {
117             ensureOpen();
118         }
119         while (!writeBuffer(true)) {
120         }
121     }
122 
123     /**
124      *
125      * This method must ONLY be invoked while synchronized on
126      * this session's lock.
127      */
128     private void ensureOpen() throws IOException {
129         assert Thread.holdsLock(sessionLock);
130         /*
131          * While we're faking that the session is still OK when it really
132          * isn't (see above comments), return silently from here.
133          */
134         if (fakeOKtoWrite) {
135             return;
136         }
137         int outState = session.getOutState();
138         if (outState > Session.OPEN) {
139             if (outState == Session.FINISHED) {
140                 throw new IOException("stream closed");
141             } else {
142                 throw new IOException("session terminated");
143             }
144         } else if (sessionDown != null) {
145             throw sessionDown;
146         }
147     }
148 
149     /**
150      * Writes as much of the contents of this stream's output buffer
151      * as is allowed by the current output ration.  Upon normal return,
152      * at least one byte will have been transferred from the buffer to
153      * the multiplexed connection output queue, and the buffer will have
154      * been compacted, ready to be filled at the current position.
155      *
156      * Returns true if closeIfComplete and session was marked EOF (with
157      * complete buffer written); if true, stream's output buffer should
158      * no longer be accessed (because this method will not wait for
159      * actual writing of the message).
160      */
161     private boolean writeBuffer(boolean closeIfComplete) throws IOException {
162         boolean hasData;
163         int origLimit;
164         buffer.flip();
165         origLimit = buffer.limit();
166         int toSend;
167         IOFuture future = null;
168         boolean eofSent = false;
169         synchronized (sessionLock) {
170             while (buffer.remaining() > 0 
171                     && !session.outRationInfinite 
172                     && session.getOutRation() < 1 
173                     && sessionDown == null 
174                     && session.getOutState() == Session.OPEN) 
175             {
176                 try {
177                     sessionLock.wait(); // REMIND: timeout?
178                 } catch (InterruptedException e) {
179                     String message = "request I/O interrupted";
180                     session.setDown(message, e);
181                     throw new IOException(message, e);
182                 }
183             }
184             ensureOpen();
185             assert buffer.remaining() == 0 || session.outRationInfinite || session.getOutRation() > 0 || fakeOKtoWrite;
186             /*
187              * If we're just faking that the session is OK when it really
188              * isn't, then we need to stop the writing from proceeding
189              * past this barrier-- and if a close was requested, then
190              * satisfy it right away.
191              */
192             if (fakeOKtoWrite) {
193                 assert session.role == Session.CLIENT 
194                         && session.getInState() == Session.TERMINATED;
195                 if (closeIfComplete) fakeOKtoWrite = false;
196                 buffer.position(origLimit);
197                 buffer.compact();
198                 return closeIfComplete;
199             }
200             boolean complete;
201             if (session.outRationInfinite || buffer.remaining() <= session.getOutRation()) {
202                 toSend = buffer.remaining();
203                 complete = true;
204             } else {
205                 toSend = session.getOutRation();
206                 buffer.limit(toSend);
207                 complete = false;
208             }
209             if (!session.outRationInfinite) {
210                 session.setOutRation(session.getOutRation() - toSend);
211             }
212             session.setPartialDeliveryStatus(true);
213             boolean open = session.getOutState() == Session.IDLE;
214             boolean eof = closeIfComplete && complete;
215             boolean close = session.role == Session.SERVER && eof 
216                     && session.getInState() > Session.OPEN;
217             boolean ackRequired = session.role == Session.SERVER 
218                     && eof && session.ackListeners();
219             int op = Mux.Data | (open ? Mux.Data_open : 0) 
220                     | (eof ? Mux.Data_eof : 0) | (close ? Mux.Data_close : 0)
221                     | (ackRequired ? Mux.Data_ackRequired : 0);
222             /*
223              * If we are the server-side, send even the final Data message
224              * for this session synchronously with this method, so that the
225              * VM will not exit before it gets delivered.  Otherwise, let
226              * final Data messages (those with eof true) be sent after this
227              * method completes.
228              *
229              * Buffers are duplicated to avoid a data race that occurred in
230              * StreamConnectionIO.  IOFuture now provides the buffer's position
231              * after sending.
232              */
233             if (!eof || session.role == Session.SERVER) {
234                 future = mux.futureSendData(op, session.sessionID, buffer.duplicate());
235             } else {
236                 mux.asyncSendData(op, session.sessionID, buffer.duplicate());
237             }
238 
239             if (session.getOutState() == Session.IDLE) {
240                 session.setOutState(Session.OPEN);
241                 session.setInState(Session.OPEN);
242             }
243             if (eof) {
244                 eofSent = true;
245                 session.setOutState(close ? Session.TERMINATED : Session.FINISHED);
246                 if (ackRequired) {
247                     session.setSentAckRequired(true);
248                 }
249                 sessionLock.notifyAll();
250             }
251         }
252         if (future != null) {
253             /* StreamConnectionIO uses a dedicated thread for sending buffers, 
254              * but synchronization is only used to get buffers for processing,
255              * no locks are held while sending, for this reason the state of the
256              * buffers position must be obtained from IOFuture, previously, 
257              * reading buffer position depended on a data race.
258              */
259             hasData = waitForIO(future);
260             if (hasData) {
261                 buffer.position(future.getPosition()).limit(origLimit);
262                 buffer.compact();
263             } else {
264                 buffer.clear();
265             }
266         } else {
267             buffer.clear();
268         }
269         return eofSent;
270     }
271 
272     /**
273      *
274      * This method must NOT be invoked while synchronized on
275      * this session's lock.
276      */
277     private boolean waitForIO(IOFuture future) throws IOException {
278         assert !Thread.holdsLock(sessionLock);
279         try {
280             return future.waitUntilDone();
281         } catch (InterruptedException e) {
282             String message = "request I/O interrupted";
283             session.setDown(message, e);
284             throw new IOException(message, e);
285         } catch (IOException e) {
286             session.setDown(e.getMessage(), e.getCause());
287             throw e;
288         }
289     }
290     
291 }