View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.outrigger;
19  
20  import java.io.IOException;
21  import java.rmi.RemoteException;
22  import java.rmi.MarshalledObject;
23  import net.jini.core.event.RemoteEventListener;
24  import net.jini.core.event.RemoteEvent;
25  import net.jini.core.event.UnknownEventException;
26  import net.jini.id.Uuid;
27  import net.jini.security.ProxyPreparer;
28  import net.jini.space.JavaSpace;
29  import org.apache.river.landlord.LeasedResource;
30  
31  /**
32   * Subclass of <code>TransitionWatcher</code> for event
33   * registrations. Also represents the registration itself.
34   */
35  abstract class EventRegistrationWatcher extends TransitionWatcher 
36      implements EventRegistrationRecord
37  {
38      /** 
39       * The current expiration time of the registration
40       * Protected, but only for use by subclasses.
41       */
42      volatile long expiration;
43  
44      /** 
45       * The UUID that identifies this registration 
46       * Protected, but only for use by subclasses.
47       * Should not be changed.
48       */
49      Uuid cookie;
50  
51      /**
52       * The handback associated with this registration.
53       * Protected, but only for use by subclasses.
54       * Should not be changed.
55       */
56      MarshalledObject handback;
57  
58      /** 
59       * The event ID associated with this registration
60       * Protected, but only for use by subclasses.
61       * Should not be changed.
62       */
63      long eventID;
64  
65      /** 
66       * The current sequence number. 
67       */
68      private long currentSeqNum = 0;
69  
70  //    /**
71  //     * The sequence number of the last event successfully 
72  //     * delivered.  Protected, but only for use by subclasses.
73  //     */
74  //    volatile long lastSeqNumDelivered = -1;
75  
76      /**
77       * The <code>TemplateHandle</code> associated with this
78       * watcher.
79       */
80      private TemplateHandle owner;
81  
82      /**
83       * Used during log recovery to create a mostly empty
84       * <code>EventRegistrationWatcher</code>.  
85       * <p> 
86       * Note, we set the time stamp and tie-breaker here instead of
87       * getting them from the log. This means they will be inconstant
88       * with their value from the last VM we were in, but since they
89       * never leak out and events are read-only anyway this should not
90       * be a problem (this also allows us to keep the tie-breaker and
91       * time stamp in final fields).
92       *
93       * @param timestamp the value that is used
94       *        to sort <code>TransitionWatcher</code>s.
95       * @param startOrdinal the highest ordinal associated
96       *        with operations that are considered to have occurred 
97       *        before the operation associated with this watcher.
98       * @param currentSeqNum Sequence number to start with.
99       */
100     EventRegistrationWatcher(long timestamp, long startOrdinal, 
101 			     long currentSeqNum) 
102     {
103 	super(timestamp, startOrdinal);
104 	this.currentSeqNum = currentSeqNum;
105         handback = null;
106         cookie = null;
107         eventID = 0;
108     }
109 
110     /**
111      * Create a new <code>EventRegistrationWatcher</code>.
112      * @param timestamp the value that is used
113      *        to sort <code>TransitionWatcher</code>s.
114      * @param startOrdinal the highest ordinal associated
115      *        with operations that are considered to have occurred 
116      *        before the operation associated with this watcher.
117      * @param cookie The unique identifier associated
118      *        with this watcher. Must not be <code>null</code>.
119      * @param handback The handback object that
120      *        should be sent along with event
121      *        notifications to the the listener.
122      * @param eventID The event ID for event type
123      *        represented by this object.
124      * @throws NullPointerException if the <code>cookie</code>
125      *        argument is <code>null</code>.
126      */
127     EventRegistrationWatcher(long timestamp, long startOrdinal, Uuid cookie,
128 	MarshalledObject handback, long eventID)
129     {
130 	super(timestamp, startOrdinal);
131 
132 	if (cookie == null)
133 	    throw new NullPointerException("cookie must be non-null");
134 
135 	this.cookie = cookie;
136 	this.handback = handback;
137 	this.eventID = eventID;
138     }
139 
140     /**
141      * Process the given transition by queuing up a task with the
142      * notifier for event delivery. Assumes the passed entry matches
143      * the template in the <code>TemplateHandle</code> associated with
144      * this watcher and that <code>isInterested</code> returned
145      * <code>true</code>. If <code>remove</code> has been called or
146      * the expiration time of this watcher has passed, this call
147      * should have no effect. This call may cause the watcher to be
148      * removed.
149      * @param transition A <code>EntryTransition</code> that
150      *              describes the transition and what
151      *              entry is transitioning. This method
152      *              will assume that <code>transition.getHandle</code>
153      *              returns a non-null value.
154      * @param now   An estimate of the current time (not the time
155      *              when the event occured).
156      * @throws NullPointerException if <code>transition</code> is 
157      *         <code>null</code>.  
158      */
159     void process(EntryTransition transition, long now) {
160 	boolean doneFor = false;
161 
162 	// lock before checking the time and so we can update currentSeqNum
163 	synchronized (this) {
164 	    if (owner == null) 
165 		return; // Must have been removed
166 
167 	    if (now > expiration) {
168 		doneFor = true;
169 	    } else {
170 		currentSeqNum++;
171 		owner.getServer().enqueueDelivery(new BasicEventSender(currentSeqNum, eventID, handback));
172 	    }
173 	}
174 
175 	if (doneFor)
176 	    cancel();
177     }
178 
179 
180     /**
181      * Return the remote listener associated with this 
182      * <code>EventRegistrationWatcher</code>. Optionally
183      * prepare the listener if it has been recovered from
184      * the store and not yet re-prepared.
185      * @return the remote listener associated with this 
186      * <code>EventRegistrationWatcher</code>. 
187      * @throws IOException if the listener can not
188      *         be unmarshalled. May throw {@link RemoteException}
189      *         if the call to the preparer does
190      * @throws ClassNotFoundException if the listener
191      *         needs to be unmarshalled and a necessary
192      *         class can not be found.
193      * @throws SecurityException if the <code>prepareProxy</code>
194      *         call does.
195      */
196     abstract RemoteEventListener getListener(ProxyPreparer preparer)
197 	throws ClassNotFoundException, IOException;
198 
199     /**
200      * Associate a <code>TemplateHandle</code> with this object.  May
201      * only be called once on any given
202      * <code>EventRegistrationWatcher</code> instance.
203      *
204      * @param h The <code>TemplateHandle</code> associated
205      *          with this watcher.
206      * @return <code>true</code> if the handle was succfully added,
207      *         and <code>false</code> if the watcher has already
208      *         been removed.
209      * @throws NullPointerException if <code>h</code> is 
210      *        <code>null</code> 
211      */
212     boolean addTemplateHandle(TemplateHandle h) {
213 	if (h == null) 
214 	    throw 
215 		new NullPointerException("TemplateHandle must be non-null");
216         synchronized (this){
217             if (owner != null)
218                 throw new AssertionError("Can only call addTemplateHandle once");
219 
220             owner = h;
221         }
222 	return true;
223     }
224 
225     /**
226      * Set the expiration time of this object.  This method may be
227      * called before <code>setTemplateHandle</code> is called.
228      * Assumes locking is handled by the caller.
229      * @param newExpiration The expiration time.
230      */
231     public synchronized void setExpiration(long newExpiration) {
232 	expiration = newExpiration;
233     }
234 
235     public long getExpiration() {
236 	return expiration;
237     }
238 
239     /**
240      * Get the unique identifier associated with this object.  This
241      * method may be called before <code>setTemplateHandle</code> is
242      * called. 
243      * @return The unique identifier associated with this
244      * watcher.
245      */
246     public synchronized Uuid getCookie() {
247 	return cookie;
248     }
249 
250     /**
251      * Overridden by subclasses if there is any cleanup work they need
252      * to do as part of <code>cancel</code> or
253      * <code>removeIfExpired</code>. Called after releasing the lock
254      * on <code>this</code>.  Will be called at most once.  
255      * @param owner A reference to the owner.
256      * @param expired <code>true</code> if being called from 
257      *        <code>removeIfExpired</code> and false otherwise. 
258      */
259     void cleanup(TemplateHandle owner, boolean expired) 
260     {}
261 
262     /**
263      * The heavy lifting of removing ourselves.
264      * @param now The current time (or a bit earlier).
265      * @param doIt If <code>true</code> ignore
266      *             <code>now</code> and just remove <code>this</code>
267      *             object.
268      * @return <code>true</code> if this call removed
269      *         <code>this</code> object, <code>false</code> if
270      *         it had already been done. Should be ignored if 
271      *         <code>doIt</code> is <code>false</code>.
272      */
273     private boolean doRemove(long now, boolean doIt) {
274 	final TemplateHandle owner;
275 	synchronized (this) {
276 	    if (this.owner == null)
277 		return false; // already removed
278 
279 	    // Is this a force, or past our expiration?
280 	    if (!doIt && (now < expiration))
281 		return false; // don't remove, not our time
282 
283 	    owner = this.owner;
284 	    expiration = Long.MIN_VALUE; //Make sure no one tries to renew us
285 	    this.owner = null;       //We might stick around don't hold owner
286 	}
287 
288 	cleanup(owner, !doIt);
289 	owner.getServer().removeEventRegistration(this);	    
290 	owner.removeTransitionWatcher(this);	
291 	return true; // we did the deed
292     }
293 
294     void removeIfExpired(long now) {
295 	doRemove(now, false);
296     }
297 
298     public boolean cancel() {
299 	return doRemove(0, true);
300     }
301 
302     /**
303      * Common implementation of <code>EventSender</code>.
304      */
305     private class BasicEventSender implements EventSender {
306         
307         private final long seqNum;
308         private final long eventID;
309         private final MarshalledObject handback;
310         
311         BasicEventSender(long seqNum, long eventID, MarshalledObject handback){
312             this.seqNum = seqNum;
313             this.eventID = eventID;
314             this.handback = handback;
315         }
316         
317 	public void sendEvent(JavaSpace source, long now, ProxyPreparer preparer)
318 	    throws UnknownEventException, IOException, ClassNotFoundException
319 	{
320             // Forget all the tricky crap, just send it.
321 //	    boolean doneFor = false;
322 //	    long seqNum = -1;
323 //
324 //	    synchronized (EventRegistrationWatcher.this) {
325 //		if (owner == null)
326 //		    return; // Our watcher must have been removed, we're done
327 //
328 //		if (getExpiration() < now) {
329 //		    doneFor = true; // Our watcher is expired, remove it
330 //		} else if (currentSeqNum <= lastSeqNumDelivered) {
331 //		    return; // Someone already sent our event, we're done
332 //		} else {
333 //		    // We need to send an event!
334 //		    seqNum = currentSeqNum;
335 //		}
336 //	    
337 //
338 //                // cancel is a synchronized method with the same lock.
339 //                if (doneFor) {
340 //                    cancel();
341 //                    return;
342 //                }
343 //
344 //                /* The only way to get here is through a path that sets
345 //                 * seqNum to the non-initial values
346 //                 */
347 //                assert seqNum != -1;
348 //
349 //
350 //
351 //                // success!, update lastSeqNumDelivered, but don't go backward
352 //	    
353 //		if (seqNum > lastSeqNumDelivered)
354 //		    lastSeqNumDelivered = seqNum;
355 //	    }
356             
357              /* If we are here then we need to send an event (probably
358 	     * someone could have sent our event between the time
359 	     * we released the lock and now).
360 	     */
361 	    getListener(preparer).notify(
362 		new RemoteEvent(source, eventID, seqNum, handback));
363 	}
364 	
365 	
366 	public void cancelRegistration() {
367 	    cancel();
368 	}
369 
370 //	/**
371 //	 * Return the <code>EventRegistrationWatcher</code> this
372 //	 * object is part of (exits because
373 //	 * <code>(BasicEventSender)other).EventRegistrationWatcher.
374 //	 * this</code> does not work.
375 //	 */
376 //	private EventRegistrationWatcher getOwner() {
377 //	    return EventRegistrationWatcher.this;
378 //	}
379 
380 	/**
381 	 * Run after another event sender if it is for the same
382 	 * registration. No sense sending the same event twice. Don't
383 	 * care which one goes first (though the second probably won't
384 	 * run since <code>lastSeqNumDelivered</code> will probably
385 	 * equal <code>currentSeqNum</code> when it runs).  
386 	 */
387 //	public boolean runAfter(EventSender other) {
388 //	    if (!(other instanceof BasicEventSender))
389 //		return false;
390 //
391 //	    return EventRegistrationWatcher.this ==
392 //		((BasicEventSender)other).getOwner();
393 //	}
394     }
395 }