View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.river.jeri.internal.mux;
20  
21  import org.apache.river.logging.Levels;
22  import org.apache.river.thread.Executor;
23  import org.apache.river.thread.GetThreadPoolAction;
24  import java.io.EOFException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.OutputStream;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.channels.WritableByteChannel;
31  import java.security.AccessController;
32  import java.util.Deque;
33  import java.util.LinkedList;
34  import java.util.logging.Level;
35  import java.util.logging.Logger;
36  
37  /**
38   * StreamConnectionIO implements the ConnectionIO abstraction for a
39   * connection accessible through standard (blocking) I/O streams, i.e.
40   * java.io.OutputStream and java.io.InputStream.
41   *
42   * @author Sun Microsystems, Inc.
43   **/
44  final class StreamConnectionIO extends ConnectionIO {
45  
46      private static final int RECEIVE_BUFFER_SIZE = 2048;
47  
48      /**
49       * pool of threads for executing tasks in system thread group:
50       * used for I/O (reader and writer) threads
51       */
52      private static final Executor systemThreadPool =
53  	(Executor) AccessController.doPrivileged(
54  	    new GetThreadPoolAction(false));
55  
56      /** mux logger */
57      private static final Logger logger =
58  	Logger.getLogger("net.jini.jeri.connection.mux");
59  
60      /** I/O streams for underlying connection */
61      private final OutputStream out;
62      private final InputStream in;
63  
64      /** channels wrapped around underlying I/O streams */
65      private final WritableByteChannel outChannel;
66      private final ReadableByteChannel inChannel;
67  
68      /**
69       * queue of buffers of data to be sent over connection, interspersed
70       * with IOFuture objects that need to be notified in sequence
71       * 
72       * Synchronised on super.mux.muxLock;
73       */
74      private final Deque sendQueue;
75  
76      
77      /**
78       * Creates a new StreamConnectionIO for the connection represented by
79       * the supplied OutputStream and InputStream pair.
80       */
81      StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
82  	super(mux);
83  	this.out = out;
84  //	this.out = new BufferedOutputStream(out);
85  	this.in = in;
86  
87  	outChannel = newChannel(out);
88  	inChannel = newChannel(in);
89          sendQueue = new LinkedList();
90      }
91  
92      /**
93       * Starts processing connection data.  This method starts
94       * asynchronous actions to read and write from the connection.
95       */
96      @Override
97      void start() throws IOException {
98  	try {
99  	    systemThreadPool.execute(new Writer(), "mux writer");
100 	    systemThreadPool.execute(new Reader(), "mux reader");
101 	} catch (OutOfMemoryError e) {	// assume out of threads
102 	    try {
103 		logger.log(Level.WARNING,
104 			   "could not create thread for request dispatch", e);
105 	    } catch (Throwable t) {
106 	    }
107 	    throw new IOException("could not create I/O threads", e);
108 	}
109     }
110 
111     @Override
112     void asyncSend(ByteBuffer buffer) {
113 	synchronized (mux.muxLock) {
114 	    if (mux.muxDown) {
115 		return;
116 	    }
117 	    sendQueue.addLast(buffer);
118 	    mux.muxLock.notifyAll();
119 	}
120     }
121 
122     @Override
123     void asyncSend(ByteBuffer first, ByteBuffer second) {
124 	synchronized (mux.muxLock) {
125 	    if (mux.muxDown) {
126 		return;
127 	    }
128 	    sendQueue.addLast(first);
129 	    sendQueue.addLast(second);
130 	    mux.muxLock.notifyAll();
131 	}
132     }
133 
134     @Override
135     IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
136 	synchronized (mux.muxLock) {
137 	    IOFuture future = new IOFuture();
138 	    if (mux.muxDown) {
139 		IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
140 		future.done(ioe);
141 		return future;
142 	    }
143 	    sendQueue.addLast(first);
144 	    sendQueue.addLast(second);
145 	    sendQueue.addLast(future);
146 	    mux.muxLock.notifyAll();
147 	    return future;
148 	}
149 	/*
150 	 * REMIND: Can/should we implement any sort of
151 	 * priority inversion avoidance scheme here?
152 	 */
153     }
154 
155     private class Writer implements Runnable {
156 	Writer() { }
157 
158         @Override
159 	public void run() {
160 	    Deque localQueue = null;
161 	    try {
162 		while (true) {
163 		    synchronized (mux.muxLock) {
164 			while (!mux.muxDown && sendQueue.isEmpty()) {
165 			    /*
166 			     * REMIND: Should we use a timeout here, to send
167 			     * occasional PING messages during periods of
168 			     * inactivity, to make sure connection is alive?
169 			     */
170 			    mux.muxLock.wait();
171 			    /*
172 			     * Let an interrupt during the wait just kill this
173 			     * thread, because an interrupt during an I/O write
174 			     * would leave it in an unrecoverable state anyway.
175 			     */
176 			}
177 			if (mux.muxDown && sendQueue.isEmpty()) {
178 			    logger.log(Level.FINEST,
179 				       "mux writer thread dying, connection " +
180 				       "down and nothing more to send");
181 			    break;
182 			}
183                         /* Clone an unshared copy and clear the queue while synchronized */
184 			localQueue = new LinkedList(sendQueue);
185 			sendQueue.clear();
186 		    }
187 
188 		    boolean needToFlush = false;
189                     ByteBuffer last = null;
190                     int lastIndex = Integer.MIN_VALUE;
191 		    for  ( int i = 0; !localQueue.isEmpty(); i++) {
192 			Object next = localQueue.getFirst();
193 			if (next instanceof ByteBuffer) {
194                             ByteBuffer buffer = (ByteBuffer) next;
195 			    outChannel.write((buffer));
196                             last = buffer;
197                             lastIndex = i;
198 			    needToFlush = true;
199 			} else {
200 			    assert next instanceof IOFuture;
201 			    if (needToFlush) {
202 				out.flush();
203 				needToFlush = false;
204 			    }
205                             if (lastIndex == i - 1 && last != null){
206                                 ((IOFuture) next).done(last.position());
207                             } else {
208                                 ((IOFuture) next).done();
209                             }
210 			}
211 			localQueue.removeFirst();
212 		    }
213 		    if (needToFlush) {
214 			out.flush();
215 		    }
216 		}
217 	    } catch (InterruptedException e) {
218 		try {
219 		    logger.log(Level.WARNING,
220 			       "mux writer thread dying, interrupted", e);
221 		} catch (Throwable t) {
222 		}
223 		mux.setDown("mux writer thread interrupted", e);
224 	    } catch (IOException e) {
225 		try {
226 		    logger.log(Levels.HANDLED,
227 			       "mux writer thread dying, I/O error", e);
228 		} catch (Throwable t) {
229 		}
230 		mux.setDown("I/O error writing to mux connection: " +
231 			    e.toString(), e);
232 	    } catch (Throwable t) {
233 		try {
234 		    logger.log(Level.WARNING,
235 			"mux writer thread dying, unexpected exception", t);
236 		} catch (Throwable tt) {
237 		}
238 		mux.setDown("unexpected exception in mux writer thread: " +
239 			    t.toString(), t);
240 	    } finally {
241 		synchronized (mux.muxLock) {
242 		    assert mux.muxDown;
243 		    if (localQueue != null) {
244 			drainQueue(localQueue);
245 		    }
246 		    drainQueue(sendQueue);
247 		}
248 		try {
249 		    outChannel.close();
250 		} catch (IOException e) {
251 		}
252 	    }
253 	}
254     }
255 
256     private void drainQueue(Deque queue) {
257 	while (!queue.isEmpty()) {
258 	    Object next = queue.removeFirst();
259 	    if (next instanceof IOFuture) {
260 		IOException ioe = new IOException(mux.muxDownMessage, mux.muxDownCause);
261 		((IOFuture) next).done(ioe);
262 	    }
263 	}
264     }
265 
266     private class Reader implements Runnable {
267         /** buffer for reading incoming data from connection */
268         private final ByteBuffer inputBuffer =
269             ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);	// ready for reading
270 
271 	Reader() { }
272 
273 	public void run() {
274 	    try {
275 		while (true) {
276 		    int n = inChannel.read(inputBuffer);
277 		    if (n == -1) {
278 			throw new EOFException();
279 		    }
280 		    assert n > 0;	// channel is assumed to be blocking
281 		    mux.processIncomingData(inputBuffer);
282 		    assert inputBuffer.hasRemaining();
283 		}
284 	    } catch (ProtocolException e) {
285 		IOFuture future = null;
286 		synchronized (mux.muxLock) {
287 		    /*
288 		     * If mux connection is already down, then we probably got
289 		     * here because of the receipt of a normal protocol-ending
290 		     * message, like Shutdown or Error, or else something else
291 		     * went wrong anyway.  Otherwise, a real protocol violation
292 		     * was detected, so respond with an Error message before
293 		     * taking down the whole mux connection.
294 		     */
295 		    if (!mux.muxDown) {
296 			try {
297 			    logger.log(Levels.HANDLED,
298 				"mux reader thread dying, protocol error", e);
299 			} catch (Throwable t) {
300 			}
301 			future = mux.futureSendError(e.getMessage());
302 			mux.setDown("protocol violation detected: " +	
303 				    e.getMessage(), null);
304 		    } else {
305 			try {
306 			    logger.log(Level.FINEST,
307 				"mux reader thread dying: " + e.getMessage());
308 			} catch (Throwable t) {
309 			}
310 		    }
311 		}
312 		if (future != null) {
313 		    try {
314 			future.waitUntilDone();
315 		    } catch (IOException ignore) {
316 		    } catch (InterruptedException interrupt) {
317 			Thread.currentThread().interrupt();
318 		    }
319 		}
320 	    } catch (IOException e) {
321 		try {
322 		    logger.log(Levels.HANDLED,
323 			       "mux reader thread dying, I/O error", e);
324 		} catch (Throwable t) {
325 		}
326 		mux.setDown("I/O error reading from mux connection: " +
327 			    e.toString(), e);
328 	    } catch (Throwable t) {
329 		try {
330 		    logger.log(Level.WARNING,
331 			"mux reader thread dying, unexpected exception", t);
332 		} catch (Throwable tt) {
333 		}
334 		mux.setDown("unexpected exception in mux reader thread: " +
335 			    t.toString(), t);
336 	    } finally {
337 		try {
338 		    inChannel.close();
339 		} catch (IOException e) {
340 		}
341 	    }
342 	}
343     }
344 
345     /**
346      * The following two methods are modifications of their
347      * equivalents in java.nio.channels.Channels with the assumption
348      * that the supplied byte buffers are backed by arrays, so no
349      * additional copying is required.
350      */
351 
352     public static ReadableByteChannel newChannel(final InputStream in) {
353 	return new ReadableByteChannel() {
354 	    private boolean open = true;
355 
356             // must be synchronized as per ReadableByteChannel contract
357             @Override
358 	    public synchronized int read(ByteBuffer dst) throws IOException {
359 		assert dst.hasArray();
360 		byte[] array = dst.array();
361 		int arrayOffset = dst.arrayOffset();
362 
363 		int totalRead = 0;
364 		int bytesRead = 0;
365 		int bytesToRead;
366 		while ((bytesToRead = dst.remaining()) > 0) {
367 		    if ((totalRead > 0) && !(in.available() > 0)) {
368 			break; // block at most once
369 		    }
370 		    int pos = dst.position();
371 		    bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
372 		    if (bytesRead < 0) {
373 			break;
374 		    } else {
375 			dst.position(pos + bytesRead);
376 			totalRead += bytesRead;
377 		    }
378 		}
379 		if ((bytesRead < 0) && (totalRead == 0)) {
380 		    return -1;
381 		}
382 
383 		return totalRead;
384 	    }
385                 
386             @Override
387 	    public synchronized boolean isOpen() {
388 		return open;
389 	    }
390             
391             // Blocking as per Channel contract
392             @Override
393 	    public synchronized void close() throws IOException {
394 		in.close();
395 		open = false;
396 	    }
397 	};
398     }
399 
400     public static WritableByteChannel newChannel(final OutputStream out) {
401 	return new WritableByteChannel() {
402 	    private volatile boolean open = true;
403             
404             // This method must block while writing as per WritableByteChannel contract.
405             @Override
406 	    public synchronized int write(ByteBuffer src) throws IOException {
407                     assert src.hasArray();
408 
409                     int len = src.remaining();
410                     if (len > 0) {
411                         int pos = src.position();
412                         out.write(src.array(), src.arrayOffset() + pos, len);
413                         src.position(pos + len);
414                     }
415                     return len;
416                 }
417                 
418             @Override
419 	    public boolean isOpen() {
420 		return open;
421 	    }
422 
423             // This method must block as per the Channel contract
424             @Override
425 	    public synchronized void close() throws IOException {
426 		out.close();
427 		open = false;
428 	    }
429 	};
430     }
431 
432 }