1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger.snaplogstore;
19
20 import org.apache.river.outrigger.LogOps;
21 import org.apache.river.outrigger.OutriggerServerImpl;
22 import org.apache.river.outrigger.proxy.StorableObject;
23 import org.apache.river.outrigger.proxy.StorableResource;
24 import net.jini.id.Uuid;
25
26 import java.io.File;
27 import java.io.FileDescriptor;
28 import java.io.IOException;
29 import java.io.ObjectOutputStream;
30 import java.io.RandomAccessFile;
31 import java.util.ArrayList;
32 import java.util.Observable;
33 import java.util.logging.Level;
34 import java.util.logging.Logger;
35
36 import net.jini.space.InternalSpaceException;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Writes operation records to a sequence of numbered log files.
 * Each public <code>*Op</code> method serializes one record to the
 * current file through an <code>ObjectOutputStream</code>. Once
 * <code>maxOps</code> operations have been counted the current file is
 * closed and a new one, with the next numeric suffix, is started.
 */
class LogOutputFile extends LogFile implements LogOps {
    /** File currently being written; <code>null</code> once closed. */
    private volatile RandomAccessFile logFile = null;
    /** Descriptor of <code>logFile</code>, used to sync data to disk. */
    private volatile FileDescriptor logFD;
    /** Object stream layered (via <code>LogOutputStream</code>) on <code>logFile</code>. */
    private volatile ObjectOutputStream out;
    /** Numeric suffix appended to <code>baseFile</code> to name the current file. */
    private volatile int suffix;
    /** Operations counted against the current file; reset when a new file starts. */
    private volatile int opCnt;
    /** Operation count at which the writer switches to the next file. */
    private volatile int maxOps;
    /** Lazily created; notified after a log file has been completed. */
    private volatile Observable observable;

    /** File offset of the length slot belonging to the record being written. */
    private volatile long logBytes = 0;
    /** Scratch buffer used by <code>writeInt</code>. */
    private final byte[] intBuf = new byte[4];
    /** Four zero bytes, written as a placeholder length slot. */
    private final byte[] zeroBuf = new byte[4];

    /** Length of a record whose length slot has not been written yet; 0 if none. */
    private volatile long deferedUpdateLength = 0;
    /** File offset of the length slot that <code>deferedUpdateLength</code> belongs to. */
    private volatile long deferedPosition = 0;

    /** Size in bytes of a length slot. */
    private static final long intBytes = 4;


