1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger;
19
20 import java.security.AccessControlContext;
21 import org.apache.river.config.Config;
22 import org.apache.river.thread.wakeup.WakeupManager;
23
24 import net.jini.config.Configuration;
25 import net.jini.config.ConfigurationException;
26
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.LinkedList;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 import org.apache.river.thread.NamedThreadFactory;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 class TxnMonitor implements Runnable {
52
53 private final AccessControlContext context;
54
55
56
57
58
59
60
61 private static class ToMonitor {
62 final QueryWatcher query;
63 final Collection<Txn> txns;
64
65 ToMonitor(QueryWatcher query, Collection<Txn> txns) {
66 this.query = query;
67 this.txns = txns;
68 }
69 }
70
71
72
73
74
75
76
77
78
79
80
81
82 private final LinkedList<ToMonitor> pending = new LinkedList<ToMonitor>();
83
84
85 private final WakeupManager wakeupMgr =
86 new WakeupManager(new WakeupManager.ThreadDesc(null, true));
87
88
89
90
91 private final ExecutorService taskManager;
92
93
94
95
96 private final OutriggerServerImpl space;
97
98
99
100
101 private final Thread ourThread;
102
103 private volatile boolean started = false;
104
105
106 private static final Logger logger =
107 Logger.getLogger(OutriggerServerImpl.txnLoggerName);
108
109
110
111
112 TxnMonitor(OutriggerServerImpl space,
113 Configuration config,
114 AccessControlContext context)
115 throws ConfigurationException
116 {
117 if (space == null)
118 throw new NullPointerException("space must be non-null");
119 this.space = space;
120
121 taskManager = Config.getNonNullEntry(config,
122 OutriggerServerImpl.COMPONENT_NAME, "txnMonitorExecutorService",
123 ExecutorService.class,
124 new ThreadPoolExecutor(
125 10,
126 10,
127 15,
128 TimeUnit.SECONDS,
129 new LinkedBlockingQueue<Runnable>(),
130 new NamedThreadFactory("OutriggerServerImpl TxnMonitor", false)
131 )
132 );
133
134 ourThread = new Thread(this, "TxnMonitor");
135 ourThread.setDaemon(false);
136 this.context = context;
137 }
138
139 public void start(){
140 synchronized (this){
141 ourThread.start();
142 started = true;
143 }
144 }
145
146 public void destroy() {
147 taskManager.shutdown();
148 wakeupMgr.stop();
149 ourThread.interrupt();
150 synchronized (this) {
151 notifyAll();
152 }
153
154 try {
155 if (started) ourThread.join();
156 } catch(InterruptedException ie) {
157 Thread.currentThread().interrupt();
158 }
159 }
160
161
162
163
164 OutriggerServerImpl space() {
165 return space;
166 }
167
168
169
170
171
172 synchronized void add(QueryWatcher query, Collection<Txn> transactions) {
173 if (logger.isLoggable(Level.FINEST)) {
174 final StringBuffer buf = new StringBuffer();
175 buf.append("Setting up monitor for ");
176 buf.append(query);
177 buf.append(" toMonitor:");
178 boolean notFirst = false;
179 for (Iterator<Txn> i=transactions.iterator(); i.hasNext();) {
180 if (notFirst) {
181 buf.append(",");
182 notFirst = true;
183 }
184 buf.append(i.next());
185 }
186 logger.log(Level.FINEST, buf.toString());
187 }
188
189 pending.add(new ToMonitor(query, transactions));
190 notifyAll();
191 }
192
193
194
195
196
197 void add(Collection<Txn> transactions) {
198 add(null, transactions);
199 }
200
201
202
203
204
205 public void run() {
206 try {
207 ToMonitor tm;
208 while (!Thread.currentThread().isInterrupted()) {
209 synchronized (this) {
210
211 while (pending.isEmpty()) {
212 wait();
213 }
214 tm = pending.removeFirst();
215 }
216
217 logger.log(Level.FINER, "creating monitor tasks for {0}",
218 tm.query);
219
220 Iterator<Txn> it = tm.txns.iterator();
221 while (it.hasNext()) {
222 Txn txn = it.next();
223 TxnMonitorTask task = taskFor(txn);
224 task.add(tm.query);
225 }
226 }
227 } catch (InterruptedException e) {
228 Thread.currentThread().interrupt();
229 }
230 }
231
232
233
234
235
236 private TxnMonitorTask taskFor(Txn txn) {
237 TxnMonitorTask task = txn.monitorTask();
238 if (task == null) {
239 logger.log(Level.FINER, "creating TxnMonitorTask for {0}",
240 txn);
241
242 task = new TxnMonitorTask(txn, this, taskManager, wakeupMgr, context);
243 txn.monitorTask(task);
244 taskManager.execute(task);
245 }
246 return task;
247 }
248 }