1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger.snaplogstore;
19
20 import org.apache.river.constants.TimeConstants;
21 import org.apache.river.logging.Levels;
22 import org.apache.river.outrigger.Recover;
23 import org.apache.river.outrigger.StoredObject;
24 import org.apache.river.outrigger.OutriggerServerImpl;
25
26 import java.io.File;
27 import java.io.BufferedInputStream;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.ObjectInputStream;
31 import java.io.ObjectOutputStream;
32 import java.io.IOException;
33 import java.io.Serializable;
34 import java.util.ArrayList;
35 import java.util.Date;
36 import java.util.Enumeration;
37 import java.util.Iterator;
38 import java.util.HashMap;
39 import java.util.Map;
40 import java.util.Observable;
41 import java.util.Observer;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.atomic.AtomicLong;
45 import java.util.logging.Level;
46 import java.util.logging.Logger;
47
48 import net.jini.space.InternalSpaceException;
49
50
51
52
53
54
55
56
57
58
59
60
61 class BackEnd implements Observer {
62
63
64
65 private final AtomicLong sessionId;
66 private volatile StoredObject joinState;
67 private final Map<ByteArrayWrapper,Resource> entries;
68 private final Map<ByteArrayWrapper,Registration> registrations;
69 private final Map<Long,PendingTxn> pendingTxns;
70 private volatile byte topUuid[];
71 private volatile LastLog lastLog;
72
73
74 private volatile SnapshotFile snapshotFile;
75
76
77 private static final int SNAPSHOT_VERSION = LogFile.LOG_VERSION;
78
79
80
81
82 private final String logFileBase;
83
84
85
86
87 private final String snapshotFileBase;
88
89
90
91
92 private final ConsumerThread consumer;
93
94
95 private final static long WAIT_FOR_THREAD = 1 * TimeConstants.MINUTES;
96
97
98 private static final Logger logger =
99 Logger.getLogger(OutriggerServerImpl.storeLoggerName);
100
101
102
103
104 BackEnd(String path) {
105 logFileBase = new File(path, LogFile.LOG_TYPE).getAbsolutePath();
106 snapshotFileBase = new File(path, "Snapshot.").getAbsolutePath();
107 consumer = new ConsumerThread();
108 entries = new ConcurrentHashMap<ByteArrayWrapper,Resource>();
109 registrations = new ConcurrentHashMap<ByteArrayWrapper,Registration>();
110 pendingTxns = new ConcurrentHashMap<Long,PendingTxn>();
111 sessionId = new AtomicLong();
112 }
113
114
115
116
117 void setupStore(Recover space) {
118
119
120
121 recoverSnapshot();
122
123
124
125 consumeLogs(true);
126
127 if (logger.isLoggable(Level.FINE)) {
128 logger.log(Level.FINE, "recoverSnapshot: number of entries:{0}, " +
129 "number of pendingTxns:{1}, number of registrations:{2}",
130 new Object[]{Integer.valueOf(entries.size()),
131 Integer.valueOf(pendingTxns.size()),
132 Integer.valueOf(registrations.size())});
133 }
134
135
136
137 if (sessionId != null)
138 space.recoverSessionId(sessionId.longValue());
139
140
141
142 if (topUuid != null)
143 space.recoverUuid(ByteArrayWrapper.toUuid(topUuid));
144
145
146
147 if (joinState != null) {
148 try {
149 space.recoverJoinState(joinState);
150 } catch (Exception e) {
151 throw logAndThrowRecoveryException(
152 "Error recovering join state", e);
153 }
154 }
155
156
157
158 try {
159 Iterator i = entries.values().iterator();
160
161 while (i.hasNext()) {
162 space.recoverWrite((Resource)i.next(), null);
163 }
164 } catch (Exception e) {
165 throw logAndThrowRecoveryException("Error recovering entries", e);
166 }
167
168
169
170 try {
171 Iterator i = pendingTxns.values().iterator();
172
173 while (i.hasNext()) {
174 PendingTxn pt = (PendingTxn)i.next();
175
176
177
178
179 if(!pt.recover(space))
180 i.remove();
181 }
182 } catch (Exception e) {
183 throw logAndThrowRecoveryException("Error recovering transactions",
184 e);
185 }
186
187
188
189 try {
190 Iterator i = registrations.values().iterator();
191
192 while (i.hasNext()) {
193 Registration reg = (Registration)i.next();
194
195 final BaseObject[] templates = reg.getTemplates();
196
197 space.recoverRegister(reg, reg.getType(), templates);
198 }
199 } catch (Exception e) {
200 throw logAndThrowRecoveryException(
201 "Error recovering registrations", e);
202 }
203 startConsumer();
204 }
205
206 private void recoverSnapshot() {
207 try {
208 File[] snapshot = new File[1];
209 snapshotFile = new SnapshotFile(snapshotFileBase, snapshot);
210
211 if (snapshot[0] == null) {
212
213
214
215 topUuid = null;
216 lastLog = null;
217 return;
218 }
219
220 final ObjectInputStream in =
221 new ObjectInputStream(new BufferedInputStream(
222 new FileInputStream(snapshot[0])));
223
224 final int version = in.readInt();
225 if (version != SNAPSHOT_VERSION) {
226 logAndThrowRecoveryException(
227 "Wrong file version:" + version, null);
228 }
229
230 sessionId.set(((Long)in.readObject()).longValue());
231 joinState = (StoredObject)in.readObject();
232 entries.clear();
233 entries.putAll((Map<ByteArrayWrapper,Resource>)in.readObject());
234 registrations.clear();
235 registrations.putAll((Map<ByteArrayWrapper,Registration>)in.readObject());
236 pendingTxns.clear();
237 pendingTxns.putAll((Map)in.readObject());
238 topUuid = (byte[])in.readObject();
239 lastLog = (LastLog)in.readObject();
240 in.close();
241 } catch (RuntimeException t) {
242 throw t;
243 } catch (Throwable t) {
244 throw logAndThrowRecoveryException("Problem recovering snapshot",t);
245 }
246 }
247
248 private void startConsumer() {
249
250
251
252
253 consumer.start();
254 }
255
256
257
258
259
260
261 private class ConsumerThread extends Thread {
262
263 volatile private boolean interrupted = false;
264
265 ConsumerThread() {}
266
267 public void run() {
268 try {
269 while (!interrupted) {
270
271
272
273
274
275
276 synchronized(this) {
277 wait();
278 }
279
280
281
282
283
284
285
286
287
288
289
290 consumeLogs(false);
291 }
292 } catch (InterruptedException exit) {}
293 }
294
295
296
297 synchronized private void update() {
298
299 notify();
300 }
301
302
303
304
305 public void interrupt() {
306 interrupted = true;
307 super.interrupt();
308 }
309 }
310
311
312
313
314
315 public void update(Observable source, Object arg) {
316
317 if (!consumer.isAlive()) {
318 logger.log(Level.SEVERE, "Consumer thread no longer running");
319 return;
320 }
321 consumer.update();
322 }
323
324
325
326
327 void destroy() {
328 try {
329 consumer.interrupt();
330
331
332
333
334 consumer.join(WAIT_FOR_THREAD);
335
336 } catch (InterruptedException ignore) {
337 } finally {
338 try {
339 if (snapshotFile != null)
340 snapshotFile.destroy();
341 } catch (Throwable t) {
342 logger.log(Level.INFO,
343 "Exception encounter while destroying store", t);
344 }
345 }
346 }
347
348
349
350
351 void close() {
352 consumer.interrupt();
353
354
355
356 try {
357 consumer.join();
358 } catch (InterruptedException e) {
359
360 }
361 if (snapshotFile != null) {
362 try {
363 snapshotFile.close();
364 } catch (Throwable t) {
365 logger.log(Level.INFO,
366 "Exception encounter while closing store", t);
367 }
368 }
369 }
370
371
372
373
374
375
376 private PendingTxn pendingTxn(Long txnId) {
377 PendingTxn pt = (PendingTxn)pendingTxns.get(txnId);
378 if (pt == null) {
379 pt = new PendingTxn(txnId);
380 pendingTxns.put(txnId, pt);
381 }
382 return pt;
383 }
384
385
386
387
388
389 private void removePendingTxn(Long txnId) {
390 pendingTxns.remove(txnId);
391 }
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406 void bootOp(long time, long session) {
407 sessionId.set(session);
408 if (logger.isLoggable(Level.FINE))
409 logger.log(Level.FINE, "bootOp({0})", new Date(time));
410 }
411
412
413
414
415 void joinStateOp(StoredObject state) {
416 joinState = state;
417 if (logger.isLoggable(Level.FINE))
418 logger.log(Level.FINE, "joinStateOp()");
419 }
420
421
422
423
424
425
426
427 void writeOp(Resource entry, Long txnId ) {
428 if (logger.isLoggable(Level.FINE)) {
429 logger.log(Level.FINE, "writeOp({0},{1})",
430 new Object[]{entry,txnId});
431 }
432
433 if (txnId != null)
434 pendingTxn(txnId).addWrite(entry);
435 else
436 entries.put(entry.getCookieAsWrapper(), entry);
437 }
438
439
440
441
442
443
444
445 void takeOp(byte cookie[], Long txnId) {
446 if (logger.isLoggable(Level.FINE)) {
447 logger.log(Level.FINE, "takeOp({0},{1})",
448 new Object[]{ByteArrayWrapper.toUuid(cookie),txnId});
449 }
450
451 if (txnId != null)
452 pendingTxn(txnId).addTake(cookie);
453 else
454 entries.remove(new ByteArrayWrapper(cookie));
455 }
456
457
458
459
460 void registerOp(Registration registration) {
461 logger.log(Level.FINE, "registerOp({0})", registration);
462
463 registrations.put(registration.getCookieAsWrapper(), registration);
464 }
465
466
467
468
469
470 void renewOp(byte cookie[], long expiration) {
471 if (logger.isLoggable(Level.FINE)) {
472 logger.log(Level.FINE, "renewOp({0},{1})",
473 new Object[]{ByteArrayWrapper.toUuid(cookie),
474 Long.valueOf(expiration)});
475 }
476 final ByteArrayWrapper baw = new ByteArrayWrapper(cookie);
477
478 Resource resource;
479
480 if ((resource = (Resource)entries.get(baw)) == null) {
481
482 if ((resource = (Resource)registrations.get(baw)) == null) {
483
484
485 Iterator i = pendingTxns.values().iterator();
486 while (i.hasNext()) {
487 if ((resource = ((PendingTxn)i.next()).get(baw)) != null)
488 break;
489 }
490 }
491 }
492 if (resource != null)
493 resource.setExpiration(expiration);
494 }
495
496
497
498
499
500 void cancelOp(byte cookie[]) {
501 if (logger.isLoggable(Level.FINE)) {
502 logger.log(Level.FINE, "cancelOp({0})",
503 ByteArrayWrapper.toUuid(cookie));
504 }
505 final ByteArrayWrapper baw = new ByteArrayWrapper(cookie);
506
507 if (entries.remove(baw) == null) {
508 if (registrations.remove(baw) == null) {
509
510 Iterator i = pendingTxns.values().iterator();
511 while (i.hasNext()) {
512 if (((PendingTxn)i.next()).remove(baw) != null)
513 break;
514 }
515 }
516 }
517 }
518
519
520
521
522 void prepareOp(Long txnId, StoredObject transaction) {
523 logger.log(Level.FINE, "prepareOp({0})", txnId);
524
525 PendingTxn pt = pendingTxn(txnId);
526 pt.prepare(transaction);
527 }
528
529
530
531
532 void commitOp(Long txnId) {
533 logger.log(Level.FINE, "commitOp({0})", txnId);
534
535 PendingTxn pt = pendingTxn(txnId);
536 pt.commit(this);
537 removePendingTxn(txnId);
538 }
539
540
541
542
543 void abortOp(Long txnId) {
544 logger.log(Level.FINE, "abortOp({0})", txnId);
545
546 removePendingTxn(txnId);
547 }
548
549
550
551
552
553
554 void uuidOp(byte[] uuid) {
555 if (logger.isLoggable(Level.FINE)) {
556 logger.log(Level.FINE, "uuidOp({0})",
557 ByteArrayWrapper.toUuid(uuid));
558 }
559
560 topUuid = uuid;
561 }
562
563
564
565
566
567
568
569
570
571 private void consumeLogs(boolean all) {
572 Iterator it;
573 try {
574 it = LogInputFile.logs(logFileBase, all);
575 } catch (IOException e) {
576 final String msg = "couldn't open logs";
577 final InternalSpaceException ise =
578 new InternalSpaceException(msg, e);
579 logger.log(Level.SEVERE, msg , ise);
580 throw ise;
581 }
582
583 while (it.hasNext()) {
584 LogInputFile log = (LogInputFile)it.next();
585 logger.log(Level.FINE, "processing {0})", log);
586
587 if (log == null)
588 continue;
589
590 try {
591 String logFile = log.toString();
592 if (lastLog == null || !lastLog.sameAs(logFile))
593 log.consume(this);
594 lastLog = new LastLog(logFile);
595
596 ObjectOutputStream out = snapshotFile.next();
597
598 out.writeInt(SNAPSHOT_VERSION);
599 out.writeObject(sessionId.get());
600 out.writeObject(joinState);
601
602 out.writeObject(new HashMap(entries));
603 out.writeObject(new HashMap(registrations));
604 out.writeObject(new HashMap(pendingTxns));
605 out.writeObject(topUuid);
606 out.writeObject(lastLog);
607 snapshotFile.commit();
608 } catch (IOException e) {
609 final String msg = "error writing snapshot";
610 final InternalSpaceException ise =
611 new InternalSpaceException(msg, e);
612 logger.log(Level.SEVERE, msg , ise);
613 throw ise;
614 }
615 log.finished();
616 }
617 }
618
619
620
621
622
623
624
625
626 private static class LastLog implements Serializable {
627 private final String logFile;
628 private final long timeStamp;
629
630 LastLog(String path) {
631 logFile = path;
632 timeStamp = new File(logFile).lastModified();
633 }
634
635 boolean sameAs(String otherPath) {
636 if (!logFile.equals(otherPath))
637 return false;
638 return (new File(otherPath).lastModified() == timeStamp);
639 }
640 }
641
642
643
644
645
646 private InternalSpaceException logAndThrowRecoveryException(
647 String msg, Throwable nested)
648 {
649 final InternalSpaceException e =
650 new InternalSpaceException(msg, nested);
651 logger.log(Level.SEVERE, msg, e);
652 throw e;
653 }
654 }