View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    * 
10   *      http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.river.norm.event;
19  
20  import java.io.IOException;
21  import java.io.ObjectInputStream;
22  import java.io.ObjectOutputStream;
23  import java.io.Serializable;
24  import java.security.AccessControlContext;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.ThreadPoolExecutor;
28  import java.util.concurrent.TimeUnit;
29  import org.apache.river.api.io.AtomicSerial;
30  import org.apache.river.api.io.AtomicSerial.GetArg;
31  import org.apache.river.thread.NamedThreadFactory;
32  import org.apache.river.thread.wakeup.WakeupManager;
33  
34  /**
35   * Factory class for <code>EventType</code> objects.  All
36   * <code>EventType</code> objects created by the same generator (or
37   * associated with the same generator by a
38   * <code>EventType.restoreTransientState</code> call) will use the same
39   * thread pool to manage their event send threads.
40   *
41   * @author Sun Microsystems, Inc.
42   * @see EventType 
43   * @see EventType#restoreTransientState
44   */
45  @AtomicSerial
46  public class EventTypeGenerator implements Serializable {
47      private static final long serialVersionUID = 1L;
48  
49      /**
50       * Next event ID.
51       * @serial
52       */
53      private long nextEvID;
54  
55      /**
56       * ExecutorService used to send events
57       */
58      private transient ExecutorService taskManager;
59  
60      /**
61       * Wakeup manager used by the event sending tasks to schedule 
62       * retries.
63       */
64      private transient WakeupManager wakeupManager;
65  
66      /**
67       * Create a new <code>EventType</code> object specify the 
68       * event id it should have.
69       *
70       * @param eventID  the event ID of this type
71       * @param monitor  Object to callback when an event sending
72       *        attempt fails with a definite exception and to 
73       *        ensure that the lease on the event is still current.
74       *        May not be <code>null</code>.
75       * @return the new <code>EventType</code> object
76       * @throws IOException if listener cannot be serialized 
77       */
78      public EventType newEventType(SendMonitor monitor, long eventID, AccessControlContext context)
79  	throws IOException
80      {
81  	return new EventType(this, monitor, eventID, null, null, context);
82      }
83  
84      /**
85       * Called by event types during transient state recovery to ensure
86       * the generator knows about there event ID.
87       * <p>
88       * Note: this method is not synchronized.
89       * @param evID event ID of recovered <code>EventType</code> object
90       */
91      synchronized void recoverEventID(long evID) {
92  	if (evID >= nextEvID)
93  	    nextEvID = evID + 1;
94      }
95  
96      /**
97       * Return the ExecutorService that <code>EventType</code> objects created
98       * by this generator should use to send their events.
99       */
100     ExecutorService getExecutorService() {
101 	return taskManager;
102     }
103 
104     /**
105      * Return the wakeup manager that <code>EventType</code> objects created
106      * by this generator should use to send their events.
107      */
108     WakeupManager getWakeupManager() {
109 	return wakeupManager;
110     }
111 
112     /**
113      * Terminate any independent treads started by event types
114      * associated with this generator.
115      */
116     public void terminate() {
117 	taskManager.shutdown();
118 	wakeupManager.stop();
119 	wakeupManager.cancelAll();
120     }
121 
122     /**
123      * Override <code>readObject</code> to create a <code>TaskManager</code> 
124      * and a <code>WakeupManager</code>.
125      * @see ObjectInputStream#defaultReadObject
126      */
127     private void readObject(ObjectInputStream in)
128 	throws IOException, ClassNotFoundException
129     {
130 	synchronized(this){
131 	    // fill in the object from the stream 
132 	    in.defaultReadObject();
133 
134 	    taskManager = new ThreadPoolExecutor(
135 			10,
136 			10, /* Ignored */
137 			15,
138 			TimeUnit.SECONDS, 
139 			new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
140 			new NamedThreadFactory("EventTypeGenerator", false)
141 	    );
142 	    wakeupManager = 
143 		new WakeupManager(new WakeupManager.ThreadDesc(null, false));   
144 	}
145     }
146     
147     private void writeObject(ObjectOutputStream out) throws IOException{
148 	synchronized(this){
149 	    out.defaultWriteObject();
150 	}
151     }
152     
153     public EventTypeGenerator(GetArg arg) throws IOException{
154 	this(arg.get("nextEvID", 1L));
155     }
156     
157     private EventTypeGenerator(long nextEvID){
158 	this.nextEvID = nextEvID;
159 	this.wakeupManager = new WakeupManager(new WakeupManager.ThreadDesc(null, false));
160 	this.taskManager = new ThreadPoolExecutor(
161 		10,
162 		10, /* Ignored */
163 		15,
164 		TimeUnit.SECONDS,
165 		new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
166 		new NamedThreadFactory("EventTypeGenerator", false)
167 	);
168     }
169     
170     public EventTypeGenerator(){
171 	this.nextEvID = 1;
172 	this.wakeupManager = new WakeupManager(new WakeupManager.ThreadDesc(null, false));
173 	this.taskManager = new ThreadPoolExecutor(
174 		10,
175 		10, /* Ignored */
176 		15,
177 		TimeUnit.SECONDS,
178 		new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
179 		new NamedThreadFactory("EventTypeGenerator", false)
180 	);
181     }
182 }