1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.river.jeri.internal.mux;
20
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24 import java.util.logging.Level;
25
26 /**
27 * Output stream returned by OutboundRequests and InboundRequests for
28 * a session of a multiplexed connection.
29 */
30 class MuxOutputStream extends OutputStream {
31 private final ByteBuffer buffer;
32 private final Object sessionLock;
33 private final Session session;
34 private final Mux mux;
35 private boolean fakeOKtoWrite = false; // REMIND
36 private IOException sessionDown = null;
37
38 MuxOutputStream(Mux mux, Session session, Object sessionLock) {
39 this.sessionLock = sessionLock;
40 this.session = session;
41 this.mux = mux;
42 this.buffer = mux.directBuffersUseful()
43 ? ByteBuffer.allocateDirect(mux.maxFragmentSize)
44 : ByteBuffer.allocate(mux.maxFragmentSize);
45 }
46
47 void abort() {
48 fakeOKtoWrite = false;
49 }
50
51 void handleClose() {
52 fakeOKtoWrite = true;
53 }
54
55 void down(IOException e) {
56 sessionDown = e;
57 }
58
59 @Override
60 public void write(int b) throws IOException {
61 if (!buffer.hasRemaining()) {
62 writeBuffer(false);
63 } else {
64 synchronized (sessionLock) {
65 // REMIND: necessary?
66 ensureOpen();
67 }
68 }
69 buffer.put((byte) b);
70 }
71
72 @Override
73 public void write(byte[] b, int off, int len) throws IOException {
74 if (b == null) {
75 throw new NullPointerException();
76 } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
77 throw new IndexOutOfBoundsException();
78 } else if (len == 0) {
79 synchronized (sessionLock) {
80 ensureOpen();
81 }
82 return;
83 }
84 while (len > 0) {
85 int avail = buffer.remaining();
86 if (len <= avail) {
87 synchronized (sessionLock) {
88 ensureOpen();
89 }
90 buffer.put(b, off, len);
91 return;
92 }
93 buffer.put(b, off, avail);
94 off += avail;
95 len -= avail;
96 writeBuffer(false);
97 }
98 }
99
100 /** Flush method causes deadlock */
101 // @Override
102 // public void flush() throws IOException {
103 // synchronized (sessionLock) {
104 // ensureOpen();
105 // }
106 // while (buffer.hasRemaining()) {
107 // writeBuffer(false);
108 // }
109 // }
110
111 @Override
112 public void close() throws IOException {
113 if (Session.logger.isLoggable(Level.FINEST)) {
114 Session.logger.log(Level.FINEST, "STACK TRACE", new Throwable("STACK TRACE"));
115 }
116 synchronized (sessionLock) {
117 ensureOpen();
118 }
119 while (!writeBuffer(true)) {
120 }
121 }
122
123 /**
124 *
125 * This method must ONLY be invoked while synchronized on
126 * this session's lock.
127 */
128 private void ensureOpen() throws IOException {
129 assert Thread.holdsLock(sessionLock);
130 /*
131 * While we're faking that the session is still OK when it really
132 * isn't (see above comments), return silently from here.
133 */
134 if (fakeOKtoWrite) {
135 return;
136 }
137 int outState = session.getOutState();
138 if (outState > Session.OPEN) {
139 if (outState == Session.FINISHED) {
140 throw new IOException("stream closed");
141 } else {
142 throw new IOException("session terminated");
143 }
144 } else if (sessionDown != null) {
145 throw sessionDown;
146 }
147 }
148
149 /**
150 * Writes as much of the contents of this stream's output buffer
151 * as is allowed by the current output ration. Upon normal return,
152 * at least one byte will have been transferred from the buffer to
153 * the multiplexed connection output queue, and the buffer will have
154 * been compacted, ready to be filled at the current position.
155 *
156 * Returns true if closeIfComplete and session was marked EOF (with
157 * complete buffer written); if true, stream's output buffer should
158 * no longer be accessed (because this method will not wait for
159 * actual writing of the message).
160 */
161 private boolean writeBuffer(boolean closeIfComplete) throws IOException {
162 boolean hasData;
163 int origLimit;
164 buffer.flip();
165 origLimit = buffer.limit();
166 int toSend;
167 IOFuture future = null;
168 boolean eofSent = false;
169 synchronized (sessionLock) {
170 while (buffer.remaining() > 0
171 && !session.outRationInfinite
172 && session.getOutRation() < 1
173 && sessionDown == null
174 && session.getOutState() == Session.OPEN)
175 {
176 try {
177 sessionLock.wait(); // REMIND: timeout?
178 } catch (InterruptedException e) {
179 String message = "request I/O interrupted";
180 session.setDown(message, e);
181 throw new IOException(message, e);
182 }
183 }
184 ensureOpen();
185 assert buffer.remaining() == 0 || session.outRationInfinite || session.getOutRation() > 0 || fakeOKtoWrite;
186 /*
187 * If we're just faking that the session is OK when it really
188 * isn't, then we need to stop the writing from proceeding
189 * past this barrier-- and if a close was requested, then
190 * satisfy it right away.
191 */
192 if (fakeOKtoWrite) {
193 assert session.role == Session.CLIENT
194 && session.getInState() == Session.TERMINATED;
195 if (closeIfComplete) fakeOKtoWrite = false;
196 buffer.position(origLimit);
197 buffer.compact();
198 return closeIfComplete;
199 }
200 boolean complete;
201 if (session.outRationInfinite || buffer.remaining() <= session.getOutRation()) {
202 toSend = buffer.remaining();
203 complete = true;
204 } else {
205 toSend = session.getOutRation();
206 buffer.limit(toSend);
207 complete = false;
208 }
209 if (!session.outRationInfinite) {
210 session.setOutRation(session.getOutRation() - toSend);
211 }
212 session.setPartialDeliveryStatus(true);
213 boolean open = session.getOutState() == Session.IDLE;
214 boolean eof = closeIfComplete && complete;
215 boolean close = session.role == Session.SERVER && eof
216 && session.getInState() > Session.OPEN;
217 boolean ackRequired = session.role == Session.SERVER
218 && eof && session.ackListeners();
219 int op = Mux.Data | (open ? Mux.Data_open : 0)
220 | (eof ? Mux.Data_eof : 0) | (close ? Mux.Data_close : 0)
221 | (ackRequired ? Mux.Data_ackRequired : 0);
222 /*
223 * If we are the server-side, send even the final Data message
224 * for this session synchronously with this method, so that the
225 * VM will not exit before it gets delivered. Otherwise, let
226 * final Data messages (those with eof true) be sent after this
227 * method completes.
228 *
229 * Buffers are duplicated to avoid a data race that occurred in
230 * StreamConnectionIO. IOFuture now provides the buffer's position
231 * after sending.
232 */
233 if (!eof || session.role == Session.SERVER) {
234 future = mux.futureSendData(op, session.sessionID, buffer.duplicate());
235 } else {
236 mux.asyncSendData(op, session.sessionID, buffer.duplicate());
237 }
238
239 if (session.getOutState() == Session.IDLE) {
240 session.setOutState(Session.OPEN);
241 session.setInState(Session.OPEN);
242 }
243 if (eof) {
244 eofSent = true;
245 session.setOutState(close ? Session.TERMINATED : Session.FINISHED);
246 if (ackRequired) {
247 session.setSentAckRequired(true);
248 }
249 sessionLock.notifyAll();
250 }
251 }
252 if (future != null) {
253 /* StreamConnectionIO uses a dedicated thread for sending buffers,
254 * but synchronization is only used to get buffers for processing,
255 * no locks are held while sending, for this reason the state of the
256 * buffers position must be obtained from IOFuture, previously,
257 * reading buffer position depended on a data race.
258 */
259 hasData = waitForIO(future);
260 if (hasData) {
261 buffer.position(future.getPosition()).limit(origLimit);
262 buffer.compact();
263 } else {
264 buffer.clear();
265 }
266 } else {
267 buffer.clear();
268 }
269 return eofSent;
270 }
271
272 /**
273 *
274 * This method must NOT be invoked while synchronized on
275 * this session's lock.
276 */
277 private boolean waitForIO(IOFuture future) throws IOException {
278 assert !Thread.holdsLock(sessionLock);
279 try {
280 return future.waitUntilDone();
281 } catch (InterruptedException e) {
282 String message = "request I/O interrupted";
283 session.setDown(message, e);
284 throw new IOException(message, e);
285 } catch (IOException e) {
286 session.setDown(e.getMessage(), e.getCause());
287 throw e;
288 }
289 }
290
291 }