1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.river.jeri.internal.mux;
20
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.nio.ByteBuffer;
24 import java.util.logging.Level;
25
26
27
28
29
30 class MuxOutputStream extends OutputStream {
31 private final ByteBuffer buffer;
32 private final Object sessionLock;
33 private final Session session;
34 private final Mux mux;
35 private boolean fakeOKtoWrite = false;
36 private IOException sessionDown = null;
37
38 MuxOutputStream(Mux mux, Session session, Object sessionLock) {
39 this.sessionLock = sessionLock;
40 this.session = session;
41 this.mux = mux;
42 this.buffer = mux.directBuffersUseful()
43 ? ByteBuffer.allocateDirect(mux.maxFragmentSize)
44 : ByteBuffer.allocate(mux.maxFragmentSize);
45 }
46
47 void abort() {
48 fakeOKtoWrite = false;
49 }
50
51 void handleClose() {
52 fakeOKtoWrite = true;
53 }
54
55 void down(IOException e) {
56 sessionDown = e;
57 }
58
59 @Override
60 public void write(int b) throws IOException {
61 if (!buffer.hasRemaining()) {
62 writeBuffer(false);
63 } else {
64 synchronized (sessionLock) {
65
66 ensureOpen();
67 }
68 }
69 buffer.put((byte) b);
70 }
71
72 @Override
73 public void write(byte[] b, int off, int len) throws IOException {
74 if (b == null) {
75 throw new NullPointerException();
76 } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
77 throw new IndexOutOfBoundsException();
78 } else if (len == 0) {
79 synchronized (sessionLock) {
80 ensureOpen();
81 }
82 return;
83 }
84 while (len > 0) {
85 int avail = buffer.remaining();
86 if (len <= avail) {
87 synchronized (sessionLock) {
88 ensureOpen();
89 }
90 buffer.put(b, off, len);
91 return;
92 }
93 buffer.put(b, off, avail);
94 off += avail;
95 len -= avail;
96 writeBuffer(false);
97 }
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111 @Override
112 public void close() throws IOException {
113 if (Session.logger.isLoggable(Level.FINEST)) {
114 Session.logger.log(Level.FINEST, "STACK TRACE", new Throwable("STACK TRACE"));
115 }
116 synchronized (sessionLock) {
117 ensureOpen();
118 }
119 while (!writeBuffer(true)) {
120 }
121 }
122
123
124
125
126
127
128 private void ensureOpen() throws IOException {
129 assert Thread.holdsLock(sessionLock);
130
131
132
133
134 if (fakeOKtoWrite) {
135 return;
136 }
137 int outState = session.getOutState();
138 if (outState > Session.OPEN) {
139 if (outState == Session.FINISHED) {
140 throw new IOException("stream closed");
141 } else {
142 throw new IOException("session terminated");
143 }
144 } else if (sessionDown != null) {
145 throw sessionDown;
146 }
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161 private boolean writeBuffer(boolean closeIfComplete) throws IOException {
162 boolean hasData;
163 int origLimit;
164 buffer.flip();
165 origLimit = buffer.limit();
166 int toSend;
167 IOFuture future = null;
168 boolean eofSent = false;
169 synchronized (sessionLock) {
170 while (buffer.remaining() > 0
171 && !session.outRationInfinite
172 && session.getOutRation() < 1
173 && sessionDown == null
174 && session.getOutState() == Session.OPEN)
175 {
176 try {
177 sessionLock.wait();
178 } catch (InterruptedException e) {
179 String message = "request I/O interrupted";
180 session.setDown(message, e);
181 throw new IOException(message, e);
182 }
183 }
184 ensureOpen();
185 assert buffer.remaining() == 0 || session.outRationInfinite || session.getOutRation() > 0 || fakeOKtoWrite;
186
187
188
189
190
191
192 if (fakeOKtoWrite) {
193 assert session.role == Session.CLIENT
194 && session.getInState() == Session.TERMINATED;
195 if (closeIfComplete) fakeOKtoWrite = false;
196 buffer.position(origLimit);
197 buffer.compact();
198 return closeIfComplete;
199 }
200 boolean complete;
201 if (session.outRationInfinite || buffer.remaining() <= session.getOutRation()) {
202 toSend = buffer.remaining();
203 complete = true;
204 } else {
205 toSend = session.getOutRation();
206 buffer.limit(toSend);
207 complete = false;
208 }
209 if (!session.outRationInfinite) {
210 session.setOutRation(session.getOutRation() - toSend);
211 }
212 session.setPartialDeliveryStatus(true);
213 boolean open = session.getOutState() == Session.IDLE;
214 boolean eof = closeIfComplete && complete;
215 boolean close = session.role == Session.SERVER && eof
216 && session.getInState() > Session.OPEN;
217 boolean ackRequired = session.role == Session.SERVER
218 && eof && session.ackListeners();
219 int op = Mux.Data | (open ? Mux.Data_open : 0)
220 | (eof ? Mux.Data_eof : 0) | (close ? Mux.Data_close : 0)
221 | (ackRequired ? Mux.Data_ackRequired : 0);
222
223
224
225
226
227
228
229
230
231
232
233 if (!eof || session.role == Session.SERVER) {
234 future = mux.futureSendData(op, session.sessionID, buffer.duplicate());
235 } else {
236 mux.asyncSendData(op, session.sessionID, buffer.duplicate());
237 }
238
239 if (session.getOutState() == Session.IDLE) {
240 session.setOutState(Session.OPEN);
241 session.setInState(Session.OPEN);
242 }
243 if (eof) {
244 eofSent = true;
245 session.setOutState(close ? Session.TERMINATED : Session.FINISHED);
246 if (ackRequired) {
247 session.setSentAckRequired(true);
248 }
249 sessionLock.notifyAll();
250 }
251 }
252 if (future != null) {
253
254
255
256
257
258
259 hasData = waitForIO(future);
260 if (hasData) {
261 buffer.position(future.getPosition()).limit(origLimit);
262 buffer.compact();
263 } else {
264 buffer.clear();
265 }
266 } else {
267 buffer.clear();
268 }
269 return eofSent;
270 }
271
272
273
274
275
276
277 private boolean waitForIO(IOFuture future) throws IOException {
278 assert !Thread.holdsLock(sessionLock);
279 try {
280 return future.waitUntilDone();
281 } catch (InterruptedException e) {
282 String message = "request I/O interrupted";
283 session.setDown(message, e);
284 throw new IOException(message, e);
285 } catch (IOException e) {
286 session.setDown(e.getMessage(), e.getCause());
287 throw e;
288 }
289 }
290
291 }