    /** Logger for store-related messages. */
    private static final Logger logger =
        Logger.getLogger(OutriggerServerImpl.storeLoggerName);
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 LogOutputFile(String basePath, int maxOps) throws IOException {
93 super(basePath);
94 ArrayList inDir = new ArrayList();
95 suffix = existingLogs(inDir);
96 this.maxOps = maxOps;
97 nextPath();
98 }
99
100
101
102
103
104
105
106 Observable observable() {
107 if (observable == null) {
108 observable = new Observable() {
109 public void notifyObservers() {
110 setChanged();
111 super.notifyObservers();
112 }
113 public void notifyObservers(Object arg) {
114 setChanged();
115 super.notifyObservers(arg);
116 }
117 };
118 }
119 return observable;
120 }
121
122
123
124
125 private void nextPath() throws IOException {
126 boolean completed = false;
127
128 if (logFile != null) {
129
130
131
132 if (deferedUpdateLength != 0) {
133 logFD.sync();
134 logFile.seek(deferedPosition);
135 writeInt((int)deferedUpdateLength);
136 }
137 try {
138 close();
139 } catch (IOException ignore) { }
140 completed = true;
141 }
142
143 suffix++;
144 logFile = new RandomAccessFile(baseDir.getPath() + File.separator +
145 baseFile + suffix, "rw");
146 logFD = logFile.getFD();
147 out = new ObjectOutputStream(new LogOutputStream(logFile));
148
149 writeInt(LOG_VERSION);
150
151 logBytes = logFile.getFilePointer();
152 logFile.setLength(logBytes);
153
154
155 logFile.write(zeroBuf);
156
157
158 logFD.sync();
159
160 deferedUpdateLength = 0;
161 opCnt = 0;
162
163
164
165
166
167
168 if (observable != null && completed)
169 observable.notifyObservers();
170 }
171
172
173
174
175 synchronized void close() throws IOException {
176 if (logFile != null) {
177 try {
178 out.close();
179 logFile.close();
180 } finally {
181 logFile = null;
182 }
183 }
184 }
185
186
187
188
189
190 void destroy() {
191 try {
192 close();
193 } catch (Throwable t) {
194
195 }
196 super.destroy();
197 }
198
199
200
201
202 public synchronized void bootOp(long time, long sessionId) {
203 try {
204 out.writeByte(BOOT_OP);
205 out.writeLong(time);
206 out.writeLong(sessionId);
207 flush();
208 } catch (IOException e) {
209 failed(e);
210 }
211 }
212
213
214
215
216 public synchronized void joinStateOp(StorableObject state) {
217 try {
218 out.writeByte(JOINSTATE_OP);
219 out.writeObject(new BaseObject(state));
220 flush();
221 } catch (IOException e) {
222 failed(e);
223 }
224 }
225
226
227
228
229 public synchronized void writeOp(StorableResource entry, Long txnId) {
230 try {
231 out.writeByte(WRITE_OP);
232 out.writeObject(new Resource(entry));
233 out.writeObject(txnId);
234
235
236
237
238 flush(txnId == null);
239 } catch (IOException e) {
240 failed(e);
241 }
242 }
243
244
245 public synchronized void writeOp(StorableResource entries[], Long txnId) {
246 try {
247 out.writeByte(BATCH_WRITE_OP);
248 out.writeObject(txnId);
249
250
251
252 out.writeInt(entries.length);
253 for (int i=0; i<entries.length; i++) {
254 out.writeObject(new Resource(entries[i]));
255 }
256
257
258
259
260 flush(txnId == null, entries.length);
261 } catch (IOException e) {
262 failed(e);
263 }
264 }
265
266
267
268
269 public synchronized void takeOp(Uuid cookie, Long txnId) {
270 try {
271 out.writeByte(TAKE_OP);
272 cookie.write(out);
273 out.writeObject(txnId);
274
275
276
277
278 flush(txnId == null);
279 } catch (IOException e) {
280 failed(e);
281 }
282 }
283
284
285 public synchronized void takeOp(Uuid cookies[], Long txnId) {
286 try {
287 out.writeByte(BATCH_TAKE_OP);
288 out.writeObject(txnId);
289
290
291
292 out.writeInt(cookies.length);
293 for (int i=0; i<cookies.length; i++) {
294 cookies[i].write(out);
295 }
296
297
298
299
300 flush(txnId == null, cookies.length);
301 } catch (IOException e) {
302 failed(e);
303 }
304 }
305
306
307
308
309 public synchronized void registerOp(StorableResource registration,
310 String type, StorableObject[] templates)
311 {
312 try {
313 out.writeByte(REGISTER_OP);
314 out.writeObject(new Registration(registration, type, templates));
315 flush();
316 } catch (IOException e) {
317 failed(e);
318 }
319 }
320
321
322
323
324 public synchronized void renewOp(Uuid cookie, long expiration) {
325 try {
326 out.writeByte(RENEW_OP);
327 cookie.write(out);
328 out.writeLong(expiration);
329 flush();
330 } catch (IOException e) {
331 failed(e);
332 }
333 }
334
335
336
337
338 public synchronized void cancelOp(Uuid cookie, boolean expired) {
339 try {
340 out.writeByte(CANCEL_OP);
341 cookie.write(out);
342
343
344
345 flush(!expired);
346 } catch (IOException e) {
347 failed(e);
348 }
349 }
350
351
352
353
354 public synchronized void prepareOp(Long txnId,
355 StorableObject transaction) {
356 try {
357 out.writeByte(PREPARE_OP);
358 out.writeObject(txnId);
359 out.writeObject(new BaseObject(transaction));
360 flush();
361 } catch (IOException e) {
362 failed(e);
363 }
364 }
365
366
367
368
369 public synchronized void commitOp(Long txnId) {
370 try {
371 out.writeByte(COMMIT_OP);
372 out.writeObject(txnId);
373 flush();
374 } catch (IOException e) {
375 failed(e);
376 }
377 }
378
379
380
381
382 public synchronized void abortOp(Long txnId) {
383 try {
384 out.writeByte(ABORT_OP);
385 out.writeObject(txnId);
386 flush();
387 } catch (IOException e) {
388 failed(e);
389 }
390 }
391
392 public synchronized void uuidOp(Uuid uuid) {
393 try {
394 out.writeByte(UUID_OP);
395 uuid.write(out);
396 flush();
397 } catch (IOException e) {
398 failed(e);
399 }
400 }
401
402
403
404
405
406 private void flush() throws IOException {
407 flush(true);
408 }
409
410
411
412
413
414
415
416 private synchronized void flush(boolean forceToDisk)
417 throws IOException
418 {
419 flush(forceToDisk, 1);
420 }
421
422
423
424
425
426
427 private synchronized void flush(boolean forceToDisk,
428 int effectiveOpCount)
429 throws IOException
430 {
431 assert effectiveOpCount > 0;
432
433 out.flush();
434
435 if (forceToDisk) {
436
437
438 logFD.sync();
439 }
440
441 long entryEnd = logFile.getFilePointer();
442 long updateLen = entryEnd - logBytes - intBytes;
443
444
445
446
447
448
449 if (!forceToDisk) {
450
451
452
453
454
455 if (deferedUpdateLength == 0) {
456 deferedUpdateLength = updateLen;
457 deferedPosition = logBytes;
458 } else {
459
460 logFile.seek(logBytes);
461 writeInt((int)updateLen);
462 }
463 } else {
464
465
466
467
468 if (deferedUpdateLength != 0) {
469 logFile.seek(deferedPosition);
470 writeInt((int)deferedUpdateLength);
471 deferedUpdateLength = 0;
472 }
473
474 logFile.seek(logBytes);
475 writeInt((int)updateLen);
476 }
477
478
479 entryEnd = (entryEnd + 3) & ~3L;
480
481
482 logFile.seek(entryEnd);
483 logFile.write(zeroBuf);
484 logBytes = entryEnd;
485
486 if (forceToDisk)
487 logFD.sync();
488
489 opCnt += effectiveOpCount;
490 if (opCnt >= maxOps)
491 nextPath();
492 else
493 out.reset();
494 }
495
496
497
498
499
500
501
502
503
504
505
506 private void writeInt(int val) throws IOException {
507 intBuf[0] = (byte) (val >> 24);
508 intBuf[1] = (byte) (val >> 16);
509 intBuf[2] = (byte) (val >> 8);
510 intBuf[3] = (byte) val;
511 logFile.write(intBuf);
512 }
513
514 private void failed(Exception e) throws InternalSpaceException {
515 logger.log(Level.SEVERE,
516 "Unexpected I/O error while persisting Space data",
517 e);
518 System.exit(-5);
519 }
520 }