1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger.snaplogstore;
19
20 import org.apache.river.outrigger.OutriggerServerImpl;
21
22 import java.io.BufferedInputStream;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.IOException;
28 import java.io.ObjectInputStream;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Iterator;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
34
35 import net.jini.space.InternalSpaceException;
36
37
38
39
40
41
42
43
44 class LogInputFile extends LogFile {
45 private final File file;
46
47 private static final long intBytes = 4;
48
49
50 private static final Logger logger =
51 Logger.getLogger(OutriggerServerImpl.storeLoggerName);
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 static Iterator logs(String basePath, boolean returnAll)
68 throws IOException
69 {
70 LogFile lf = new LogFile(basePath);
71 ArrayList inDir = new ArrayList();
72 lf.existingLogs(inDir);
73
74
75 if (!returnAll && inDir.size() > 0)
76 inDir.remove(inDir.size() - 1);
77
78 return new LogInputFileIterator(inDir, lf);
79 }
80
81
82
83
84
85
86
87
88 private static class LogInputFileIterator implements Iterator {
89 private final LogFile baseLogFile;
90 private final Iterator fileList;
91
92
93
94
95
96 LogInputFileIterator(Collection files, LogFile baseLogFile) {
97 this.baseLogFile = baseLogFile;
98 fileList = files.iterator();
99 }
100
101 public boolean hasNext() {
102 return fileList.hasNext();
103 }
104
105
106
107
108
109
110
111
112
113
114
115 public Object next() {
116 File file = (File) fileList.next();
117 try {
118 return new LogInputFile(baseLogFile, file);
119 } catch (IOException e) {
120 file.delete();
121 return null;
122
123 }
124 }
125
126
127
128
129
130
131 public void remove() {
132 fileList.remove();
133 }
134 }
135
136
137
138
139
140
141
142
143
144
145
146 private LogInputFile(LogFile desc, File path) throws IOException {
147 super(desc.baseDir, desc.baseFile);
148 file = path;
149 }
150
151
152
153
154
155 synchronized void consume(BackEnd opOn) {
156 try {
157 DataInputStream din =
158 new DataInputStream(new BufferedInputStream(
159 new FileInputStream(file)));
160 ObjectInputStream in = new ObjectInputStream(din);
161
162 long length = file.length();
163 int fileVer = din.readInt();
164
165 if (fileVer != LOG_VERSION)
166 failure("unsupported log version: " + fileVer);
167
168 long logBytes = intBytes;
169 int updateLen = din.readInt();
170
171 Long txnId;
172 int count;
173 Resource rep;
174 byte[] cookie;
175
176 while (updateLen != 0) {
177
178 if (updateLen < 0)
179 failure("file corrupted, negative record length at " +
180 logBytes);
181
182 if (length - logBytes - intBytes < updateLen)
183
184
185
186
187
188 failure("file corrupted, partial record at " + logBytes);
189
190 int op = in.readByte();
191
192 switch (op) {
193 case BOOT_OP:
194 long time = in.readLong();
195 long sessionId = in.readLong();
196 opOn.bootOp(time, sessionId);
197 break;
198
199 case JOINSTATE_OP:
200 BaseObject state = (BaseObject)in.readObject();
201 opOn.joinStateOp(state);
202 break;
203
204 case WRITE_OP:
205 rep = (Resource)in.readObject();
206 txnId = (Long)in.readObject();
207 opOn.writeOp(rep, txnId);
208 break;
209
210 case BATCH_WRITE_OP:
211 txnId = (Long)in.readObject();
212 count = in.readInt();
213 for (int i=0; i<count; i++) {
214 rep = (Resource)in.readObject();
215 opOn.writeOp(rep, txnId);
216 }
217 break;
218
219 case TAKE_OP:
220 cookie = new byte[16];
221 in.readFully(cookie);
222 txnId = (Long)in.readObject();
223 opOn.takeOp(cookie, txnId);
224 break;
225
226 case BATCH_TAKE_OP:
227 txnId = (Long)in.readObject();
228 count = in.readInt();
229 for (int i=0; i<count; i++) {
230 cookie = new byte[16];
231 in.readFully(cookie);
232 opOn.takeOp(cookie, txnId);
233 }
234 break;
235
236 case REGISTER_OP:
237 Registration registration =
238 (Registration)in.readObject();
239 opOn.registerOp(registration);
240 break;
241
242 case RENEW_OP:
243 cookie = new byte[16];
244 in.readFully(cookie);
245 long expires = in.readLong();
246 opOn.renewOp(cookie, expires);
247 break;
248
249 case CANCEL_OP:
250 cookie = new byte[16];
251 in.readFully(cookie);
252 opOn.cancelOp(cookie);
253 break;
254
255 case PREPARE_OP:
256 txnId = (Long)in.readObject();
257 BaseObject transaction = (BaseObject)in.readObject();
258 opOn.prepareOp(txnId, transaction);
259 break;
260
261 case COMMIT_OP:
262 txnId = (Long)in.readObject();
263 opOn.commitOp(txnId);
264 break;
265
266 case ABORT_OP:
267 txnId = (Long)in.readObject();
268 opOn.abortOp(txnId);
269 break;
270
271 case UUID_OP:
272 final byte uuid[] = new byte[16];
273 in.readFully(uuid);
274 opOn.uuidOp(uuid);
275 break;
276
277 default:
278 failure("log record corrupted, unknown opcode");
279
280 }
281
282 logBytes += (intBytes + updateLen);
283
284
285 int offset = (int)logBytes & 3;
286 if (offset > 0) {
287 offset = 4 - offset;
288 logBytes += offset;
289 din.skipBytes(offset);
290 }
291 updateLen = din.readInt();
292
293 }
294 } catch (EOFException e) {
295 failure("unexpected end-of-file", e);
296
297 } catch (IOException e) {
298 failure("I/O error while consuming logs", e);
299
300 } catch (ClassNotFoundException e) {
301 failure("unexpected class?", e);
302 }
303 }
304
305
306
307
308
309 private void failure(String message) {
310 failure(message, null);
311 }
312
313
314
315
316
317 private void failure(String message, Exception e) {
318 String errorMsg = "Error consuming log file: " + file + ", " +
319 message + "Log file consumption stopped";
320
321 final InternalSpaceException ise =
322 new InternalSpaceException(errorMsg, e);
323 logger.log(Level.SEVERE, errorMsg, ise);
324 throw ise;
325 }
326
327
328
329
330
331 void finished() {
332 file.delete();
333 }
334
335 public String toString() {
336 return file.toString();
337 }
338 }