View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.outrigger;
19  
20  import java.security.AccessControlContext;
21  import org.apache.river.config.Config;
22  import org.apache.river.thread.wakeup.WakeupManager;
23  
24  import net.jini.config.Configuration;
25  import net.jini.config.ConfigurationException;
26  
27  import java.util.Collection;
28  import java.util.Iterator;
29  import java.util.LinkedList;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  import java.util.logging.Level;
35  import java.util.logging.Logger;
36  import org.apache.river.thread.NamedThreadFactory;
37  
38  /**
39   * This class provides a driver for monitoring the state of transactions
40   * that have blocked progress of other operations recently.  It creates
41   * tasks that monitor each transaction by intermittently querying the
42   * transaction's state.  If it finds that the transaction has aborted,
43   * it makes sure that the local space aborts the transaction, too, so
44   * that operations will cease to be blocked by the transaction.
45   *
46   * @author Sun Microsystems, Inc.
47   *
48   * @see TxnMonitorTask
49   * @see OutriggerServerImpl#monitor
50   */
51  class TxnMonitor implements Runnable {
52      
53      private final AccessControlContext context;
54      
55      /**
56       * Each <code>ToMonitor</code> object represents a need to monitor
57       * the given transactions, possibly under a lease.
58       *
59       * @see #pending
60       */
61      private static class ToMonitor {
62  	final QueryWatcher	query;         // query governing interest in txns
63  	final Collection<Txn>	txns;	       // the transactions to monitor
64  
65  	ToMonitor(QueryWatcher query, Collection<Txn> txns) {
66  	    this.query = query;
67  	    this.txns = txns;
68  	}
69      }
70  
71      /**
72       * This list is used to contain requests to monitor interfering
73       * transactions.  We use a list like this so that the
74       * <code>getMatch</code> request that detected the conflict
75       * doesn't have to wait for all the setup before returning -- it
76       * just puts the data on this list and the <code>TxnMonitor</code>
77       * pulls it off using its own thread.
78       * 
79       * @see OutriggerServerImpl#getMatch 
80       */
81      // @see #ToMonitor
82      private final LinkedList<ToMonitor> pending = new LinkedList<ToMonitor>();
83  
84      /** wakeup manager for <code>TxnMonitorTask</code>s */
85      private final WakeupManager wakeupMgr = 
86  	new WakeupManager(new WakeupManager.ThreadDesc(null, true));
87  
88      /**
89       * The manager for <code>TxnMonitorTask</code> objects.
90       */
91      private final ExecutorService taskManager;
92  
93      /**
94       * The space we belong to.  Needed for aborts.
95       */
96      private final OutriggerServerImpl	space;
97  
98      /**
99       * The thread running us.
100      */
101     private final Thread ourThread;
102 
103     private volatile boolean started = false;
104 
105     /** Logger for logging transaction related information */
106     private static final Logger logger = 
107 	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
108 
109     /**
110      * Create a new TxnMonitor.
111      */
112     TxnMonitor(OutriggerServerImpl space,
113 	       Configuration config,
114 	       AccessControlContext context)
115 	throws ConfigurationException 
116     {
117 	if (space == null)
118 	    throw new NullPointerException("space must be non-null");
119 	this.space = space;
120 
121 	taskManager = Config.getNonNullEntry(config,
122 	    OutriggerServerImpl.COMPONENT_NAME, "txnMonitorExecutorService", 
123 	    ExecutorService.class, 
124             new ThreadPoolExecutor(
125                     10,
126                     10, /* Ignored */
127                     15,
128                     TimeUnit.SECONDS, 
129                     new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
130                     new NamedThreadFactory("OutriggerServerImpl TxnMonitor", false)
131             )
132         );
133 
134         ourThread = new Thread(this, "TxnMonitor");
135 	ourThread.setDaemon(false);
136 	this.context = context;
137     }
138     
139     public void start(){
140         synchronized (this){
141         ourThread.start();
142             started = true;
143     }
144     }
145 
146     public void destroy() {
147         taskManager.shutdown();
148 	wakeupMgr.stop();	
149         ourThread.interrupt();
150 	synchronized (this) {
151 	    notifyAll();
152 	}
153 
154         try {
155 	    if (started) ourThread.join();
156 	} catch(InterruptedException ie) {
157 	    Thread.currentThread().interrupt(); // restore
158 	}
159     }
160 
161     /**
162      * Return the space we're part of.
163      */
164     OutriggerServerImpl space() {
165 	return space;
166     }
167 
168     /**
169      * Add a set of <code>transactions</code> to be monitored under the
170      * given query.
171      */
172     synchronized void add(QueryWatcher query, Collection<Txn> transactions) {
173 	if (logger.isLoggable(Level.FINEST)) {
174 	    final StringBuffer buf = new StringBuffer();
175 	    buf.append("Setting up monitor for ");
176 	    buf.append(query);
177 	    buf.append(" toMonitor:");
178 	    boolean notFirst = false;
179 	    for (Iterator<Txn> i=transactions.iterator(); i.hasNext();) {
180 		if (notFirst) {
181 		    buf.append(",");
182 		    notFirst = true;
183 		}
184 		buf.append(i.next());
185 	    }
186 	    logger.log(Level.FINEST, buf.toString());
187 	}
188 
189 	pending.add(new ToMonitor(query, transactions));
190 	notifyAll();
191     }
192 
193     /**
194      * Add a set of <code>transactions</code> to be monitored under no
195      * lease.
196      */
197     void add(Collection<Txn> transactions) {
198 	add(null, transactions);
199     }
200 
201     /**
202      * Take pending monitor requests off the queue, creating the
203      * required <code>TxnMonitorTask</code> objects and scheduling them.
204      */
205     public void run() {
206 	try {
207 	    ToMonitor tm;
208 	    while (!Thread.currentThread().isInterrupted())  {
209 		synchronized (this) {
210 		    // Sleep if nothing is pending.
211 		    while (pending.isEmpty()) {
212                         wait();
213                     }
214 		    tm = pending.removeFirst();
215 		}
216 
217 		logger.log(Level.FINER, "creating monitor tasks for {0}",
218 			   tm.query);
219 
220 		Iterator<Txn> it = tm.txns.iterator();
221 		while (it.hasNext()) {
222 		    Txn txn = it.next();
223 		    TxnMonitorTask task = taskFor(txn);
224 		    task.add(tm.query);
225 		}
226 	    }
227 	} catch (InterruptedException e) {
228             Thread.currentThread().interrupt();// restore
229 	}
230     }
231 
232     /**
233      * Return the monitor task for this transaction, creating it if
234      * necessary.
235      */
236     private TxnMonitorTask taskFor(Txn txn) {
237 	TxnMonitorTask task = txn.monitorTask();
238 	if (task == null) {
239 	    logger.log(Level.FINER, "creating TxnMonitorTask for {0}", 
240 			   txn);
241 
242 	    task = new TxnMonitorTask(txn, this, taskManager, wakeupMgr, context);
243 	    txn.monitorTask(task);
244 	    taskManager.execute(task);  // add it after we've set it in the txn
245 	}
246 	return task;
247     }
248 }