View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.norm;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.logging.Level;
26  import java.util.logging.Logger;
27  
28  import net.jini.config.ConfigurationException;
29  import org.apache.river.norm.proxy.StoreException;
30  import org.apache.river.norm.proxy.CorruptedStoreException;
31  import org.apache.river.norm.lookup.SubStore;
32  import org.apache.river.reliableLog.LogHandler;
33  import org.apache.river.reliableLog.ReliableLog;
34  import org.apache.river.system.FileSystem;
35  import org.apache.river.thread.ReadersWriter;
36  
37  /**
38   * Class that actually stores a Norm server's state to disk.  Basically
39   * a wrapper around ReliableLog with the addition of lock management.
40   *
41   * @author Sun Microsystems, Inc.
42   */
43  class PersistentStore {
44      /** Logger for logging messages for this class */
45      private static final Logger logger = Logger.getLogger("org.apache.river.norm");
46  
47      /**
48       * Object we use to reliably and persistently log updates to our
49       * state, or null if not persistent.
50       */
51      private ReliableLog log;
52  
53      /**
54       * No mutation of persistent state can occur during a snapshot,
55       * however, we can have multiple mutators, use
56       * <code>ReadersWriter</code> object to manage this invariant.  Note
57       * as far as the lock is concerned the mutators are the readers and
58       * snapshot thread the writer since for us mutation is the
59       * non-exclusive operation.
60       */
61      final private ReadersWriter mutatorLock = new ReadersWriter();
62  
63      /**
64       * Thread local that tracks if (and how many times) the current thread
65       * has acquired a non-exclusive mutator lock.
66       */
67      final static private ThreadLocal lockState = new ThreadLocal();
68  
69      /** Cache a <code>Long</code> object with a zero value */
70      final static private Long zero = Long.valueOf(0);
71  
72      /** Location of the persistent store, or null if not persistent */
73      final private File storeLocation;
74  
75      /** Object that handles the recovery of logs, or null if not persistent  */
76      final private LogHandler logHandler;
77  
78      /** The NormServer we are part of */
79      private NormServerBaseImpl server;
80  
81      /** Number of updates since last snapshot */
82      private int updateCount;
83  
84      /** A list of all of the sub-stores */
85      private List subStores = new LinkedList();
86  
87      /**
88       * Construct a store that will persist its data to the specified
89       * directory.
90       *
91       * @param logDir directory where the store should persist its data,
92       *        which must exist, unless it is <code>null</code>, in which case
93       *	      there is no persistence
94       * @param logHandler object that will process the log and last
95       *        snapshot to recover the server's state
96       * @param server the server is called back after an update so it can
97       *        decide whether or not to do a snapshot
98       * @throws StoreException if there is a problem setting up the store
99       */
100     PersistentStore(String logDir, LogHandler logHandler, 
101 		    NormServerBaseImpl server)
102 	throws StoreException
103     {
104 	this.logHandler = logHandler;
105 	this.server = server;
106 	if (logDir == null) {
107 	    storeLocation = null;
108 	} else {
109 	    storeLocation = new File(logDir);
110 	    try {
111 		log = new ReliableLog(
112 		    storeLocation.getCanonicalPath(), logHandler);
113 	    } catch (IOException e) {
114 		throw new CorruptedStoreException(
115 		    "Failure creating reliable log", e);
116 	    }
117 
118 	    try {
119 		log.recover();
120 	    } catch (IOException e) {
121 		throw new CorruptedStoreException(
122 		    "Failure recovering reliable log", e);	    
123 	    }
124 	}
125     }
126     
127     /**
128      * Can only be called once after construction.
129      * @param server 
130      */
131     void setServer(NormServerBaseImpl server){
132         synchronized (this){
133             if (this.server == null) this.server = server;
134         }
135     }
136 
137     /**
138      * Destroy the store.
139      *
140      * @throws IOException if it has difficulty removing the log files 
141      */
142     void destroy() throws IOException {
143 	// Prep all the sub-stores to be destroyed
144         synchronized (subStores){
145             for (Iterator i = subStores.iterator(); i.hasNext(); ) {
146                 SubStore subStore = (SubStore) i.next();
147                 subStore.prepareDestroy();
148             }
149         }
150 	if (log != null) {
151 	    log.deletePersistentStore();
152 	    FileSystem.destroy(storeLocation, true);
153 	}
154     }
155 
156     /**
157      * Inform the store of a sub-store
158      */
159     void addSubStore(SubStore subStore) throws StoreException {
160 	try {
161 	    if (log == null) {
162 		subStore.setDirectory(null);
163 	    } else {
164 		final String subDir = subStore.subDirectory();
165 
166 		if (subDir == null) {
167 		    subStore.setDirectory(storeLocation);
168 		} else {
169 		    subStore.setDirectory(new File(storeLocation, subDir));
170 		}
171 	    }
172 	    synchronized (subStores){
173                 subStores.add(subStore);
174             }
175 	} catch (IOException e) {
176 	    throw new StoreException("Failure adding substore " + subStore,
177 				     e);
178 	} catch (ConfigurationException e) {
179 	    throw new StoreException("Failure adding substore " + subStore,
180 				     e);
181 	}
182     }
183 
184 
185     /////////////////////////////////////////////////////////////////
186     // Methods for obtaining and releasing the locks on the store
187 
188     /**
189      * Block until we can acquire a non-exclusive mutator lock on the
190      * server's persistent state.  This lock should be acquired in a
191      * <code>try</code> block and a <code>releaseMutatorLock</code> call
192      * should be placed in a <code>finally</code> block.
193      */
194     void acquireMutatorLock() {
195 	// Do we already hold a lock?
196 
197 	Long lockStateVal = (Long) lockState.get();
198 	if (lockStateVal == null) 
199 	    lockStateVal = zero;
200 
201 	final long longVal = lockStateVal.longValue();
202 
203 	if (longVal == 0) {
204 	    // No, this thread currently does not hold a lock,
205 	    // grab non-exclusive lock (which for mutatorLock is a
206 	    // read lock) 
207 	    mutatorLock.readLock();
208 	} 
209 
210 	// Either way, bump the lock count and update our thread state
211 	lockState.set(Long.valueOf(longVal + 1));
212     }
213 
214     /**
215      * Release one level of mutator locks if this thread holds at least one.
216      */
217     void releaseMutatorLock() {
218 	Long lockStateVal = (Long) lockState.get();
219 	if (lockStateVal == null) 
220 	    lockStateVal = zero;
221 
222 	final long longVal = lockStateVal.longValue();
223 
224 	if (longVal == 0) {
225 	    // No lock to release, return
226 	    return;
227 	}
228 
229 	if (longVal == 1) {
230 	    // Last one on stack release lock
231 	    // Using read lock because we want a non-exclusive lock
232 	    mutatorLock.readUnlock();
233 	    lockStateVal = zero;
234 	} else {
235 	    lockStateVal = Long.valueOf(longVal - 1);
236 	}
237 
238 	lockState.set(lockStateVal);
239     }
240 
241     //////////////////////////////////////////////////////////////////
242     // Methods for writing records to the log and taking and
243     // coordinating snapshots
244 
245     /**
246      * Log an update. Will flush to disk before returning.
247      *
248      * @throws IllegalStateException if the current thread does not hold
249      *	       a non-exclusive mutator lock
250      * @throws IOException
251      * @see ReliableLog#update
252      */
253     void update(Object o) {
254 	if (log == null) {
255 	    return;
256 	}
257 	final Long lockStateVal = (Long) lockState.get();
258 	if (lockStateVal == null || lockStateVal.longValue() == 0) {
259 	    throw new IllegalStateException("PersistentStore.update:" +
260 	        "Must acquire mutator lock before calling update()");
261 	}
262 
263 	synchronized (this) { 
264 	    try {
265 		log.update(o, true);
266 		updateCount++;
267 		server.updatePerformed(updateCount);
268 	    } catch (IOException e) {
269 		// $$$ should probably be propagating this exception
270 		logger.log(Level.WARNING, "IOException while updating log", e);
271 	    }
272 	}
273     }
274 
275     /**
276      * Generate a snapshot, will perform the necessary locking to ensure no
277      * threads are mutating the state of the server before creating the 
278      * snapshot.
279      *
280      * @throws IOException
281      * @see ReliableLog#snapshot
282      */
283     void snapshot() throws IOException {
284 	if (log == null) {
285 	    return;
286 	}
287 	try {
288 	    // Using write lock because we want an exclusive lock
289 	    mutatorLock.writeLock();
290             synchronized (this){
291                 updateCount = 0;
292             }
293 
294 	    // Don't need to sync on this because
295 	    // mutatorLock.writeLock() gives us an exclusive lock
296 	    log.snapshot();
297 	} finally {
298 	    // Using write lock because we want an exclusive lock
299 	    mutatorLock.writeUnlock();
300 	}
301     }
302 }