View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.outrigger.snaplogstore;
19  
20  import org.apache.river.outrigger.LogOps;
21  import org.apache.river.outrigger.OutriggerServerImpl;
22  import org.apache.river.outrigger.proxy.StorableObject;
23  import org.apache.river.outrigger.proxy.StorableResource;
24  import net.jini.id.Uuid;
25  
26  import java.io.File;
27  import java.io.FileDescriptor;
28  import java.io.IOException;
29  import java.io.ObjectOutputStream;
30  import java.io.RandomAccessFile;
31  import java.util.ArrayList;
32  import java.util.Observable;
33  import java.util.logging.Level;
34  import java.util.logging.Logger;
35  
36  import net.jini.space.InternalSpaceException;
37  
38  /**
39   * A class to write a log file, to be read later by
40   * <code>LogInputFile</code>.  Each operation on the file is forced to
41   * disk, so when the operation logging function returns, the data is
42   * committed to the log in a recoverable way.
43   * <p>
44   * <code>LogOutputFile</code> cannot extend <code>Observable</code>
45   * because it must extend <code>LogFile</code> (clearly
46   * <code>Observable</code> should have been an interface).  It acts as
47   * an <code>Observable</code> by having a method that returns its
48   * "observable part", which is an object that reports observable
49   * events.  Right now the only observable event is the switching to a
50   * new physical file when the current one becomes full.
51   *
52   * @author Sun Microsystems, Inc.
53   * @see LogInputFile
54   * @see java.util.Observable
55   */
56  class LogOutputFile extends LogFile implements LogOps {
57      private volatile RandomAccessFile	logFile = null;// the current output log file
58      private volatile FileDescriptor	logFD;	   // the current log file descriptor
59      private volatile ObjectOutputStream	out;	   // objects written
60      private volatile int			suffix;	   // the current suffix number
61      private volatile int			opCnt;	   // number of ops on current file
62      private volatile int			maxOps;	   // max ops to allow in file
63      private volatile Observable		observable;// handle Observer/Observable
64  
65      private volatile long logBytes = 0;
66      private final byte[] intBuf = new byte[4];
67      private final byte[] zeroBuf = new byte[4];
68  
69      private volatile long deferedUpdateLength = 0;
70      private volatile long deferedPosition = 0;
71  
72      private static final long intBytes = 4;
73  
74      /** Logger for logging persistent store related information */
75      private static final Logger logger = 
76  	Logger.getLogger(OutriggerServerImpl.storeLoggerName);
77  
78      /**
79       * Create a <code>LogOutputFile</code> object that will stream
80       * output to a series of files described by <code>basePath</code>,
81       * as interpreted by the relevant <code>LogFile</code>
82       * constructor.  When the file becomes full (the maximum number of
83       * operations is reached), the file is closed and a new file with
84       * the next highest suffix is created.  The
85       * <code>Observable</code> notification for this event passes a
86       * <code>File</code> argument for the filled file as the argument
87       * to <code>Observer</code>.
88       * 
89       * @see #observable()
90       */
91      //@see org.apache.river.mercury.LogStream#LogStream(String)
92      LogOutputFile(String basePath, int maxOps) throws IOException {
93  	super(basePath);
94  	ArrayList inDir = new ArrayList();
95  	suffix = existingLogs(inDir);
96  	this.maxOps = maxOps;
97  	nextPath();
98      }
99  
100     /**
101      * Return an <code>Observable</code> object that represents this object
102      * in the Observer/Observable pattern.
103      *
104      * @see java.util.Observer
105      */
106     Observable observable() {
107 	if (observable == null) {	     // defer allocation until needed
108 	    observable = new Observable() {  // we only use this if changed
109 		public void notifyObservers() {
110 		    setChanged();
111 		    super.notifyObservers();
112 		}
113 		public void notifyObservers(Object arg) {
114 		    setChanged();
115 		    super.notifyObservers(arg);
116 		}
117 	    };
118 	}
119 	return observable;
120     }
121 
122     /**
123      * Switch this over to the next path in the list
124      */
125     private void nextPath() throws IOException {
126 	boolean completed = false;
127 
128 	if (logFile != null) {
129 
130 	    // If there was a deferred header, write it out now
131 	    //
132 	    if (deferedUpdateLength != 0) {
133 		logFD.sync();		// force the bytes to disk
134 		logFile.seek(deferedPosition);
135 		writeInt((int)deferedUpdateLength);
136 	    }
137 	    try {
138 		close();   	        // close the stream and the file
139 	    } catch (IOException ignore) { } // assume this is okay
140 	    completed = true;
141 	}
142 
143 	suffix++;			// go to next suffix
144 	logFile = new RandomAccessFile(baseDir.getPath() + File.separator +
145 				       baseFile + suffix, "rw");
146 	logFD = logFile.getFD();
147 	out = new ObjectOutputStream(new LogOutputStream(logFile));
148 
149 	writeInt(LOG_VERSION);
150 
151 	logBytes = logFile.getFilePointer();
152 	logFile.setLength(logBytes);
153 
154 	// always start out with zero length header for the next update
155 	logFile.write(zeroBuf);
156 
157 	// force length header to disk 
158 	logFD.sync();
159 
160 	deferedUpdateLength = 0;
161 	opCnt = 0;
162 
163 	/*
164 	 * Tell consumer about the completed log.  This is done after the
165 	 * new one is created so that the old path can be known not
166 	 * to be the newest (because something newer is there).
167 	 */
168 	if (observable != null && completed)
169 	    observable.notifyObservers();
170     }
171 
172     /**
173      * Close the log, but don't remove it.
174      */
175     synchronized void close() throws IOException {
176 	if (logFile != null) {
177 	    try {
178 		out.close();
179 		logFile.close();
180 	    } finally {
181 		logFile = null;
182 	    }
183 	}
184     }
185 
186     /**
187      * Override destroy so we can try to close logFile before calling
188      * super tries to delete all the files.
189      */
190     void destroy() {
191 	try {
192 	    close();
193 	} catch (Throwable t) {
194 	    // Don't let failure keep us from deleting the files we can	    
195 	}
196 	super.destroy();
197     }
198 
199     /**
200      * Log a server boot.
201      */
202     public synchronized void bootOp(long time, long sessionId) {
203 	try {
204 	    out.writeByte(BOOT_OP);
205 	    out.writeLong(time);
206 	    out.writeLong(sessionId);
207 	    flush();
208 	} catch (IOException e) {
209 	    failed(e);
210 	}
211     }
212 
213     /**
214      * Log a change in join state
215      */
216     public synchronized void joinStateOp(StorableObject state) {
217 	try {
218 	    out.writeByte(JOINSTATE_OP);
219 	    out.writeObject(new BaseObject(state));
220 	    flush();
221 	} catch (IOException e) {
222 	    failed(e);
223 	}
224     }
225 
226     /**
227      * Log a <code>write</code> operation.
228      */
229     public synchronized void writeOp(StorableResource entry, Long txnId) {
230 	try {
231 	    out.writeByte(WRITE_OP);
232 	    out.writeObject(new Resource(entry));
233 	    out.writeObject(txnId);
234 
235 	    // A write operation under a transaction does not need to be
236 	    // flushed until it is prepared.
237 	    //
238 	    flush(txnId == null);
239 	} catch (IOException e) {
240 	    failed(e);
241 	}
242     }
243 
244     // Inherit java doc from supertype
245     public synchronized void writeOp(StorableResource entries[], Long txnId) {
246 	try {
247 	    out.writeByte(BATCH_WRITE_OP);
248 	    out.writeObject(txnId);
249 
250 	    // In the middle of records we need to use the stream's
251 	    // writeInt, not our private one	    
252 	    out.writeInt(entries.length);
253 	    for (int i=0; i<entries.length; i++) {
254 		out.writeObject(new Resource(entries[i]));
255 	    }
256 
257 	    // A write operation under a transaction does not need to be
258 	    // flushed until it is prepared.
259 	    //
260 	    flush(txnId == null, entries.length);
261 	} catch (IOException e) {
262 	    failed(e);
263 	}
264     }
265 
266     /**
267      * Log a <code>take</code> operation.
268      */
269     public synchronized void takeOp(Uuid cookie, Long txnId) {
270 	try {
271 	    out.writeByte(TAKE_OP);
272 	    cookie.write(out);
273 	    out.writeObject(txnId);
274 
275 	    // A take operation under a transaction does not need to be
276 	    // flushed until it is prepared.
277 	    //
278 	    flush(txnId == null);
279 	} catch (IOException e) {
280 	    failed(e);
281 	}
282     }
283 
284     // Inherit java doc from supertype
285     public synchronized void takeOp(Uuid cookies[], Long txnId) {
286 	try {
287 	    out.writeByte(BATCH_TAKE_OP);
288 	    out.writeObject(txnId);
289 
290 	    // In the middle of records we need to use the stream's
291 	    // writeInt, not our private one	    
292 	    out.writeInt(cookies.length);
293 	    for (int i=0; i<cookies.length; i++) {
294 		cookies[i].write(out);
295 	    }
296 
297 	    // A take operation under a transaction does not need to be
298 	    // flushed until it is prepared.
299 	    //
300 	    flush(txnId == null, cookies.length);
301 	} catch (IOException e) {
302 	    failed(e);
303 	}
304     }
305 
306     /**
307      * Log a <code>notify</code> operation.
308      */
309     public synchronized void registerOp(StorableResource registration,
310 					String type, StorableObject[] templates) 
311     {
312 	try {
313 	    out.writeByte(REGISTER_OP);
314 	    out.writeObject(new Registration(registration, type, templates));
315 	    flush();
316 	} catch (IOException e) {
317 	    failed(e);
318 	}
319     }
320 
321     /**
322      * Log a <code>renew</code> operation.
323      */
324     public synchronized void renewOp(Uuid cookie, long expiration) {
325 	try {
326 	    out.writeByte(RENEW_OP);
327 	    cookie.write(out);
328 	    out.writeLong(expiration);
329 	    flush();
330 	} catch (IOException e) {
331 	    failed(e);
332 	}
333     }
334 
335     /**
336      * Log a <code>cancel</code> operation.
337      */
338     public synchronized void cancelOp(Uuid cookie, boolean expired) {
339 	try {
340 	    out.writeByte(CANCEL_OP);
341 	    cookie.write(out);
342 
343 	    // cancels due to expiration don't need to be flushed
344 	    // right away
345 	    flush(!expired);
346 	} catch (IOException e) {
347 	    failed(e);
348 	}
349     }
350 
351     /**
352      * Log a transaction <code>prepare</code> operation.
353      */
354     public synchronized void prepareOp(Long txnId,
355 				       StorableObject transaction) {
356 	try {
357 	    out.writeByte(PREPARE_OP);
358 	    out.writeObject(txnId);
359 	    out.writeObject(new BaseObject(transaction));
360 	    flush();
361 	} catch (IOException e) {
362 	    failed(e);
363 	}
364     }
365 
366     /**
367      * Log a transaction <code>commit</code> operation.
368      */
369     public synchronized void commitOp(Long txnId) {
370 	try {
371 	    out.writeByte(COMMIT_OP);
372 	    out.writeObject(txnId);
373 	    flush();
374 	} catch (IOException e) {
375 	    failed(e);
376 	}
377     }
378 
379     /**
380      * Log a transaction <code>abort</code> operation.
381      */
382     public synchronized void abortOp(Long txnId) {
383 	try {
384 	    out.writeByte(ABORT_OP);
385 	    out.writeObject(txnId);
386 	    flush();
387 	} catch (IOException e) {
388 	    failed(e);
389 	}
390     }
391 
392     public synchronized void uuidOp(Uuid uuid) {
393 	try {
394 	    out.writeByte(UUID_OP);
395 	    uuid.write(out);
396 	    flush();
397 	} catch (IOException e) {
398 	    failed(e);
399 	}
400     }
401 
402     /**
403      * Flush the current output after an operation.  If the number of
404      * operations is exceeded, shift over to the next path.  
405      */
406     private void flush() throws IOException {
407 	flush(true);
408     }
409     
410 
411     /**
412      * Conditionally flush the current output. If the number of
413      * operations is exceeded, shift over to the next path even if
414      * <code>forceToDisk</code> is <code>false</code>.
415      */
416     private synchronized void flush(boolean forceToDisk) 
417 	throws IOException 
418     {
419 	flush(forceToDisk, 1);
420     }
421 
422     /**
423      * Conditionally flush the current output. If the number of
424      * operations is exceeded, shift over to the next path even if
425      * <code>forceToDisk</code> is <code>false</code>.
426      */
427     private synchronized void flush(boolean forceToDisk,
428 				    int effectiveOpCount)
429 	throws IOException 
430     {
431 	assert effectiveOpCount > 0;
432 
433 	out.flush();
434 
435 	if (forceToDisk) {
436 
437 	    // must force contents to disk before writing real length header
438 	    logFD.sync();
439 	}
440 
441 	long entryEnd = logFile.getFilePointer();
442 	long updateLen = entryEnd - logBytes - intBytes;
443 
444         // If we are not forcing to disk, we want to defer the write of the
445         // first header. This will leave a zero just after the last sync'ed
446         // record and will assure that LogInputFile will not read a partially
447         // written record.
448         //
449         if (!forceToDisk) {
450 
451 	    // If this is the first flush(false) we save the header information
452 	    // and location for later. Otherwise we write out the header
453 	    // normally.
454 	    //
455 	    if (deferedUpdateLength == 0) {
456 		deferedUpdateLength = updateLen;  // save the header length
457 		deferedPosition = logBytes;       // and position for later
458 	    } else {
459 		// write real length header
460 		logFile.seek(logBytes);
461 		writeInt((int)updateLen);
462 	    }
463 	} else {
464 
465 	    // If there was a deferred header, write that out now and
466 	    // then write the current header.
467 	    //
468 	    if (deferedUpdateLength != 0) {
469 		logFile.seek(deferedPosition);
470 		writeInt((int)deferedUpdateLength);
471 		deferedUpdateLength = 0;
472 	    }
473 	    // write real length header
474 	    logFile.seek(logBytes);
475 	    writeInt((int)updateLen);
476 	}
477 
478 	// pad out update record so length header does not span disk blocks
479 	entryEnd = (entryEnd + 3) & ~3L;
480 
481 	// write zero length header for next update
482 	logFile.seek(entryEnd);
483 	logFile.write(zeroBuf);
484 	logBytes = entryEnd;
485 
486 	if (forceToDisk)
487 	    logFD.sync();
488 	
489 	opCnt += effectiveOpCount;
490 	if (opCnt >= maxOps)
491 	    nextPath();
492 	else
493 	    out.reset();		// not critical to flush this
494     }
495 
496     /**
497      * Write an int value in single write operation. Note we only use
498      * this method when writing log file and recored headers.  We
499      * can't use it inside records because the data inside records is
500      * written/read using <code>ObjectIn/OutputStream</code> and this
501      * method writes directly to the <code>RandomAccessFile</code>.
502      *   
503      * @param val int value
504      * @throws IOException if any other I/O error occurs 
505      */
506     private void writeInt(int val) throws IOException {
507 	intBuf[0] = (byte) (val >> 24);
508 	intBuf[1] = (byte) (val >> 16);
509 	intBuf[2] = (byte) (val >> 8);
510 	intBuf[3] = (byte) val;
511 	logFile.write(intBuf);
512     }
513 
514     private void failed(Exception e) throws InternalSpaceException {
515 	logger.log(Level.SEVERE, 
516 		   "Unexpected I/O error while persisting Space data",
517 		   e);
518 	System.exit(-5);
519     }
520 }