View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.norm.event;
19  
20  import java.io.IOException;
21  import java.io.ObjectOutputStream;
22  import java.io.Serializable;
23  import java.rmi.MarshalledObject;
24  import java.rmi.RemoteException;
25  import java.security.AccessControlContext;
26  import java.security.AccessController;
27  import java.security.PrivilegedActionException;
28  import java.security.PrivilegedExceptionAction;
29  import java.util.concurrent.ExecutorService;
30  import java.util.logging.Level;
31  import java.util.logging.Logger;
32  import net.jini.core.event.RemoteEvent;
33  import net.jini.core.event.RemoteEventListener;
34  import net.jini.io.MarshalledInstance;
35  import net.jini.security.ProxyPreparer;
36  import org.apache.river.api.io.AtomicSerial;
37  import org.apache.river.api.io.AtomicSerial.GetArg;
38  import org.apache.river.constants.ThrowableConstants;
39  import org.apache.river.logging.Levels;
40  import org.apache.river.thread.wakeup.RetryTask;
41  import org.apache.river.thread.wakeup.WakeupManager;
42  
43  /**
44   * Representation of an event type the supports a single registrant.
45   * The registrant, event ID, and event sequence number information
46   * portions of this object are preserved when serialized.  The send
47   * monitor, and currently scheduled event sends are not.
48   *
49   * @author Sun Microsystems, Inc.
50   */
51  // $$$ I am not sure if this class does too much locking, or too little.
52  // It does too little in that it does not do any locking during serialization
53  // It does too much in that Norm's current locking discipline means that for
54  // the most part this class does not have to worry about multiple threads
55  // of control entering at one time.
56  // Before we make this a general utility we have to re-think the locking
57  // strategy.
58  @AtomicSerial
59  public class EventType implements Serializable {
60      private static final long serialVersionUID = 2;
61  
62      /** Logger for logging messages for this class */
63      private static final Logger logger = Logger.getLogger("org.apache.river.norm");
64  
65      /**
66       * Listener registered for events of this type.  Stored in 
67       * marshalled form so object the <code>EventType</code> object can
68       * be recovered even if the listener's codebase is not available.
69       * If this field is <code>null</code>, <code>listener</code> and 
70       * <code>handback</code> will be also.  
71       * @serial
72       */
73      private MarshalledObject marshalledListener;
74  
75      /** Transient cache of listener in unmarshalled form */
76      private transient RemoteEventListener listener;
77  
78      /**
79       * The proxy preparer to use to prepare a newly unmarshalled listener, or
80       * null if this instance was created using an already prepared listener,
81       * which is how instances are created initially.
82       */
83      private transient ProxyPreparer recoveredListenerPreparer;
84  
85      /**
86       * Handback object associated with current listener.
87       * @serial
88       */
89      private MarshalledObject handback;
90  
91      /** 
92       * Sequence number of the current listener/handback pair, incremented
93       * every time a new listener is set (even if the objects 
94       * are equivalent)
95       * @serial
96       */
97      private long registrationNumber;
98  
99      /**
100      * Last event sequence number we used
101      * @serial
102      */
103     private long lastSeqNum;
104 
105     /**
106      * Our event ID
107      * @serial
108      */
109     private final long evID;
110 
111     /** 
112      * Object we check with to ensure leases have not expired and notify
113      * when we get a definite exception during an event send attempt.
114      * If this field is <code>null</code> generator will be also.
115      */
116     private transient SendMonitor monitor;
117 
118     /** 
119      * Event type generator that created us. If this field is
120      * <code>null</code> monitor will be also.
121      */
122     private transient EventTypeGenerator generator;
123     
124     private transient AccessControlContext context;
125 
126     /**
127      * Simple constructor.  Initially the last sequence number is set to 0.
128      * @param generator EventTypeGenerator that is creating this event type
129      * @param monitor Object which is to monitor the sending of events
130      *	      of this type
131      * @param evID event ID of this event type
132      * @param listener the listener events of this type should go to
133      * @param handback the object that should be passed to listener
134      *        as part of the event
135      * @throws IOException if the listener cannot be serialized 
136      */
137     EventType(EventTypeGenerator generator,
138 	      SendMonitor monitor,
139 	      long evID,
140 	      RemoteEventListener listener,
141 	      MarshalledObject handback,
142 	      AccessControlContext context) throws IOException
143     {
144 	if (generator == null) {
145 	    throw new NullPointerException("EventType(): Must create event " +
146                 "type objects with a non-null generator");
147 	}
148 
149 	if (monitor == null) {
150 	    throw new NullPointerException("EventType(): Must create event " +
151                 "type objects with a non-null monitor");
152 	}
153 
154 	this.generator = generator;
155 	this.monitor = monitor;
156 	this.evID = evID;
157 	this.context = context;
158 	setLastSequenceNumber(0);
159 	setListener(listener, handback);
160     } 
161     
162     /**
163      * AtomicSerial constructor
164      * @param arg
165      * @throws IOException 
166      */
167     public EventType(GetArg arg) throws IOException{
168 	this(arg.get("marshalledListener", null, MarshalledObject.class),
169 	     arg.get("handback", null, MarshalledObject.class),
170 	     arg.get("registrationNumber", 0L),
171 	     arg.get("lastSeqNum", 0L),
172 	     arg.get("evID", 0L)
173 	);
174     }
175     
176     private EventType(MarshalledObject marshalledListener, MarshalledObject handback,
177 	    long registrationNumber, long lastSeqNum, long evID)
178     {
179 	this.marshalledListener = marshalledListener;
180 	this.handback = handback;
181 	this.registrationNumber = registrationNumber;
182 	this.lastSeqNum = lastSeqNum;
183 	this.evID = evID;
184     }
185     
186     private synchronized void writeObject(ObjectOutputStream out) throws IOException {
187 	out.defaultWriteObject();
188     }
189 
190     /** Utility method to null out listener */
191     private void clearListener() {
192 	listener = null;
193 	handback = null;
194 	marshalledListener = null;
195     }
196 
197     /**
198      * (Re)set the listener for this type of event.  Any pending
199      * events that have not yet been sent will be sent to the new
200      * listener, passing the new handback.  Setting the listener to
201      * <code>null</code> will cancel the sending of all pending
202      * events.  
203      *
204      * @param listener the listener events of this type should go to
205      * @param handback the object that should be passed to listener
206      *        as part of the event
207      * @throws IOException if listener cannot be serialized
208      */
209     public final synchronized void setListener(RemoteEventListener listener, 
210 					 MarshalledObject    handback)
211         throws IOException
212     {
213 	registrationNumber++;
214 	
215 	if (listener == null) {
216 	    clearListener();
217 	} else {	    
218 	    marshalledListener = 
219                 new MarshalledInstance(listener).convertToMarshalledObject();
220 	    this.listener = listener;
221 	    this.handback = handback;
222 	}
223     }
224 
225     /**
226      * Returns <code>true</code> if there is a listener registered for this 
227      * event type.
228      */
229     public synchronized boolean haveListener() {
230 	return marshalledListener != null;
231     }
232 
233     /**
234      * Convince method to get the listener.
235      * @return the listener, or <code>null</code> if the listener can't be
236      *	       unpacked or prepared, or there is no listener
237      * synchronized externally
238      */
239     private RemoteEventListener getListener() {
240 	if (!haveListener()) 
241 	    return null;
242             
243 	if (listener != null) 
244 	    return listener;
245 
246 	// There is a listener, but it is not unpacked yet, try to unpack
247 	RemoteEventListener unpreparedListener = null;
248 	try {
249 	    unpreparedListener =
250 		(RemoteEventListener) new MarshalledInstance(marshalledListener).get(false);
251 	} catch (IOException e) {
252 	    logger.log(Levels.HANDLED,
253 		       "Problem unmarshalling listener -- will retry later",
254 		       e);
255 	    // $$$ is this really the right thing to do?
256 	    // we probably really have a corrupted marshalledListener here
257 	} catch (ClassNotFoundException e) {
258 	    logger.log(Levels.HANDLED,
259 		       "Problem unmarshalling listener -- will retry later",
260 		       e);
261 	}
262 
263 	if (unpreparedListener != null) {
264 	    // Prepare the listener
265 	    try {
266 		listener = (RemoteEventListener)
267 		    recoveredListenerPreparer.prepareProxy(unpreparedListener);
268 	    } catch (RemoteException e) {
269 		logger.log(Levels.HANDLED,
270 			   "Problem preparing listener -- will retry later",
271 			   e);
272 	    } catch (SecurityException e) {
273 		logger.log(Levels.HANDLED,
274 			   "Problem preparing listener -- will retry later",
275 			   e);
276 	    }
277 	}
278 
279 	return listener;
280     }
281 
282     /**
283      * Atomically clear the current registration if its sequence
284      * number matches the passed in sequence number.  If the
285      * replacement occurs it will have the same effect as calling
286      * <code>setListener(null, null)</code>.
287      * <p>
288      * Can be used by code that needs to remove event registrations in
289      * response to exceptions thrown during event delivery without
290      * risking clobbering new registrations.
291      * @param oldSequenceNumber sequence number of the 
292      *        registration that had a problem 
293      * @return <code>true</code> if the state of the object was
294      * changed and <code>false</code> otherwise
295      * @see EventType#setListener 
296      */
297     public synchronized boolean clearListenerIfSequenceMatch(
298         long oldSequenceNumber) 
299     {
300 	if (oldSequenceNumber == registrationNumber) {
301 	    clearListener();
302 	    return true;
303 	}
304 
305 	return false;
306     }
307 
308     /**
309      * Set the object's notion of the last sequence number.  The next event
310      * scheduled to be sent will have a sequence number one greater than
311      * the value past to this call.
312      * <p>
313      * @param seqNum value for the last sequence number
314      */
315     public final synchronized void setLastSequenceNumber(long seqNum) {
316 	lastSeqNum = seqNum;
317     }
318 
319     /**
320      * Return the sequence number of the last event that was scheduled
321      * to be sent.  Intended primarily for creating
322      * <code>EventRegistration</code> objects and the like.
323      */
324     public synchronized long getLastSequenceNumber () {
325 	return lastSeqNum;
326     }
327     
328     /**
329      * Return the <code>long</code> that was uniquely associated with this
330      * object when it was created.
331      */
332     public long getEventID() {
333 	return evID;
334     }
335 
336     /**
337      * Schedule the sending of an event.  This event will be sent to
338      * the currently registered listener.  If the listener changes
339      * before the the event is successfully sent the event will be sent
340      * to the new listener.  If the current listener is
341      * <code>null</code> this call will have no affect aside from
342      * incrementing the sequence number.
343      * @param factory an object that will be used to create the
344      * <code>Event</code> object when necessary
345      * @return the sequence number assigned to the event
346      * @throws IllegalStateException if this method is called
347      *         after the object has be deserialized and before
348      *         <code>restoreTransientState</code> has been called
349      * @see EventType#restoreTransientState
350      */
351     public synchronized long sendEvent(EventFactory factory) {
352 	if (generator == null) {
353 	    // Have not had our state restored, complain
354 	    throw new IllegalStateException("EventType.sendEvent:" +
355 	        "called before state was fully restored");
356 	}
357 
358 	// Even if there is no listener, an event has occurred, so
359 	// increment the sequence number (note this a stronger
360 	// guarantee that the Jini Distributed Event Specification,
361 	// but one that is required by the LRS spec).
362 	lastSeqNum++;
363 
364 	// If we don't have a listener we don't need do anything else
365 	if (!haveListener())
366 	    return lastSeqNum;
367 	
368 	final ExecutorService mgr = generator.getExecutorService();
369 	final WakeupManager wMgr = generator.getWakeupManager();
370 	mgr.execute(new SendTask(mgr, wMgr, factory, lastSeqNum));
371 	
372 	return lastSeqNum;
373     }
374 
375     /**
376      * Increment the sequence number and return the result. This
377      * method is useful if an event occurs that needs to be noted
378      * but from some reason it is impossible to deliver the event.
379      *
380      * @return the new value for the sequence number
381      */
382     public synchronized long bumpSequenceNumber() {
383 	return ++lastSeqNum;
384     }
385 
386     /**
387      * Restore the transient state of this object after recovering it
388      * from a serialization stream.  None of the arguments can be
389      * <code>null</code>.
390      * <p>
391      * @param generator the <code>EventTypeGenerator</code> that was used
392      *        to create this EventType object originally
393      * @param monitor the object that monitors the progress of events
394      *        set by this object
395      * @param recoveredListenerPreparer the proxy preparer to use to prepare
396      *	      listeners recovered from persistent storage
397      * @param context Context used to send events.
398      */
399     public void restoreTransientState(EventTypeGenerator generator,
400 				      SendMonitor monitor, 
401 				      ProxyPreparer recoveredListenerPreparer,
402 				      AccessControlContext context)
403     {
404 	if (generator == null) {
405 	    throw new NullPointerException("EventType.restoreTransientState:" +
406 	        "Must call with a non-null generator");
407 	}
408 	if (monitor == null) {
409 	    throw new NullPointerException("EventType.restoreTransientState:" +
410 	        "Must call with a non-null monitor");
411 	}
412 	if (recoveredListenerPreparer == null) {
413 	    throw new NullPointerException("EventType.restoreTransientState:" +
414 	        "Must call with a non-null recoveredListenerPreparer");
415 	}
416         synchronized (this){
417             this.generator = generator;
418             this.monitor = monitor;
419             this.recoveredListenerPreparer = recoveredListenerPreparer;
420 	    this.context = context;
421         }
422             generator.recoverEventID(evID);
423         
424     }
425 
426     /**
427      * Subclass of <code>RetryTask</code> used by <code>EventType</code>
428      * to send events.
429      */
430     private class SendTask extends RetryTask {
431 	/** Max time we are willing to let a send attempt to go on for */
432 	final static private long MAX_TIME = 1000 * 60 * 60 * 24; //~ 1 Day
433 	
434 	/** Factory used to create the <code>RemoteEvent</code> to be sent */
435 	final private EventFactory eventFactory;
436 
437 	/** Sequence number the event should have */
438 	final private long seqNum;
439 
440 	/** Cached event */
441 	volatile private RemoteEvent cachedEvent;
442 	
443 	/** 
444 	 * Registration sequence number of the listener/handback pair
445 	 * event was built for
446 	 */
447 	private long eventForRegistrationNumber = -1;
448 
449 	/**
450 	 * Simple constructor.
451 	 * @param taskManager <code>TaskManager</code> this task is to be
452 	 *                    put into
453 	 * @param eventFactory <code>EventFactory</code> that will be used
454 	 *                     to create the event to be sent
455 	 * @param seqNum      the sequence number of the event
456 	 */
457 	private SendTask(ExecutorService taskManager, WakeupManager wakeupManager,
458 			 EventFactory eventFactory, long seqNum)
459 	{
460 	    super(taskManager, wakeupManager);
461 	    this.eventFactory = eventFactory;
462 	    this.seqNum = seqNum;
463 	}
464 
465 	// Inherit java doc from super type
466 	public boolean tryOnce() {
467 	    final long now = System.currentTimeMillis();
468 	    if (now - startTime() > MAX_TIME)
469 		return true;	// we have been trying too long, stop here
470 
471 	    if (!EventType.this.monitor.isCurrent())
472 		return true;	// lease gone, stop here
473 
474 	    // Local copies of listener and handback so they won't
475 	    // be clobbered by setListener calls
476 	    RemoteEventListener listener;
477 	    MarshalledObject handback;
478 	    long registrationNumber;
479 	    boolean createEvent;
480 	    RemoteEvent event;
481 	    synchronized (EventType.this) {
482 		if (!EventType.this.haveListener())
483 		    return true; // No currently registered listener, stop here
484 
485 		listener = EventType.this.getListener();
486 		if (listener == null) {
487 		    return false; // There is a listener, but we can't unpack
488 				  // it -- schedule a retry later
489 		}
490 		handback = EventType.this.handback;
491 		registrationNumber = EventType.this.registrationNumber;
492 		event = cachedEvent;
493 		createEvent = (event == null || 
494 			    eventForRegistrationNumber != registrationNumber);
495 		if (createEvent) eventForRegistrationNumber = registrationNumber;
496 	    }
497 
498 	    // If the handback has changed we need to create a new
499 	    // event object (we approximate this test by checking the
500 	    // registrationNumber
501 	    
502 	    if (createEvent) 
503 	    {
504 		event = eventFactory.createEvent(EventType.this.evID, seqNum,
505 						 handback);
506 		synchronized (EventType.this){
507 		    cachedEvent = event;
508 		}
509 		    
510 	    }
511 
512 	    // Try sending 
513 	    try {
514 		try {
515 		    final RemoteEvent ev = event;
516 		    AccessController.doPrivileged(
517 			new PrivilegedExceptionAction(){
518 
519 			    @Override
520 			    public Object run() throws Exception {
521 				listener.notify(ev);
522 				return null;
523 			    }
524 			}, context
525 		    );
526 		} catch (PrivilegedActionException e){
527 		    throw e.getException();
528 		}
529 		
530 		return true;
531 	    } catch (Throwable t) {
532 		// Classify the exception using ThrowableConstants, if
533 		// it is a bad object or uncategorized (which must be
534 		// a UnknownEventException) drop the event
535 		// registration, if it is a bad invocation mark the
536 		// try is done (since re-sending this event won't work), 
537 		// but don't drop the registration.  If indefinite return 
538 		// false so a retry will be scheduled.
539 		final int cat = ThrowableConstants.retryable(t);
540 		if (cat == ThrowableConstants.INDEFINITE) {
541 		    logger.log(Levels.HANDLED,
542 			       "Problem sending event -- will retry later",
543 			       t);
544 		    return false;
545 		} else if (cat == ThrowableConstants.BAD_INVOCATION) {
546 		    logger.log(Level.INFO, "Problem sending event", t);
547 		    return true;
548 		} else {
549 		    EventType.this.monitor.definiteException(EventType.this,
550 		        event, registrationNumber, t);
551 		    logger.log(Level.INFO, "Problem sending event", t);
552 		    return true;
553 		}
554 	    }
555 	    
556 	}
557 
558     }
559 }