1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.river.outrigger; 19 20 import java.util.LinkedList; 21 import java.util.logging.Level; 22 import java.util.logging.Logger; 23 24 import net.jini.id.Uuid; 25 26 /** 27 * Logs expiration of leases and asynchronously persists them to disk. 28 */ 29 class ExpirationOpQueue extends Thread { 30 /** The queue of expirations to log */ 31 private final LinkedList queue = new LinkedList(); 32 33 /** The server we are working for */ 34 private final OutriggerServerImpl server; 35 36 /** Logger for logging exceptions */ 37 private static final Logger logger = 38 Logger.getLogger(OutriggerServerImpl.leaseLoggerName); 39 40 /** 41 * Create a new <code>ExpirationOpQueue</code> that 42 * will handle lease expiration logging for the 43 * specified server. 44 * @param server the <code>OutriggerServerImpl</code> to 45 * log for. 46 */ 47 ExpirationOpQueue(OutriggerServerImpl server) { 48 super("Expiration Op Queue"); 49 this.server = server; 50 } 51 52 /** 53 * Enqueue the logging of the expiration of the specified lease. 54 * @param cookie The cookie of the lease that has expired. 55 */ 56 synchronized void enqueue(Uuid cookie) { 57 queue.add(cookie); 58 notifyAll(); 59 } 60 61 /** 62 * Stop the queue 63 */ 64 void terminate() { 65 interrupt(); 66 synchronized (this){ 67 notifyAll(); 68 } 69 } 70 71 public void run() { 72 while (!Thread.currentThread().isInterrupted()) { // ok not to lock since it starts false 73 try { 74 final Uuid cookie; 75 synchronized (this) { 76 while (queue.isEmpty()) { 77 wait(); 78 } 79 cookie = (Uuid)queue.removeFirst(); 80 } 81 82 server.cancelOp(cookie, true); 83 } catch (InterruptedException e){ 84 Thread.currentThread().interrupt(); // restore 85 return; 86 } catch (Throwable t) { 87 try { 88 logger.log(Level.INFO, 89 "ExpirationOpQueue.run encountered " + 90 t.getClass().getName() + ", continuing", 91 t); 92 } catch (Throwable tt) { 93 // don't let a problem in logging kill the thread 94 } 95 } 96 } 97 } 98 } 99 100