1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.outrigger.snaplogstore;
19  
20  import org.apache.river.constants.TimeConstants;
21  import org.apache.river.logging.Levels;
22  import org.apache.river.outrigger.Recover;
23  import org.apache.river.outrigger.StoredObject;
24  import org.apache.river.outrigger.OutriggerServerImpl;
25  
26  import java.io.File;
27  import java.io.BufferedInputStream;
28  import java.io.FileInputStream;
29  import java.io.FileOutputStream;
30  import java.io.ObjectInputStream;
31  import java.io.ObjectOutputStream;
32  import java.io.IOException;
33  import java.io.Serializable;
34  import java.util.ArrayList;
35  import java.util.Date;
36  import java.util.Enumeration;
37  import java.util.Iterator;
38  import java.util.HashMap;
39  import java.util.Map;
40  import java.util.Observable;
41  import java.util.Observer;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.atomic.AtomicInteger;
44  import java.util.concurrent.atomic.AtomicLong;
45  import java.util.logging.Level;
46  import java.util.logging.Logger;
47  
48  import net.jini.space.InternalSpaceException;
49  
50  /**
51   * Back end of snapshot log store. This class consumes logs written by
52    * LogOutputFile and stores the state as serialized objects. The class
53   * processes the logs to optimize what is stored in the snapshot. For
54   * example, a take log record will cause the removal of a write log
55   * record with the same id (if the transaction is null).  <p>
56   *
57   * Likewise, cancels will cause the removal of write and register
58   * records.  Also renew records update the expiration of the entry or
59   * registration and are not stored directly in the database. 
60   */
61  class BackEnd implements Observer {
62  
63      // The following data represent the persistent
64      // state.
65      private final AtomicLong	  sessionId;
66      private volatile StoredObject  joinState;
67      private final Map<ByteArrayWrapper,Resource>     entries;
68      private final Map<ByteArrayWrapper,Registration> 	  registrations;
69      private final Map<Long,PendingTxn> 	  pendingTxns;
70      private volatile byte          topUuid[];
71      private volatile LastLog	  lastLog;
72  
73      /** Snapshot object */
74      private volatile SnapshotFile	snapshotFile;
75  
76      /** Keep the log and snapshot versions in step, though not strictly necessary */
77      private static final int SNAPSHOT_VERSION = LogFile.LOG_VERSION;
78  
79      /**
80       * The base name for the log files.
81       */
82      private final String	logFileBase;
83  
84      /**
85       * The base name for the snapshot files
86       */
87      private final String	snapshotFileBase;
88  
89      /**
90       * Log file consumer thread.
91       */
92      private final ConsumerThread	consumer;
93  
94      /** Max time to wait for the consumer thread to die on destroy */
95      private final static long WAIT_FOR_THREAD = 1 * TimeConstants.MINUTES;
96  
97      /** Logger for logging persistent store related information */
98      private static final Logger logger = 
99  	Logger.getLogger(OutriggerServerImpl.storeLoggerName);
100 
101     /**
102      * Create a new <code>BackEnd</code> with the given <code>path</code>.
103      */
104     BackEnd(String path) {
105 	logFileBase = new File(path, LogFile.LOG_TYPE).getAbsolutePath();
106 	snapshotFileBase = new File(path, "Snapshot.").getAbsolutePath();
107         consumer = new ConsumerThread();
108         entries		= new ConcurrentHashMap<ByteArrayWrapper,Resource>();
109         registrations	= new ConcurrentHashMap<ByteArrayWrapper,Registration>();
110         pendingTxns	= new ConcurrentHashMap<Long,PendingTxn>();
111         sessionId = new AtomicLong();
112     }
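
    // Lifecycle sketch, assuming the caller wires this store into the server
    // the way the rest of this file implies (variable names here are
    // illustrative, not taken from the source):
    //
    //   BackEnd back = new BackEnd(path);
    //   back.setupStore(space);   // recover snapshot + logs, start the consumer
    //   // ... the front end then writes log files through LogOutputFile,
    //   // which reports each completed file via update() ...
    //   back.close();             // or back.destroy() to remove the store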
113 
114     /**
115      * Set up the database store and recover any existing state.
116      */
117     void setupStore(Recover space) {
118 
119 	// Recover the snapshot (if any)
120 	//
121 	recoverSnapshot();
122 
123 	// Consume any remaining log files.
124 	//
125 	consumeLogs(true);
126 
127 	if (logger.isLoggable(Level.FINE)) {
128 	    logger.log(Level.FINE, "recoverSnapshot: number of entries:{0}, " +
129 			"number of pendingTxns:{1}, number of registrations:{2}",
130 			new Object[]{Integer.valueOf(entries.size()),
131 			Integer.valueOf(pendingTxns.size()),
132 			Integer.valueOf(registrations.size())});
133 	}
134 
135 	// Recover the session id
136 	//
137 	if (sessionId != null)
138 	    space.recoverSessionId(sessionId.longValue());
139 
140 	// Recover the top level Uuid
141 	//
142 	if (topUuid != null)
143 	    space.recoverUuid(ByteArrayWrapper.toUuid(topUuid));
144 
145 	// Recover the join state
146 	//
147 	if (joinState != null) {
148 	    try {
149 		space.recoverJoinState(joinState);
150 	    } catch (Exception e) {
151 		throw logAndThrowRecoveryException(
152 		    "Error recovering join state", e);
153 	    }
154 	}
155 
156 	// Recover the entries
157 	//
158 	try {
159 	    Iterator i = entries.values().iterator();
160 
161 	    while (i.hasNext()) {
162 		space.recoverWrite((Resource)i.next(), null);
163 	    }
164 	} catch (Exception e) {
165 	    throw logAndThrowRecoveryException("Error recovering entries", e);
166 	}
167 
168 	// Recover the prepared transactions and remove any
169 	// non-prepared ones.
170 	try {
171 	    Iterator i = pendingTxns.values().iterator();
172 
173 	    while (i.hasNext()) {
174 		PendingTxn pt = (PendingTxn)i.next();
175 
176 		// If the pending transaction was not recovered
177 		// (i.e. it was not prepared) then we can remove it.
178 		//
179 		if(!pt.recover(space))
180 		    i.remove();
181 	    }
182 	} catch (Exception e) {
183 	    throw logAndThrowRecoveryException("Error recovering transactions",
184 					       e);
185 	}
186 
187 	// Recover the registrations
188 	//
189 	try {
190 	    Iterator i = registrations.values().iterator();
191 
192 	    while (i.hasNext()) {
193 		Registration reg = (Registration)i.next();
194 
195 		final BaseObject[] templates = reg.getTemplates();
196 
197 		space.recoverRegister(reg, reg.getType(), templates);
198 	    }
199 	} catch (Exception e) {
200 	    throw logAndThrowRecoveryException(
201                 "Error recovering registrations", e);
202 	}
203 	startConsumer();
204     }
205 
206     private void recoverSnapshot() {
207 	try {
208 	    File[] snapshot = new File[1];
209 	    snapshotFile = new SnapshotFile(snapshotFileBase, snapshot);
210 
211 	    if (snapshot[0] == null) {
212 
213 		// no snapshot, initialize fields and return
214 		
215 		topUuid		= null;
216 		lastLog		= null;
217 		return;
218 	    } 
219 
220 	    final ObjectInputStream in =
221 		new ObjectInputStream(new BufferedInputStream(
222 		    new FileInputStream(snapshot[0])));
223 
224 	    final int version = in.readInt();
225 	    if (version != SNAPSHOT_VERSION) {
226 		logAndThrowRecoveryException(
227 		    "Wrong file version:" + version, null);
228 	    }
229 
230 	    sessionId.set(((Long)in.readObject()).longValue());
231 	    joinState	= (StoredObject)in.readObject();
232             entries.clear();
233 	    entries.putAll((Map<ByteArrayWrapper,Resource>)in.readObject());
234             registrations.clear();
235 	    registrations.putAll((Map<ByteArrayWrapper,Registration>)in.readObject());
236             pendingTxns.clear();
237 	    pendingTxns.putAll((Map)in.readObject());
238 	    topUuid	= (byte[])in.readObject();
239 	    lastLog	= (LastLog)in.readObject();
240 	    in.close();
241 	} catch (RuntimeException t) {
242 	    throw t;
243 	} catch (Throwable t) {
244 	    throw logAndThrowRecoveryException("Problem recovering snapshot",t);
245 	}
246     }
247 
248     private void startConsumer() {
249 
250 	// Create and start the log consumer thread
251 	//
252 	
253 	consumer.start();
254     }
255 
256     /**
257      * Thread to consume log files. <code>LogOutputFile</code> calls
258      * <code>update</code> (through the <code>Observer</code> interface)
259      * each time a log file is written.
260      */
261     private class ConsumerThread extends Thread {
262 
263 	volatile private boolean interrupted = false;
264 
265 	ConsumerThread() {}
266 
267 	public void run() {
268 	    try {
269 		while (!interrupted) {
270 
271 		    // This block is first because when start is
272 		    // called in setup there will not be any log files
273 		    // to process. LogOutputFile is created after
274 		    // setup returns.
275 		    //
276                     synchronized(this) {
277                         wait();
278                     }
279 
280                     // There is a small window between the wait and
281                     // the consumeLogs where update can be called,
282                     // delivering its notify, and yet consumeLogs
283                     // actually consumes the log file that caused the
284                     // update. This unlikely situation is ok since
285                     // consumeLogs does the right thing if there are
286                     // no logs to process. We could sync around
287                     // consumeLogs but we don't want LogOutputFile to
288                     // wait.
289                     //
290                     consumeLogs(false);
291 		}
292 	    } catch (InterruptedException exit) {}
293 	}
294 
295 	// Cause the thread to consume a log file.
296 	//
297 	synchronized private void update() {
298             // For the case it is processing log files, see the comment in run()
299 	    notify();		// For the case it is waiting
300 	}
301 
302 	// Set a local flag just in case someone clears the thread's own
303 	// interrupted status.
304 	//
305 	public void interrupt() {
306 	    interrupted = true;
307 	    super.interrupt();
308 	}
309     }
310 
311     //---------------------
312     // Required by Observer
313     //---------------------
314 
315     public void update(Observable source, Object arg) {
316 
317 	if (!consumer.isAlive()) {
318             logger.log(Level.SEVERE, "Consumer thread no longer running");
319             return;
320 	}
321 	consumer.update();
322     }
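
    // Sketch of how update() is expected to be driven, assuming LogOutputFile
    // is a java.util.Observable as the class comment suggests (names are
    // illustrative):
    //
    //   LogOutputFile logFile = ...;   // created by the front end after setupStore
    //   logFile.addObserver(backEnd);  // backEnd is this BackEnd instance
    //
    // Each completed log file then lands in update(Observable, Object) above,
    // which nudges the consumer thread to run consumeLogs(false).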
323 
324     /**
325      * Destroy the consumer thread and database
326      */
327     void destroy() {
328 	try {
329 	    consumer.interrupt();
330 
331 	    // wait for consumeLogs to finish in order to avoid errors
332 	    // once the database and log files are destroyed.
333 	    //
334 	    consumer.join(WAIT_FOR_THREAD);
335 
336 	} catch (InterruptedException ignore) {
337 	} finally {
338 	    try {
339 		if (snapshotFile != null)
340 		    snapshotFile.destroy();
341 	    } catch (Throwable t) {
342 		logger.log(Level.INFO, 
343 		    "Exception encountered while destroying store", t);
344 	    }
345 	}
346     }
347 
348     /**
349      * Stop the consumer and close the database.
350      */
351     void close() {
352 	consumer.interrupt();
353 	// Wait forever, can't close database until 
354 	// consumer stops (which during startup should
355 	// not be long).
356 	try {
357 	    consumer.join();
358 	} catch (InterruptedException e) {
359 	    // never happens
360 	}
361 	if (snapshotFile != null) {
362 	    try {
363 		snapshotFile.close();
364 	    } catch (Throwable t) {
365 		logger.log(Level.INFO, 
366 		    "Exception encountered while closing store", t);
367 	    }
368 	}
369     }
370 
371     /**
372      * Return the pending transaction description for the given
373      * transaction, creating the object and adding it to the table if
374      * necessary.
375      */
376     private PendingTxn pendingTxn(Long txnId) {
377 	PendingTxn pt = (PendingTxn)pendingTxns.get(txnId);
378 	if (pt == null) {
379 	    pt = new PendingTxn(txnId);
380 	    pendingTxns.put(txnId, pt);
381 	}
382 	return pt;
383     }
384 
385     /**
386      * Remove a pending transaction from the table.  If it isn't there,
387      * this call is harmless.
388      */
389     private void removePendingTxn(Long txnId) {
390 	pendingTxns.remove(txnId); // if it fails, it wasn't there to remove
391     }
392 
393     // ------------------------------------------------------------
394     //                  Log stuff
395     // ------------------------------------------------------------
396 
397     // The following methods are called when a recovered log element
398     // is read from the log file. Some methods, writeOp and takeOp
399     // can also be called when a pending transaction is committed.
400     //
401 
402     /**
403      * This method sets the session id in the database. Its value is
404      * only used during recovery after a restart.
405      */
406     void bootOp(long time, long session) {
407 	sessionId.set(session);
408 	if (logger.isLoggable(Level.FINE))
409 	    logger.log(Level.FINE, "bootOp({0})", new Date(time));
410     }
411 
412     /**
413      * Record the join state.
414      */
415     void joinStateOp(StoredObject state) {
416 	joinState = state;
417 	if (logger.isLoggable(Level.FINE))
418 	    logger.log(Level.FINE, "joinStateOp()");
419     }
420 
421     /**
422      * This method records a logged write operation. If under a
423      * transaction the resource is held in a list for the pending
424      * transaction. When committed this method will be called again
425      * with the resource and a null transaction id.
426      */
427     void writeOp(Resource entry, Long txnId ) {
428 	if (logger.isLoggable(Level.FINE)) { 
429 	    logger.log(Level.FINE, "writeOp({0},{1})", 
430 		       new Object[]{entry,txnId});
431 	}
432 
433 	if (txnId != null)
434 	    pendingTxn(txnId).addWrite(entry);
435 	else
436 	    entries.put(entry.getCookieAsWrapper(), entry);
437     }
438 
439     /**
440      * This method records a logged take operation. If under a
441      * transaction the resource is held in a list for the pending
442      * transaction. When committed this method will be called again
443      * with the resource and a null transaction id.  
444      */
445     void takeOp(byte cookie[], Long txnId) {
446 	if (logger.isLoggable(Level.FINE)) { 
447 	    logger.log(Level.FINE, "takeOp({0},{1})", 
448 		       new Object[]{ByteArrayWrapper.toUuid(cookie),txnId});
449 	}
450 
451 	if (txnId != null)
452 	    pendingTxn(txnId).addTake(cookie);
453 	else
454 	    entries.remove(new ByteArrayWrapper(cookie));
455     }
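
    // The compaction described in the class comment falls out of the two
    // methods above. For a hypothetical entry written and then taken outside
    // any transaction (cookie being the entry's cookie bytes):
    //
    //   writeOp(entry, null);   // puts the entry into the entries map
    //   takeOp(cookie, null);   // removes it again
    //
    // so the next snapshot written by consumeLogs() contains no record of it.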
456 
457     /**
458      * This method records a logged event registration.
459      */
460     void registerOp(Registration registration) {
461 	logger.log(Level.FINE, "registerOp({0})", registration);
462 
463 	registrations.put(registration.getCookieAsWrapper(), registration);
464     }
465 
466     /**
467      * This method processes a logged renew operation. Renew operations
468      * apply to resources passed into writeOp and registerOp.
469      */ 
470     void renewOp(byte cookie[], long expiration) {
471 	if (logger.isLoggable(Level.FINE)) { 
472 	    logger.log(Level.FINE, "renewOp({0},{1})", 
473 		       new Object[]{ByteArrayWrapper.toUuid(cookie),
474 				    Long.valueOf(expiration)});
475 	}
476 	final ByteArrayWrapper baw = new ByteArrayWrapper(cookie);
477 
478 	Resource resource;
479 
480 	if ((resource = (Resource)entries.get(baw)) == null) {
481 	    // not an entry, try event registrations
482 	    if ((resource = (Resource)registrations.get(baw)) == null) {
483 
484 		// No registration either, try transactional writes
485 		Iterator i = pendingTxns.values().iterator();
486 		while (i.hasNext()) {
487 		    if ((resource = ((PendingTxn)i.next()).get(baw)) != null)
488 			break;
489 		}
490 	    }
491 	}
492 	if (resource != null)
493 	    resource.setExpiration(expiration);
494     }
495 
496     /**
497      * This method processes a logged cancel operation. Cancel operations
498      * apply to resources passed into writeOp and registerOp.
499      */ 
500     void cancelOp(byte cookie[]) {
501 	if (logger.isLoggable(Level.FINE)) { 
502 	    logger.log(Level.FINE, "cancelOp({0})", 
503 		       ByteArrayWrapper.toUuid(cookie));
504 	}
505 	final ByteArrayWrapper baw = new ByteArrayWrapper(cookie);
506 
507 	if (entries.remove(baw) == null) {
508 	    if (registrations.remove(baw) == null) {
509 
510 		Iterator i = pendingTxns.values().iterator();
511 		while (i.hasNext()) {
512 		    if (((PendingTxn)i.next()).remove(baw) != null)
513 			break;
514 		}
515 	    }
516 	}
517     }
518 
519     /**
520      * This method prepares a pending transaction.
521      */
522     void prepareOp(Long txnId, StoredObject transaction) {
523 	logger.log(Level.FINE, "prepareOp({0})", txnId);
524 
525 	PendingTxn pt = pendingTxn(txnId);
526 	pt.prepare(transaction);
527     }
528 
529     /**
530      * This method commits a pending transaction.
531      */
532     void commitOp(Long txnId) {
533 	logger.log(Level.FINE, "commitOp({0})", txnId);
534 
535 	PendingTxn pt = pendingTxn(txnId);
536 	pt.commit(this);
537 	removePendingTxn(txnId);
538     }
539 
540     /**
541      * This method aborts a pending transaction.
542      */
543     void abortOp(Long txnId) {
544 	logger.log(Level.FINE, "abortOp({0})", txnId);
545 
546 	removePendingTxn(txnId);
547     }
548 
549     /**
550      * This method records the service's top level <code>Uuid</code>
551      * @param uuid The service's <code>Uuid</code> represented as a
552      *             <code>byte[16]</code>.
553      */
554     void uuidOp(byte[] uuid) {
555 	if (logger.isLoggable(Level.FINE)) {
556 	    logger.log(Level.FINE, "uuidOp({0})", 
557 		       ByteArrayWrapper.toUuid(uuid));
558 	}
559 
560 	topUuid = uuid;
561     }
562 
563     /**
564      * Consume the log files that exist.  If <code>all</code> is
565      * <code>true</code>, all found log files will be processed.
566      * If <code>all</code> is <code>false</code>, then all but the
567      * most recent will be processed; this will prevent the back
568      * end from reading the log file that is currently being
569      * produced by the front end.
570      */
571     private void consumeLogs(boolean all) {
572 	Iterator it;
573 	try {
574 	    it = LogInputFile.logs(logFileBase, all);
575 	} catch (IOException e) {
576 	    final String msg = "couldn't open logs";
577 	    final InternalSpaceException ise = 
578 		new InternalSpaceException(msg, e);
579 	    logger.log(Level.SEVERE, msg , ise);
580 	    throw ise;
581 	}
582 
583 	while (it.hasNext()) {
584 	    LogInputFile log = (LogInputFile)it.next();
585 	    logger.log(Level.FINE, "processing {0}", log);
586 
587 	    if (log == null)		// file already consumed
588 		continue;
589 	
590 	    try {
591 		String logFile = log.toString();
592 		if (lastLog == null || !lastLog.sameAs(logFile))
593 		    log.consume(this);
594 		lastLog = new LastLog(logFile);
595 
596 		ObjectOutputStream out = snapshotFile.next();
597 
598 		out.writeInt(SNAPSHOT_VERSION);
599 		out.writeObject(sessionId.get());
600 		out.writeObject(joinState);
601                 // Serial form of maps is HashMap, cannot be changed.
602 		out.writeObject(new HashMap(entries));
603 		out.writeObject(new HashMap(registrations));
604 		out.writeObject(new HashMap(pendingTxns));
605 		out.writeObject(topUuid);
606 		out.writeObject(lastLog);
607 		snapshotFile.commit();
608 	    } catch (IOException e) {
609 		final String msg = "error writing snapshot";
610 		final InternalSpaceException ise = 
611 		    new InternalSpaceException(msg, e);
612 		logger.log(Level.SEVERE, msg , ise);
613 		throw ise;
614 	    }
615 	    log.finished();
616 	}
617     }
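
    // Note: the write order above must match the read order in
    // recoverSnapshot(): version, sessionId, joinState, entries,
    // registrations, pendingTxns, topUuid, lastLog.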
618 
619     /**
620      * This class remembers which log file was the last to be
621      * successfully consumed.  If the recovery mechanism reopens this
622      * file, then it will skip its contents -- this indicates a crash
623      * happened after the contents were committed to the snapshot but
624      * before the file was unlinked.  
625      */
626     private static class LastLog implements Serializable {
627 	private final String	logFile;
628 	private final long	timeStamp;
629 
630 	LastLog(String path) {
631 	    logFile = path;
632 	    timeStamp = new File(logFile).lastModified();
633 	}
634 
635 	boolean sameAs(String otherPath) {
636 	    if (!logFile.equals(otherPath))
637 		return false;
638 	    return (new File(otherPath).lastModified() == timeStamp);
639 	}
640     }
641 
642     /**
643      * Log and throw an InternalSpaceException to flag a store
644      * recovery problem.
645      */
646     private InternalSpaceException logAndThrowRecoveryException(
647 	    String msg, Throwable nested)
648     {
649 	final InternalSpaceException e = 
650 	    new InternalSpaceException(msg, nested);
651 	logger.log(Level.SEVERE, msg, e);
652 	throw e;
653     }
654 }