1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.river.outrigger; 19 20 import java.io.IOException; 21 import java.rmi.RemoteException; 22 import java.rmi.MarshalledObject; 23 import net.jini.core.event.RemoteEventListener; 24 import net.jini.core.event.RemoteEvent; 25 import net.jini.core.event.UnknownEventException; 26 import net.jini.id.Uuid; 27 import net.jini.security.ProxyPreparer; 28 import net.jini.space.JavaSpace; 29 import org.apache.river.landlord.LeasedResource; 30 31 /** 32 * Subclass of <code>TransitionWatcher</code> for event 33 * registrations. Also represents the registration itself. 34 */ 35 abstract class EventRegistrationWatcher extends TransitionWatcher 36 implements EventRegistrationRecord 37 { 38 /** 39 * The current expiration time of the registration 40 * Protected, but only for use by subclasses. 41 */ 42 volatile long expiration; 43 44 /** 45 * The UUID that identifies this registration 46 * Protected, but only for use by subclasses. 47 * Should not be changed. 48 */ 49 Uuid cookie; 50 51 /** 52 * The handback associated with this registration. 53 * Protected, but only for use by subclasses. 54 * Should not be changed. 55 */ 56 MarshalledObject handback; 57 58 /** 59 * The event ID associated with this registration 60 * Protected, but only for use by subclasses. 61 * Should not be changed. 62 */ 63 long eventID; 64 65 /** 66 * The current sequence number. 67 */ 68 private long currentSeqNum = 0; 69 70 // /** 71 // * The sequence number of the last event successfully 72 // * delivered. Protected, but only for use by subclasses. 73 // */ 74 // volatile long lastSeqNumDelivered = -1; 75 76 /** 77 * The <code>TemplateHandle</code> associated with this 78 * watcher. 79 */ 80 private TemplateHandle owner; 81 82 /** 83 * Used during log recovery to create a mostly empty 84 * <code>EventRegistrationWatcher</code>. 85 * <p> 86 * Note, we set the time stamp and tie-breaker here instead of 87 * getting them from the log. This means they will be inconstant 88 * with their value from the last VM we were in, but since they 89 * never leak out and events are read-only anyway this should not 90 * be a problem (this also allows us to keep the tie-breaker and 91 * time stamp in final fields). 92 * 93 * @param timestamp the value that is used 94 * to sort <code>TransitionWatcher</code>s. 95 * @param startOrdinal the highest ordinal associated 96 * with operations that are considered to have occurred 97 * before the operation associated with this watcher. 98 * @param currentSeqNum Sequence number to start with. 99 */ 100 EventRegistrationWatcher(long timestamp, long startOrdinal, 101 long currentSeqNum) 102 { 103 super(timestamp, startOrdinal); 104 this.currentSeqNum = currentSeqNum; 105 handback = null; 106 cookie = null; 107 eventID = 0; 108 } 109 110 /** 111 * Create a new <code>EventRegistrationWatcher</code>. 112 * @param timestamp the value that is used 113 * to sort <code>TransitionWatcher</code>s. 114 * @param startOrdinal the highest ordinal associated 115 * with operations that are considered to have occurred 116 * before the operation associated with this watcher. 117 * @param cookie The unique identifier associated 118 * with this watcher. Must not be <code>null</code>. 119 * @param handback The handback object that 120 * should be sent along with event 121 * notifications to the the listener. 122 * @param eventID The event ID for event type 123 * represented by this object. 124 * @throws NullPointerException if the <code>cookie</code> 125 * argument is <code>null</code>. 126 */ 127 EventRegistrationWatcher(long timestamp, long startOrdinal, Uuid cookie, 128 MarshalledObject handback, long eventID) 129 { 130 super(timestamp, startOrdinal); 131 132 if (cookie == null) 133 throw new NullPointerException("cookie must be non-null"); 134 135 this.cookie = cookie; 136 this.handback = handback; 137 this.eventID = eventID; 138 } 139 140 /** 141 * Process the given transition by queuing up a task with the 142 * notifier for event delivery. Assumes the passed entry matches 143 * the template in the <code>TemplateHandle</code> associated with 144 * this watcher and that <code>isInterested</code> returned 145 * <code>true</code>. If <code>remove</code> has been called or 146 * the expiration time of this watcher has passed, this call 147 * should have no effect. This call may cause the watcher to be 148 * removed. 149 * @param transition A <code>EntryTransition</code> that 150 * describes the transition and what 151 * entry is transitioning. This method 152 * will assume that <code>transition.getHandle</code> 153 * returns a non-null value. 154 * @param now An estimate of the current time (not the time 155 * when the event occured). 156 * @throws NullPointerException if <code>transition</code> is 157 * <code>null</code>. 158 */ 159 void process(EntryTransition transition, long now) { 160 boolean doneFor = false; 161 162 // lock before checking the time and so we can update currentSeqNum 163 synchronized (this) { 164 if (owner == null) 165 return; // Must have been removed 166 167 if (now > expiration) { 168 doneFor = true; 169 } else { 170 currentSeqNum++; 171 owner.getServer().enqueueDelivery(new BasicEventSender(currentSeqNum, eventID, handback)); 172 } 173 } 174 175 if (doneFor) 176 cancel(); 177 } 178 179 180 /** 181 * Return the remote listener associated with this 182 * <code>EventRegistrationWatcher</code>. Optionally 183 * prepare the listener if it has been recovered from 184 * the store and not yet re-prepared. 185 * @return the remote listener associated with this 186 * <code>EventRegistrationWatcher</code>. 187 * @throws IOException if the listener can not 188 * be unmarshalled. May throw {@link RemoteException} 189 * if the call to the preparer does 190 * @throws ClassNotFoundException if the listener 191 * needs to be unmarshalled and a necessary 192 * class can not be found. 193 * @throws SecurityException if the <code>prepareProxy</code> 194 * call does. 195 */ 196 abstract RemoteEventListener getListener(ProxyPreparer preparer) 197 throws ClassNotFoundException, IOException; 198 199 /** 200 * Associate a <code>TemplateHandle</code> with this object. May 201 * only be called once on any given 202 * <code>EventRegistrationWatcher</code> instance. 203 * 204 * @param h The <code>TemplateHandle</code> associated 205 * with this watcher. 206 * @return <code>true</code> if the handle was succfully added, 207 * and <code>false</code> if the watcher has already 208 * been removed. 209 * @throws NullPointerException if <code>h</code> is 210 * <code>null</code> 211 */ 212 boolean addTemplateHandle(TemplateHandle h) { 213 if (h == null) 214 throw 215 new NullPointerException("TemplateHandle must be non-null"); 216 synchronized (this){ 217 if (owner != null) 218 throw new AssertionError("Can only call addTemplateHandle once"); 219 220 owner = h; 221 } 222 return true; 223 } 224 225 /** 226 * Set the expiration time of this object. This method may be 227 * called before <code>setTemplateHandle</code> is called. 228 * Assumes locking is handled by the caller. 229 * @param newExpiration The expiration time. 230 */ 231 public synchronized void setExpiration(long newExpiration) { 232 expiration = newExpiration; 233 } 234 235 public long getExpiration() { 236 return expiration; 237 } 238 239 /** 240 * Get the unique identifier associated with this object. This 241 * method may be called before <code>setTemplateHandle</code> is 242 * called. 243 * @return The unique identifier associated with this 244 * watcher. 245 */ 246 public synchronized Uuid getCookie() { 247 return cookie; 248 } 249 250 /** 251 * Overridden by subclasses if there is any cleanup work they need 252 * to do as part of <code>cancel</code> or 253 * <code>removeIfExpired</code>. Called after releasing the lock 254 * on <code>this</code>. Will be called at most once. 255 * @param owner A reference to the owner. 256 * @param expired <code>true</code> if being called from 257 * <code>removeIfExpired</code> and false otherwise. 258 */ 259 void cleanup(TemplateHandle owner, boolean expired) 260 {} 261 262 /** 263 * The heavy lifting of removing ourselves. 264 * @param now The current time (or a bit earlier). 265 * @param doIt If <code>true</code> ignore 266 * <code>now</code> and just remove <code>this</code> 267 * object. 268 * @return <code>true</code> if this call removed 269 * <code>this</code> object, <code>false</code> if 270 * it had already been done. Should be ignored if 271 * <code>doIt</code> is <code>false</code>. 272 */ 273 private boolean doRemove(long now, boolean doIt) { 274 final TemplateHandle owner; 275 synchronized (this) { 276 if (this.owner == null) 277 return false; // already removed 278 279 // Is this a force, or past our expiration? 280 if (!doIt && (now < expiration)) 281 return false; // don't remove, not our time 282 283 owner = this.owner; 284 expiration = Long.MIN_VALUE; //Make sure no one tries to renew us 285 this.owner = null; //We might stick around don't hold owner 286 } 287 288 cleanup(owner, !doIt); 289 owner.getServer().removeEventRegistration(this); 290 owner.removeTransitionWatcher(this); 291 return true; // we did the deed 292 } 293 294 void removeIfExpired(long now) { 295 doRemove(now, false); 296 } 297 298 public boolean cancel() { 299 return doRemove(0, true); 300 } 301 302 /** 303 * Common implementation of <code>EventSender</code>. 304 */ 305 private class BasicEventSender implements EventSender { 306 307 private final long seqNum; 308 private final long eventID; 309 private final MarshalledObject handback; 310 311 BasicEventSender(long seqNum, long eventID, MarshalledObject handback){ 312 this.seqNum = seqNum; 313 this.eventID = eventID; 314 this.handback = handback; 315 } 316 317 public void sendEvent(JavaSpace source, long now, ProxyPreparer preparer) 318 throws UnknownEventException, IOException, ClassNotFoundException 319 { 320 // Forget all the tricky crap, just send it. 321 // boolean doneFor = false; 322 // long seqNum = -1; 323 // 324 // synchronized (EventRegistrationWatcher.this) { 325 // if (owner == null) 326 // return; // Our watcher must have been removed, we're done 327 // 328 // if (getExpiration() < now) { 329 // doneFor = true; // Our watcher is expired, remove it 330 // } else if (currentSeqNum <= lastSeqNumDelivered) { 331 // return; // Someone already sent our event, we're done 332 // } else { 333 // // We need to send an event! 334 // seqNum = currentSeqNum; 335 // } 336 // 337 // 338 // // cancel is a synchronized method with the same lock. 339 // if (doneFor) { 340 // cancel(); 341 // return; 342 // } 343 // 344 // /* The only way to get here is through a path that sets 345 // * seqNum to the non-initial values 346 // */ 347 // assert seqNum != -1; 348 // 349 // 350 // 351 // // success!, update lastSeqNumDelivered, but don't go backward 352 // 353 // if (seqNum > lastSeqNumDelivered) 354 // lastSeqNumDelivered = seqNum; 355 // } 356 357 /* If we are here then we need to send an event (probably 358 * someone could have sent our event between the time 359 * we released the lock and now). 360 */ 361 getListener(preparer).notify( 362 new RemoteEvent(source, eventID, seqNum, handback)); 363 } 364 365 366 public void cancelRegistration() { 367 cancel(); 368 } 369 370 // /** 371 // * Return the <code>EventRegistrationWatcher</code> this 372 // * object is part of (exits because 373 // * <code>(BasicEventSender)other).EventRegistrationWatcher. 374 // * this</code> does not work. 375 // */ 376 // private EventRegistrationWatcher getOwner() { 377 // return EventRegistrationWatcher.this; 378 // } 379 380 /** 381 * Run after another event sender if it is for the same 382 * registration. No sense sending the same event twice. Don't 383 * care which one goes first (though the second probably won't 384 * run since <code>lastSeqNumDelivered</code> will probably 385 * equal <code>currentSeqNum</code> when it runs). 386 */ 387 // public boolean runAfter(EventSender other) { 388 // if (!(other instanceof BasicEventSender)) 389 // return false; 390 // 391 // return EventRegistrationWatcher.this == 392 // ((BasicEventSender)other).getOwner(); 393 // } 394 } 395 }