View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.outrigger.snaplogstore;
19  
20  import org.apache.river.outrigger.OutriggerServerImpl;
21  
22  import java.io.BufferedInputStream;
23  import java.io.DataInputStream;
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.FileInputStream;
27  import java.io.IOException;
28  import java.io.ObjectInputStream;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Iterator;
32  import java.util.logging.Level;
33  import java.util.logging.Logger;
34  
35  import net.jini.space.InternalSpaceException;
36  
37  /**
38   * A class to help you read log files created by <code>LogOutputFile</code>.
39   *
40   * @author Sun Microsystems, Inc.
41   *
42   * @see LogOutputFile
43   */
44  class LogInputFile extends LogFile {
45      private final File		file;	// the current log file
46  
47      private static final long	intBytes = 4;
48  
49      /** Logger for logging persistent store related information */
50      private static final Logger logger = 
51  	Logger.getLogger(OutriggerServerImpl.storeLoggerName);
52  
53      /**
54       * Return an <code>Iterator</code> that will loop through all
55       * the logs that match the given <code>basePath</code> pattern,
56       * interpreted as described in the matching <code>LogStream</code>
57       * constructor.  If <code>returnAll</code> is <code>false</code>,
58       * the most recent file will be left off the list.  This would be
59       * the proper value for an ongoing poll looking for completed log
60       * files.  You would specify <code>true</code> during recovery,
61       * when all existing logs should be committed because no new ones
62       * are currently being created
63       *
64       * @see java.util.Iterator
65       */
66      // @see LogStream#LogStream(String)
67      static Iterator logs(String basePath, boolean returnAll)
68  	throws IOException
69      {
70  	LogFile lf = new LogFile(basePath);// an object to represent the path
71  	ArrayList inDir = new ArrayList();
72  	lf.existingLogs(inDir);
73  
74  	// strip off most recent if we're not trying to read them all
75  	if (!returnAll && inDir.size() > 0)
76  	    inDir.remove(inDir.size() - 1);
77  
78  	return new LogInputFileIterator(inDir, lf);
79      }
80  
81      /**
82       * The implementation of <code>Iterator</code> returned by
83       * <code>LogInputStream.logs</code>.  The <code>next</code> method
84       * occasionally returns <code>null</code>.
85       *
86       * @see LogInputFileIterator#next
87       */
88      private static class LogInputFileIterator implements Iterator {
89  	private final LogFile		baseLogFile;
90  	private final Iterator	fileList;
91  
92  	/**
93  	 * Create a new <code>LogInputFileIterator</code> object
94  	 * for the given list.
95  	 */
96  	LogInputFileIterator(Collection files, LogFile baseLogFile) {
97  	    this.baseLogFile = baseLogFile;
98  	    fileList = files.iterator();
99  	}
100 
101 	public boolean hasNext() {
102 	    return fileList.hasNext();
103 	}
104 
105 	/**
106 	 * Return the next <code>File</code> object, or
107 	 * <code>null</code>.  You will get <code>null</code> when the
108 	 * file existed at the time of listing, but no longer exists
109 	 * when the iterator gets to it.  For example, if a process is
110 	 * consuming all completed logs, the listing might find a log,
111 	 * but that process may have consumed and removed it by the
112 	 * time you invoke <code>next</code>, so you will get a
113 	 * <code>null</code>.
114 	 */
115 	public Object next() {
116 	    File file = (File) fileList.next();
117 	    try {
118 		return new LogInputFile(baseLogFile, file);
119 	    } catch (IOException e) {
120 		file.delete();	// file is malformed -- remove it
121 		return null;	// can't throw any reasonable exception,
122 				// so signal the problem with a null
123 	    }
124 	}
125 
126 	/**
127 	 * Remove the <code>File</code> object returned by the iterator
128 	 * from the list.  This does <em>not</em> remove the file
129 	 * itself.
130 	 */
131 	public void remove() {
132 	    fileList.remove();
133 	}
134     }
135 
136     /**
137      * Create a new <code>LogInputFile</code>.
138      * <p>
139      * <b>Note:</b> Don't invoke this.  This is needed by the
140      * enumeration returned by <code>logs</code>, which is how you
141      * should be getting <code>LogInputFile</code> objects.  When
142      * nested classes arrive, this constructor can be properly
143      * protected.
144      */
145     // @see logs
146     private LogInputFile(LogFile desc, File path) throws IOException {
147 	super(desc.baseDir, desc.baseFile);
148 	file = path;
149     }
150 
151     /**
152      * Consume the input file, invoking the appropriate operations on
153      * the given object.
154      */
155     synchronized void consume(BackEnd opOn) {
156 	try {
157 	    DataInputStream din = 
158 		new DataInputStream(new BufferedInputStream(
159 					new FileInputStream(file)));
160 	    ObjectInputStream in = new ObjectInputStream(din);
161 
162 	    long length = file.length();
163 	    int fileVer = din.readInt();
164 
165 	    if (fileVer != LOG_VERSION)
166 		failure("unsupported log version: " + fileVer);
167 
168 	    long logBytes = intBytes;
169 	    int updateLen = din.readInt();
170 
171 	    Long txnId;
172 	    int  count;
173 	    Resource rep;
174 	    byte[] cookie;
175 
176 	    while (updateLen != 0) {	/* 0 is expected termination case */
177 
178 		if (updateLen < 0)	/* serious corruption */
179 		    failure("file corrupted, negative record length at " +
180 			    logBytes);
181 
182 		if (length - logBytes - intBytes < updateLen)
183 
184 		    /* partial record at end of log; this should not happen
185                      * if forceToDisk is always true, but might happen if
186                      * buffered updates are used.
187                      */
188                     failure("file corrupted, partial record at " + logBytes);
189 
190 		int op = in.readByte();
191 
192 		switch (op) {
193 		  case BOOT_OP:
194 		    long time = in.readLong();
195 		    long sessionId = in.readLong();
196 		    opOn.bootOp(time, sessionId);
197 		    break;
198 
199 		  case JOINSTATE_OP:
200 		    BaseObject state = (BaseObject)in.readObject();
201 		    opOn.joinStateOp(state);
202 		    break;
203 
204 		  case WRITE_OP:
205 		    rep = (Resource)in.readObject();
206 		    txnId = (Long)in.readObject();
207 		    opOn.writeOp(rep, txnId);
208 		    break;
209 
210 		  case BATCH_WRITE_OP:
211 		    txnId = (Long)in.readObject();
212 		    count = in.readInt();
213 		    for (int i=0; i<count; i++) {
214 			rep = (Resource)in.readObject();
215 			opOn.writeOp(rep, txnId);
216 		    }
217 		    break;
218 
219 		  case TAKE_OP:
220 		    cookie = new byte[16]; 
221 		    in.readFully(cookie);
222 		    txnId = (Long)in.readObject();
223 		    opOn.takeOp(cookie, txnId);
224 		    break;
225 
226 		  case BATCH_TAKE_OP:
227 		    txnId = (Long)in.readObject();
228 		    count = in.readInt();
229 		    for (int i=0; i<count; i++) {
230 			cookie = new byte[16];
231 			in.readFully(cookie);
232 			opOn.takeOp(cookie, txnId);
233 		    }
234 		    break;
235 
236 		  case REGISTER_OP:
237 		    Registration registration = 
238 			(Registration)in.readObject();
239 		    opOn.registerOp(registration);
240 		    break;
241 
242 		  case RENEW_OP:
243 		    cookie = new byte[16];
244 		    in.readFully(cookie);
245 		    long expires = in.readLong();
246 		    opOn.renewOp(cookie, expires);
247 		    break;
248 
249 		  case CANCEL_OP:
250 		    cookie = new byte[16];
251 		    in.readFully(cookie);
252 		    opOn.cancelOp(cookie);
253 		    break;
254 
255 		  case PREPARE_OP:
256 		    txnId = (Long)in.readObject();
257 		    BaseObject transaction = (BaseObject)in.readObject();
258 		    opOn.prepareOp(txnId, transaction);
259 		    break;
260 
261 		  case COMMIT_OP:
262 		    txnId = (Long)in.readObject();
263 		    opOn.commitOp(txnId);
264 		    break;
265 
266 		  case ABORT_OP:
267 		    txnId = (Long)in.readObject();
268 		    opOn.abortOp(txnId);
269 		    break;
270 
271 		  case UUID_OP:
272 		    final byte uuid[] = new byte[16];
273 		    in.readFully(uuid);
274 		    opOn.uuidOp(uuid);
275 		    break;
276 
277 		  default:
278                     failure("log record corrupted, unknown opcode");
279 
280 		}  // case
281 
282 		logBytes += (intBytes + updateLen);
283 
284 		// deal with padding
285 		int offset = (int)logBytes & 3;
286 		if (offset > 0) {
287 		    offset = 4 - offset;
288 		    logBytes += offset;
289 		    din.skipBytes(offset);
290 		}
291 		updateLen = din.readInt();
292 
293 	    }  // while
294 	} catch (EOFException e) {
295 	    failure("unexpected end-of-file", e);
296 
297 	} catch (IOException e) {
298 	    failure("I/O error while consuming logs", e);
299 
300 	} catch (ClassNotFoundException e) {
301 	    failure("unexpected class?", e);
302 	}
303     }
304 
305     /**
306      * Report a failure consuming the log file and throw an
307      * <code>InternalSpaceException</code> containing <code>message</code>.
308      */
309     private void failure(String message) {
310 	failure(message, null);
311     }
312 
313     /**
314      * Report a exception while consuming the log file and throw an
315      * <code>InternalSpaceException</code> containing <code>message</code>.
316      */
317     private void failure(String message, Exception e) {
318 	String errorMsg = "Error consuming log file: " + file + ", " + 
319 	    message + "Log file consumption stopped";
320 
321 	final InternalSpaceException ise =
322 	    new InternalSpaceException(errorMsg, e);
323 	logger.log(Level.SEVERE, errorMsg, ise);
324 	throw ise;
325     }
326 
327     /**
328      * This log has been successfully drained, and committed -- it can be
329      * removed.
330      */
331     void finished() {
332 	file.delete();
333     }
334 
335     public String toString() {
336 	return file.toString();
337     }
338 }