1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.norm.event;
19
20 import java.io.IOException;
21 import java.io.ObjectOutputStream;
22 import java.io.Serializable;
23 import java.rmi.MarshalledObject;
24 import java.rmi.RemoteException;
25 import java.security.AccessControlContext;
26 import java.security.AccessController;
27 import java.security.PrivilegedActionException;
28 import java.security.PrivilegedExceptionAction;
29 import java.util.concurrent.ExecutorService;
30 import java.util.logging.Level;
31 import java.util.logging.Logger;
32 import net.jini.core.event.RemoteEvent;
33 import net.jini.core.event.RemoteEventListener;
34 import net.jini.io.MarshalledInstance;
35 import net.jini.security.ProxyPreparer;
36 import org.apache.river.api.io.AtomicSerial;
37 import org.apache.river.api.io.AtomicSerial.GetArg;
38 import org.apache.river.constants.ThrowableConstants;
39 import org.apache.river.logging.Levels;
40 import org.apache.river.thread.wakeup.RetryTask;
41 import org.apache.river.thread.wakeup.WakeupManager;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 @AtomicSerial
59 public class EventType implements Serializable {
60 private static final long serialVersionUID = 2;
61
62
63 private static final Logger logger = Logger.getLogger("org.apache.river.norm");
64
65
66
67
68
69
70
71
72
73 private MarshalledObject marshalledListener;
74
75
76 private transient RemoteEventListener listener;
77
78
79
80
81
82
83 private transient ProxyPreparer recoveredListenerPreparer;
84
85
86
87
88
89 private MarshalledObject handback;
90
91
92
93
94
95
96
97 private long registrationNumber;
98
99
100
101
102
103 private long lastSeqNum;
104
105
106
107
108
109 private final long evID;
110
111
112
113
114
115
116 private transient SendMonitor monitor;
117
118
119
120
121
122 private transient EventTypeGenerator generator;
123
124 private transient AccessControlContext context;
125
126
127
128
129
130
131
132
133
134
135
136
137 EventType(EventTypeGenerator generator,
138 SendMonitor monitor,
139 long evID,
140 RemoteEventListener listener,
141 MarshalledObject handback,
142 AccessControlContext context) throws IOException
143 {
144 if (generator == null) {
145 throw new NullPointerException("EventType(): Must create event " +
146 "type objects with a non-null generator");
147 }
148
149 if (monitor == null) {
150 throw new NullPointerException("EventType(): Must create event " +
151 "type objects with a non-null monitor");
152 }
153
154 this.generator = generator;
155 this.monitor = monitor;
156 this.evID = evID;
157 this.context = context;
158 setLastSequenceNumber(0);
159 setListener(listener, handback);
160 }
161
162
163
164
165
166
167 public EventType(GetArg arg) throws IOException{
168 this(arg.get("marshalledListener", null, MarshalledObject.class),
169 arg.get("handback", null, MarshalledObject.class),
170 arg.get("registrationNumber", 0L),
171 arg.get("lastSeqNum", 0L),
172 arg.get("evID", 0L)
173 );
174 }
175
176 private EventType(MarshalledObject marshalledListener, MarshalledObject handback,
177 long registrationNumber, long lastSeqNum, long evID)
178 {
179 this.marshalledListener = marshalledListener;
180 this.handback = handback;
181 this.registrationNumber = registrationNumber;
182 this.lastSeqNum = lastSeqNum;
183 this.evID = evID;
184 }
185
186 private synchronized void writeObject(ObjectOutputStream out) throws IOException {
187 out.defaultWriteObject();
188 }
189
190
191 private void clearListener() {
192 listener = null;
193 handback = null;
194 marshalledListener = null;
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209 public final synchronized void setListener(RemoteEventListener listener,
210 MarshalledObject handback)
211 throws IOException
212 {
213 registrationNumber++;
214
215 if (listener == null) {
216 clearListener();
217 } else {
218 marshalledListener =
219 new MarshalledInstance(listener).convertToMarshalledObject();
220 this.listener = listener;
221 this.handback = handback;
222 }
223 }
224
225
226
227
228
229 public synchronized boolean haveListener() {
230 return marshalledListener != null;
231 }
232
233
234
235
236
237
238
239 private RemoteEventListener getListener() {
240 if (!haveListener())
241 return null;
242
243 if (listener != null)
244 return listener;
245
246
247 RemoteEventListener unpreparedListener = null;
248 try {
249 unpreparedListener =
250 (RemoteEventListener) new MarshalledInstance(marshalledListener).get(false);
251 } catch (IOException e) {
252 logger.log(Levels.HANDLED,
253 "Problem unmarshalling listener -- will retry later",
254 e);
255
256
257 } catch (ClassNotFoundException e) {
258 logger.log(Levels.HANDLED,
259 "Problem unmarshalling listener -- will retry later",
260 e);
261 }
262
263 if (unpreparedListener != null) {
264
265 try {
266 listener = (RemoteEventListener)
267 recoveredListenerPreparer.prepareProxy(unpreparedListener);
268 } catch (RemoteException e) {
269 logger.log(Levels.HANDLED,
270 "Problem preparing listener -- will retry later",
271 e);
272 } catch (SecurityException e) {
273 logger.log(Levels.HANDLED,
274 "Problem preparing listener -- will retry later",
275 e);
276 }
277 }
278
279 return listener;
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 public synchronized boolean clearListenerIfSequenceMatch(
298 long oldSequenceNumber)
299 {
300 if (oldSequenceNumber == registrationNumber) {
301 clearListener();
302 return true;
303 }
304
305 return false;
306 }
307
308
309
310
311
312
313
314
315 public final synchronized void setLastSequenceNumber(long seqNum) {
316 lastSeqNum = seqNum;
317 }
318
319
320
321
322
323
324 public synchronized long getLastSequenceNumber () {
325 return lastSeqNum;
326 }
327
328
329
330
331
332 public long getEventID() {
333 return evID;
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351 public synchronized long sendEvent(EventFactory factory) {
352 if (generator == null) {
353
354 throw new IllegalStateException("EventType.sendEvent:" +
355 "called before state was fully restored");
356 }
357
358
359
360
361
362 lastSeqNum++;
363
364
365 if (!haveListener())
366 return lastSeqNum;
367
368 final ExecutorService mgr = generator.getExecutorService();
369 final WakeupManager wMgr = generator.getWakeupManager();
370 mgr.execute(new SendTask(mgr, wMgr, factory, lastSeqNum));
371
372 return lastSeqNum;
373 }
374
375
376
377
378
379
380
381
382 public synchronized long bumpSequenceNumber() {
383 return ++lastSeqNum;
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399 public void restoreTransientState(EventTypeGenerator generator,
400 SendMonitor monitor,
401 ProxyPreparer recoveredListenerPreparer,
402 AccessControlContext context)
403 {
404 if (generator == null) {
405 throw new NullPointerException("EventType.restoreTransientState:" +
406 "Must call with a non-null generator");
407 }
408 if (monitor == null) {
409 throw new NullPointerException("EventType.restoreTransientState:" +
410 "Must call with a non-null monitor");
411 }
412 if (recoveredListenerPreparer == null) {
413 throw new NullPointerException("EventType.restoreTransientState:" +
414 "Must call with a non-null recoveredListenerPreparer");
415 }
416 synchronized (this){
417 this.generator = generator;
418 this.monitor = monitor;
419 this.recoveredListenerPreparer = recoveredListenerPreparer;
420 this.context = context;
421 }
422 generator.recoverEventID(evID);
423
424 }
425
426
427
428
429
430 private class SendTask extends RetryTask {
431
432 final static private long MAX_TIME = 1000 * 60 * 60 * 24;
433
434
435 final private EventFactory eventFactory;
436
437
438 final private long seqNum;
439
440
441 volatile private RemoteEvent cachedEvent;
442
443
444
445
446
447 private long eventForRegistrationNumber = -1;
448
449
450
451
452
453
454
455
456
457 private SendTask(ExecutorService taskManager, WakeupManager wakeupManager,
458 EventFactory eventFactory, long seqNum)
459 {
460 super(taskManager, wakeupManager);
461 this.eventFactory = eventFactory;
462 this.seqNum = seqNum;
463 }
464
465
466 public boolean tryOnce() {
467 final long now = System.currentTimeMillis();
468 if (now - startTime() > MAX_TIME)
469 return true;
470
471 if (!EventType.this.monitor.isCurrent())
472 return true;
473
474
475
476 RemoteEventListener listener;
477 MarshalledObject handback;
478 long registrationNumber;
479 boolean createEvent;
480 RemoteEvent event;
481 synchronized (EventType.this) {
482 if (!EventType.this.haveListener())
483 return true;
484
485 listener = EventType.this.getListener();
486 if (listener == null) {
487 return false;
488
489 }
490 handback = EventType.this.handback;
491 registrationNumber = EventType.this.registrationNumber;
492 event = cachedEvent;
493 createEvent = (event == null ||
494 eventForRegistrationNumber != registrationNumber);
495 if (createEvent) eventForRegistrationNumber = registrationNumber;
496 }
497
498
499
500
501
502 if (createEvent)
503 {
504 event = eventFactory.createEvent(EventType.this.evID, seqNum,
505 handback);
506 synchronized (EventType.this){
507 cachedEvent = event;
508 }
509
510 }
511
512
513 try {
514 try {
515 final RemoteEvent ev = event;
516 AccessController.doPrivileged(
517 new PrivilegedExceptionAction(){
518
519 @Override
520 public Object run() throws Exception {
521 listener.notify(ev);
522 return null;
523 }
524 }, context
525 );
526 } catch (PrivilegedActionException e){
527 throw e.getException();
528 }
529
530 return true;
531 } catch (Throwable t) {
532
533
534
535
536
537
538
539 final int cat = ThrowableConstants.retryable(t);
540 if (cat == ThrowableConstants.INDEFINITE) {
541 logger.log(Levels.HANDLED,
542 "Problem sending event -- will retry later",
543 t);
544 return false;
545 } else if (cat == ThrowableConstants.BAD_INVOCATION) {
546 logger.log(Level.INFO, "Problem sending event", t);
547 return true;
548 } else {
549 EventType.this.monitor.definiteException(EventType.this,
550 event, registrationNumber, t);
551 logger.log(Level.INFO, "Problem sending event", t);
552 return true;
553 }
554 }
555
556 }
557
558 }
559 }