1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.norm.event;
19
20 import java.io.IOException;
21 import java.io.ObjectInputStream;
22 import java.io.ObjectOutputStream;
23 import java.io.Serializable;
24 import java.security.AccessControlContext;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.apache.river.api.io.AtomicSerial;
30 import org.apache.river.api.io.AtomicSerial.GetArg;
31 import org.apache.river.thread.NamedThreadFactory;
32 import org.apache.river.thread.wakeup.WakeupManager;
33
34
35
36
37
38
39
40
41
42
43
44
45 @AtomicSerial
46 public class EventTypeGenerator implements Serializable {
47 private static final long serialVersionUID = 1L;
48
49
50
51
52
53 private long nextEvID;
54
55
56
57
58 private transient ExecutorService taskManager;
59
60
61
62
63
64 private transient WakeupManager wakeupManager;
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public EventType newEventType(SendMonitor monitor, long eventID, AccessControlContext context)
79 throws IOException
80 {
81 return new EventType(this, monitor, eventID, null, null, context);
82 }
83
84
85
86
87
88
89
90
91 synchronized void recoverEventID(long evID) {
92 if (evID >= nextEvID)
93 nextEvID = evID + 1;
94 }
95
96
97
98
99
100 ExecutorService getExecutorService() {
101 return taskManager;
102 }
103
104
105
106
107
108 WakeupManager getWakeupManager() {
109 return wakeupManager;
110 }
111
112
113
114
115
116 public void terminate() {
117 taskManager.shutdown();
118 wakeupManager.stop();
119 wakeupManager.cancelAll();
120 }
121
122
123
124
125
126
127 private void readObject(ObjectInputStream in)
128 throws IOException, ClassNotFoundException
129 {
130 synchronized(this){
131
132 in.defaultReadObject();
133
134 taskManager = new ThreadPoolExecutor(
135 10,
136 10,
137 15,
138 TimeUnit.SECONDS,
139 new LinkedBlockingQueue<Runnable>(),
140 new NamedThreadFactory("EventTypeGenerator", false)
141 );
142 wakeupManager =
143 new WakeupManager(new WakeupManager.ThreadDesc(null, false));
144 }
145 }
146
147 private void writeObject(ObjectOutputStream out) throws IOException{
148 synchronized(this){
149 out.defaultWriteObject();
150 }
151 }
152
153 public EventTypeGenerator(GetArg arg) throws IOException{
154 this(arg.get("nextEvID", 1L));
155 }
156
157 private EventTypeGenerator(long nextEvID){
158 this.nextEvID = nextEvID;
159 this.wakeupManager = new WakeupManager(new WakeupManager.ThreadDesc(null, false));
160 this.taskManager = new ThreadPoolExecutor(
161 10,
162 10,
163 15,
164 TimeUnit.SECONDS,
165 new LinkedBlockingQueue<Runnable>(),
166 new NamedThreadFactory("EventTypeGenerator", false)
167 );
168 }
169
170 public EventTypeGenerator(){
171 this.nextEvID = 1;
172 this.wakeupManager = new WakeupManager(new WakeupManager.ThreadDesc(null, false));
173 this.taskManager = new ThreadPoolExecutor(
174 10,
175 10,
176 15,
177 TimeUnit.SECONDS,
178 new LinkedBlockingQueue<Runnable>(),
179 new NamedThreadFactory("EventTypeGenerator", false)
180 );
181 }
182 }