1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.reggie;
19
20 import java.io.DataInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.io.ObjectInput;
25 import java.io.ObjectInputStream;
26 import java.io.ObjectOutputStream;
27 import java.io.OutputStream;
28 import java.io.Serializable;
29 import java.lang.reflect.Array;
30 import java.net.DatagramPacket;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.MulticastSocket;
34 import java.net.NetworkInterface;
35 import java.net.ServerSocket;
36 import java.net.Socket;
37 import java.net.SocketException;
38 import java.net.SocketTimeoutException;
39 import java.net.UnknownHostException;
40 import java.nio.BufferUnderflowException;
41 import java.nio.ByteBuffer;
42 import java.rmi.MarshalledObject;
43 import java.rmi.NoSuchObjectException;
44 import java.rmi.RemoteException;
45 import java.rmi.activation.ActivationException;
46 import java.rmi.activation.ActivationID;
47 import java.rmi.activation.ActivationSystem;
48 import java.security.AccessControlContext;
49 import java.security.AccessController;
50 import java.security.PrivilegedAction;
51 import java.security.PrivilegedActionException;
52 import java.security.PrivilegedExceptionAction;
53 import java.util.ArrayList;
54 import java.util.Arrays;
55 import java.util.Collections;
56 import java.util.Enumeration;
57 import java.util.HashMap;
58 import java.util.Iterator;
59 import java.util.LinkedList;
60 import java.util.List;
61 import java.util.Map;
62 import java.util.NoSuchElementException;
63 import java.util.PriorityQueue;
64 import java.util.Queue;
65 import java.util.Random;
66 import java.util.Set;
67 import java.util.SortedSet;
68 import java.util.TreeSet;
69 import java.util.concurrent.Callable;
70 import java.util.concurrent.ConcurrentSkipListSet;
71 import java.util.concurrent.ExecutorService;
72 import java.util.concurrent.LinkedBlockingQueue;
73 import java.util.concurrent.PriorityBlockingQueue;
74 import java.util.concurrent.ScheduledExecutorService;
75 import java.util.concurrent.ScheduledThreadPoolExecutor;
76 import java.util.concurrent.ThreadPoolExecutor;
77 import java.util.concurrent.TimeUnit;
78 import java.util.concurrent.atomic.AtomicInteger;
79 import java.util.concurrent.atomic.AtomicLong;
80 import java.util.concurrent.locks.Condition;
81 import java.util.logging.Level;
82 import java.util.logging.Logger;
83 import javax.net.ServerSocketFactory;
84 import javax.net.SocketFactory;
85 import javax.security.auth.Subject;
86 import javax.security.auth.login.LoginContext;
87 import javax.security.auth.login.LoginException;
88 import net.jini.activation.ActivationExporter;
89 import net.jini.activation.ActivationGroup;
90 import net.jini.config.Configuration;
91 import net.jini.config.ConfigurationException;
92 import net.jini.config.ConfigurationProvider;
93 import net.jini.config.NoSuchEntryException;
94 import net.jini.constraint.BasicMethodConstraints;
95 import net.jini.core.constraint.InvocationConstraints;
96 import net.jini.core.constraint.MethodConstraints;
97 import net.jini.core.constraint.RemoteMethodControl;
98 import net.jini.core.discovery.LookupLocator;
99 import net.jini.core.entry.Entry;
100 import net.jini.core.event.EventRegistration;
101 import net.jini.core.event.RemoteEventListener;
102 import net.jini.core.lease.Lease;
103 import net.jini.core.lease.UnknownLeaseException;
104 import net.jini.core.lookup.ServiceID;
105 import net.jini.core.lookup.ServiceItem;
106 import net.jini.core.lookup.ServiceRegistrar;
107 import net.jini.core.lookup.ServiceRegistration;
108 import net.jini.discovery.Constants;
109 import net.jini.discovery.ConstrainableLookupLocator;
110 import net.jini.discovery.DiscoveryGroupManagement;
111 import net.jini.discovery.DiscoveryLocatorManagement;
112 import net.jini.discovery.DiscoveryManagement;
113 import net.jini.discovery.LookupDiscoveryManager;
114 import net.jini.export.Exporter;
115 import net.jini.export.ProxyAccessor;
116 import net.jini.lookup.ServiceAttributesAccessor;
117 import net.jini.export.CodebaseAccessor;
118 import net.jini.lookup.ServiceIDAccessor;
119 import net.jini.lookup.ServiceProxyAccessor;
120 import net.jini.id.ReferentUuid;
121 import net.jini.id.Uuid;
122 import net.jini.id.UuidFactory;
123 import net.jini.io.MarshalledInstance;
124 import net.jini.io.UnsupportedConstraintException;
125 import net.jini.jeri.AtomicILFactory;
126 import net.jini.jeri.BasicILFactory;
127 import net.jini.jeri.BasicJeriExporter;
128 import net.jini.jeri.tcp.TcpServerEndpoint;
129 import net.jini.lookup.JoinManager;
130 import net.jini.lookup.entry.ServiceInfo;
131 import net.jini.security.BasicProxyPreparer;
132 import net.jini.security.ProxyPreparer;
133 import net.jini.security.Security;
134 import net.jini.security.TrustVerifier;
135 import net.jini.security.proxytrust.ServerProxyTrust;
136 import org.apache.river.api.io.AtomicMarshalledInstance;
137 import org.apache.river.api.io.AtomicSerial;
138 import org.apache.river.api.io.AtomicSerial.GetArg;
139 import org.apache.river.api.io.AtomicSerial.ReadInput;
140 import org.apache.river.api.io.AtomicSerial.ReadObject;
141 import org.apache.river.api.util.Startable;
142 import org.apache.river.config.Config;
143 import org.apache.river.config.LocalHostLookup;
144 import org.apache.river.constants.ThrowableConstants;
145 import org.apache.river.constants.VersionConstants;
146 import org.apache.river.discovery.ClientSubjectChecker;
147 import org.apache.river.discovery.Discovery;
148 import org.apache.river.discovery.DiscoveryConstraints;
149 import org.apache.river.discovery.DiscoveryProtocolException;
150 import org.apache.river.discovery.EncodeIterator;
151 import org.apache.river.discovery.MulticastAnnouncement;
152 import org.apache.river.discovery.MulticastRequest;
153 import org.apache.river.discovery.UnicastResponse;
154 import org.apache.river.logging.Levels;
155 import org.apache.river.lookup.entry.BasicServiceType;
156 import org.apache.river.proxy.CodebaseProvider;
157 import org.apache.river.proxy.MarshalledWrapper;
158 import org.apache.river.reliableLog.LogHandler;
159 import org.apache.river.reliableLog.ReliableLog;
160 import org.apache.river.start.lifecycle.LifeCycle;
161 import org.apache.river.thread.InterruptedStatusThread;
162 import org.apache.river.thread.InterruptedStatusThread.Interruptable;
163 import org.apache.river.thread.NamedThreadFactory;
164 import org.apache.river.thread.ReadersWriter;
165 import org.apache.river.thread.ReadersWriter.ConcurrentLockException;
166 import org.apache.river.thread.SynchronousExecutors;
167 import org.apache.river.reggie.proxy.*;
168
169
170
171
172
173
174
175
176
177
178
179
180 class RegistrarImpl implements Registrar, ProxyAccessor, ServerProxyTrust, Startable,
181 ServiceProxyAccessor, ServiceAttributesAccessor, ServiceIDAccessor,
182 CodebaseAccessor
183 {
184
185
186 private static final long MAX_LEASE = 1000L * 60 * 60 * 24 * 365 * 1000;
187
188 private static final long MAX_RENEW = 1000L * 60 * 60 * 24 * 365;
189
190 private static final int DEFAULT_MAX_PACKET_SIZE = 512;
191
192 private static final int DEFAULT_MULTICAST_TTL = 15;
193
194 private static final int DEFAULT_SOCKET_TIMEOUT = 1*60*1000;
195
196 private static final int LOG_VERSION = 3;
197
198 private static final String COMPONENT = "org.apache.river.reggie";
199
200 private static final Uuid myLeaseID = UuidFactory.create(0L, 0L);
201
202 private static final Logger LOGGER = Logger.getLogger(COMPONENT + ".service");
203
204 private static final Logger EVENT_LOGGER = Logger.getLogger(COMPONENT + ".event");
205
206 private static final Logger DISCOVERY_LOGGER = Logger.getLogger(COMPONENT + ".discovery");
207
208 private static final Logger SNAPSHOT_LOGGER = Logger.getLogger(COMPONENT + ".snapshot");
209
210
211 private static final Entry[] baseAttrs = {
212 new ServiceInfo(
213 "Lookup", "Sun Microsystems, Inc.", "Sun Microsystems, Inc.",
214 VersionConstants.SERVER_VERSION, "", ""),
215 new BasicServiceType("Lookup")
216 };
217
218 private static final EntryRep[] emptyAttrs = {};
219
220
221 private RegistrarProxy proxy;
222
223 private volatile Exporter serverExporter;
224
225 private Registrar myRef;
226
227 private volatile ServiceID myServiceID;
228
229 private final ActivationID activationID;
230
231 private final ActivationSystem activationSystem;
232
233 private volatile LookupLocator myLocator;
234
235 private final LoginContext loginContext;
236
237 private final LifeCycle lifeCycle;
238
239
240 private final ServerSocketFactory serverSocketFactory ;
241 private final SocketFactory socketFactory;
242
243
244
245
246
247 private final Map<ServiceID,SvcReg> serviceByID = new HashMap<ServiceID,SvcReg>(200);
248
249
250
251
252 private final SortedSet<SvcReg> serviceByTime = new TreeSet<SvcReg>();
253
254
255
256
257 private final Map<String,Map<ServiceID,SvcReg>> serviceByTypeName
258 = new HashMap<String,Map<ServiceID,SvcReg>>(200);
259
260
261
262
263
264
265
266
267
268
269
270 private final Map<EntryClass,Map<Object,List<SvcReg>>[]> serviceByAttr
271 = new HashMap<EntryClass,Map<Object,List<SvcReg>>[]>(23);
272
273
274
275
276
277
278 private final Map<EntryClass,List<SvcReg>> serviceByEmptyAttr
279 = new HashMap<EntryClass,List<SvcReg>>(11);
280
281 private final List<EntryClass> entryClasses = new ArrayList<EntryClass>();
282
283
284
285
286 private final Map<Long,EventReg> eventByID = new HashMap<Long,EventReg>(200);
287
288
289
290
291 private final Queue<EventReg> eventByTime = new PriorityQueue<EventReg>();
292
293
294
295
296
297 private final Map<ServiceID,Object> subEventByService = new HashMap<ServiceID,Object>(200);
298
299
300
301
302 private final Map<Long,EventReg> subEventByID = new HashMap<Long,EventReg>(200);
303
304
305 private final UuidGenerator resourceIdGenerator;
306
307 private final UuidGenerator serviceIdGenerator;
308
309 private long eventID = 0;
310
311 private final Random random = new Random();
312
313
314 private final ProxyPreparer listenerPreparer;
315
316 private final ProxyPreparer recoveredListenerPreparer;
317
318 private final ProxyPreparer locatorPreparer;
319
320 private final ProxyPreparer recoveredLocatorPreparer;
321
322
323 private long maxServiceLease;
324
325 private long maxEventLease;
326
327 private long minSvcExpiration = Long.MAX_VALUE;
328
329 private long minEventExpiration = Long.MAX_VALUE;
330
331
332 private final DiscoveryManagement discoer;
333
334 private volatile JoinManager joiner;
335
336 private final SynchronousExecutors eventNotifierExec;
337 private final Map<EventReg,ExecutorService> eventTaskMap;
338
339 private final ExecutorService discoveryResponseExec;
340
341 private final Thread serviceExpirer;
342
343 private final Thread eventExpirer;
344
345 private volatile Thread unicaster;
346 private volatile Unicast unicast;
347
348 private final Thread multicaster;
349
350 private final Thread announcer;
351
352 private final Thread snapshotter;
353
354
355 private final ReadersWriter concurrentObj;
356
357 private final Condition serviceNotifier;
358
359 private final Condition eventNotifier;
360
361 private final Condition snapshotNotifier;
362
363
364 private final ServiceType objectServiceType;
365
366
367 private final ReliableLog log;
368
369 private volatile boolean inRecovery;
370
371 private final AtomicInteger logFileSize = new AtomicInteger();
372
373
374 private final int persistenceSnapshotThreshold ;
375
376 private final float persistenceSnapshotWeight;
377
378 private final long minMaxServiceLease;
379
380 private final long minMaxEventLease;
381
382 private final long minRenewalInterval;
383
384 private volatile int unicastPort;
385 private int httpsUnicastPort;
386 private boolean enableHttpsUnicast;
387 private Discovery httpsDiscovery;
388
389 private volatile String[] memberGroups;
390
391 private volatile String[] lookupGroups;
392
393 private volatile LookupLocator[] lookupLocators;
394
395 private volatile Entry[] lookupAttrs;
396
397 private final long multicastAnnouncementInterval;
398
399 private final AtomicLong announcementSeqNo = new AtomicLong();
400
401
402 private final NetworkInterface[] multicastInterfaces;
403
404 private final boolean multicastInterfacesSpecified;
405
406 private final int multicastInterfaceRetryInterval;
407
408 private final Discovery protocol2;
409
410 private final InvocationConstraints rawUnicastDiscoveryConstraints;
411
412 private final DiscoveryConstraints multicastRequestConstraints;
413
414 private final DiscoveryConstraints multicastAnnouncementConstraints;
415
416 private final DiscoveryConstraints unicastDiscoveryConstraints;
417
418 private final ClientSubjectChecker multicastRequestSubjectChecker;
419
420 private final long unexportTimeout;
421
422 private final long unexportWait;
423
424 private final ClientSubjectChecker unicastDiscoverySubjectChecker;
425
426
427 private String unicastDiscoveryHost;
428 private Configuration config;
429 private Exception constructionException;
430 private final AccessControlContext context;
431 private final String certFactoryType;
432 private final String certPathEncoding;
433 private final byte[] encodedCerts;
434 private final String codebase;
435
436
437
438
439
440
441
442
443
444
445 RegistrarImpl(String[] configArgs,
446 final ActivationID activationID,
447 final boolean persistent,
448 final LifeCycle lifeCycle)
449 throws Exception
450 {
451 this(ConfigurationProvider.getInstance(
452 configArgs, RegistrarImpl.class.getClassLoader())
453 ,activationID,persistent,lifeCycle);
454 }
455
456
457
458
459
460
461
462
463
464 RegistrarImpl(final Configuration config,
465 final ActivationID activationID,
466 final boolean persistent,
467 final LifeCycle lifeCycle)
468 throws Exception
469 {
470 this(loginAndRun(config,activationID,persistent,lifeCycle));
471 }
472
473
474
475 private static Initializer loginAndRun( final Configuration config,
476 final ActivationID activationID,
477 final boolean persistent,
478 final LifeCycle lifeCycle)
479 throws Exception
480 {
481
482 Initializer result = null;
483 try {
484 if (activationID != null && !persistent) {
485 throw new IllegalArgumentException();
486 }
487 final LoginContext loginContext = (LoginContext) config.getEntry(
488 COMPONENT, "loginContext", LoginContext.class, null);
489
490 PrivilegedExceptionAction<Initializer> init = new PrivilegedExceptionAction<Initializer>() {
491 public Initializer run() throws Exception {
492 return new Initializer(config,
493 activationID, persistent, lifeCycle, loginContext);
494 }
495 };
496 if (loginContext != null) {
497 loginContext.login();
498 try {
499 result = Subject.doAsPrivileged(
500 loginContext.getSubject(), init, null);
501 } catch (PrivilegedActionException e) {
502 throw e.getCause();
503 }
504 } else {
505 result = init.run();
506 }
507 return result;
508 } catch (Throwable t) {
509 LOGGER.log(Level.SEVERE, "Reggie initialization failed", t);
510 if (t instanceof Exception) {
511 throw (Exception) t;
512 } else {
513 throw (Error) t;
514 }
515 }
516 }
517
518 private RegistrarImpl(Initializer init){
519 this.concurrentObj = new ReadersWriter();
520 this.snapshotNotifier = concurrentObj.newCondition();
521 this.eventNotifier = concurrentObj.newCondition();
522 this.serviceNotifier = concurrentObj.newCondition();
523 this.codebase = init.codebase;
524 this.certFactoryType = init.certFactoryType;
525 this.certPathEncoding = init.certPathEncoding;
526 this.encodedCerts = init.encodedCerts.clone();
527 lifeCycle = init.lifeCycle;
528 serverSocketFactory = init.serverSocketFactory;
529 persistenceSnapshotThreshold = init.persistenceSnapshotThreshold;
530 socketFactory = init.socketFactory;
531 recoveredListenerPreparer = init.recoveredListenerPreparer;
532 persistenceSnapshotWeight = init.persistenceSnapshotWeight;
533 recoveredLocatorPreparer = init.recoveredLocatorPreparer;
534 inRecovery = init.inRecovery;
535 activationID = init.activationID;
536 activationSystem = init.activationSystem;
537 serverExporter = init.serverExporter;
538 lookupGroups = init.lookupGroups;
539 lookupLocators = init.lookupLocators;
540 memberGroups = init.memberGroups;
541 unicastPort = init.unicastPort;
542 httpsUnicastPort = init.httpsUnicastPort;
543 enableHttpsUnicast = init.enableHttpsUnicast;
544 lookupAttrs = init.lookupAttrs;
545 discoer = init.discoer;
546 listenerPreparer = init.listenerPreparer;
547 locatorPreparer = init.locatorPreparer;
548 minMaxEventLease = init.minMaxEventLease;
549 minMaxServiceLease = init.minMaxServiceLease;
550 minRenewalInterval = init.minRenewalInterval;
551 multicastAnnouncementInterval = init.multicastAnnouncementInterval;
552 multicastInterfaceRetryInterval = init.multicastInterfaceRetryInterval;
553 multicastInterfaces = init.multicastInterfaces;
554 multicastInterfacesSpecified = init.multicastInterfacesSpecified;
555 resourceIdGenerator = init.resourceIdGenerator;
556 serviceIdGenerator = init.serviceIdGenerator;
557 unexportTimeout = init.unexportTimeout;
558 unexportWait = init.unexportWait;
559 objectServiceType = init.objectServiceType;
560 unicastDiscoverySubjectChecker = init.unicastDiscoverySubjectChecker;
561 protocol2 = init.protocol2;
562 rawUnicastDiscoveryConstraints = init.rawUnicastDiscoveryConstraints;
563 multicastRequestConstraints = init.multicastRequestConstraints;
564 multicastAnnouncementConstraints = init.multicastAnnouncementConstraints;
565 unicastDiscoveryConstraints = init.unicastDiscoveryConstraints;
566 context = init.context;
567 eventNotifierExec = new SynchronousExecutors(init.scheduledExecutor);
568 eventTaskMap = new HashMap<EventReg,ExecutorService>(200);
569 discoveryResponseExec = init.executor;
570 ReliableLog log = null;
571 Thread serviceExpirer = null;
572 Thread eventExpirer = null;
573 Thread unicaster = null;
574 Thread multicaster = null;
575 Thread announcer = null;
576 Thread snapshotter = null;
577
578 try {
579
580 List<Thread> threads = AccessController.doPrivileged(new PrivilegedExceptionAction<List<Thread>>(){
581
582 @Override
583 public List<Thread> run() throws Exception {
584 Thread t;
585 List<Thread> list = new ArrayList<Thread>(6);
586 list.add(newThread(new ServiceExpire(RegistrarImpl.this), "service expire"));
587 list.add(newThread(new EventExpire(RegistrarImpl.this),"event expire"));
588 unicast = new Unicast(RegistrarImpl.this, unicastPort);
589 list.add(newInterruptStatusThread(unicast, "unicast request"));
590 list.add(newInterruptStatusThread(new Multicast(RegistrarImpl.this), "multicast request"));
591 list.add(newThread(new Announce(RegistrarImpl.this),"discovery announcement"));
592 list.add(newThread(new Snapshot(RegistrarImpl.this),"snapshot thread"));
593 return list;
594 }
595
596 private Thread newThread(Runnable r, String name){
597 Thread t = new Thread(r,name);
598 t.setDaemon(false);
599 return t;
600 }
601
602 private Thread newInterruptStatusThread(Runnable r, String name){
603 Thread t = new InterruptedStatusThread(r,name);
604 t.setDaemon(false);
605 return t;
606 }
607
608 }, context);
609 serviceExpirer = threads.get(0);
610 eventExpirer = threads.get(1);
611 unicaster = threads.get(2);
612 multicaster = threads.get(3);
613 announcer = threads.get(4);
614 snapshotter = threads.get(5);
615 if (init.persistent){
616 log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler(this));
617 if (SNAPSHOT_LOGGER.isLoggable(Level.CONFIG)) {
618 SNAPSHOT_LOGGER.log(Level.CONFIG, "using persistence directory {0}",
619 new Object[]{ init.persistenceDirectory });
620 }
621 } else {
622 log = null;
623 }
624
625 constructionException = null;
626 } catch (PrivilegedActionException ex) {
627 constructionException = ex.getException();
628 } catch (IOException ex) {
629 constructionException = ex;
630 } finally {
631 this.log = log;
632 this.serviceExpirer = serviceExpirer;
633 this.eventExpirer = eventExpirer;
634 this.unicaster = unicaster;
635 this.multicaster = multicaster;
636 this.announcer = announcer;
637 this.snapshotter = snapshotter;
638 }
639 multicastRequestSubjectChecker = init.multicastRequestSubjectChecker;
640 loginContext = init.loginContext;
641 unicastDiscoveryHost = init.unicastDiscoveryHost;
642 config = init.config;
643 }
644
645 @Override
646 public Entry[] getServiceAttributes() throws IOException {
647 return getLookupAttributes();
648 }
649
650 @Override
651 public ServiceID serviceID() throws IOException {
652 return myServiceID;
653 }
654
655 @Override
656 public String getClassAnnotation() throws IOException {
657 return "".equals(codebase) ?
658 CodebaseProvider.getClassAnnotation(RegistrarProxy.class)
659 : codebase;
660 }
661
662 @Override
663 public String getCertFactoryType() throws IOException {
664 return certFactoryType;
665 }
666
667 @Override
668 public String getCertPathEncoding() throws IOException {
669 return certPathEncoding;
670 }
671
672 @Override
673 public byte[] getEncodedCerts() throws IOException {
674 return encodedCerts.clone();
675 }
676
677
678 @AtomicSerial
679 private final static class SvcReg implements Comparable, Serializable {
680
681 private static final long serialVersionUID = 2L;
682
683
684
685
686
687
688 public final Item item;
689
690
691
692
693
694 public final Uuid leaseID;
695
696
697
698
699
700 public volatile long leaseExpiration;
701
702 public SvcReg(GetArg arg) throws IOException{
703 this( (Item) arg.get("item", null),
704 (Uuid) arg.get("leaseID", null),
705 arg.get("leaseExpiration", 0L)
706 );
707 }
708
709
710 public SvcReg(Item item, Uuid leaseID, long leaseExpiration) {
711 this.item = item;
712 this.leaseID = leaseID;
713 this.leaseExpiration = leaseExpiration;
714 }
715
716
717
718
719
720
721 public int compareTo(Object obj) {
722 SvcReg reg = (SvcReg)obj;
723 if (this == reg)
724 return 0;
725 int i = compare(leaseExpiration, reg.leaseExpiration);
726 if (i != 0) {
727 return i;
728 }
729 i = compare(leaseID.getMostSignificantBits(),
730 reg.leaseID.getMostSignificantBits());
731 if (i != 0) {
732 return i;
733 }
734 return compare(leaseID.getLeastSignificantBits(),
735 reg.leaseID.getLeastSignificantBits());
736 }
737
738
739
740
741
742 private static int compare(long l1, long l2) {
743 return (l1 < l2) ? -1 : ((l1 > l2) ? 1 : 0);
744 }
745 }
746
747
748 @AtomicSerial
749 private final static class EventReg implements Comparable, Serializable {
750
751 private static final long serialVersionUID = 2L;
752
753
754
755
756
757 public final long eventID;
758
759
760
761
762 public final Uuid leaseID;
763
764
765
766
767 public final Template tmpl;
768
769
770
771
772
773 public final int transitions;
774
775
776
777
778
779 public long seqNo;
780
781
782
783 public transient RemoteEventListener listener;
784
785
786
787
788
789 public final Object handback;
790
791
792
793
794
795 private long leaseExpiration;
796
797
798
799
800 transient boolean newNotify;
801
802 public EventReg(GetArg arg) throws IOException {
803 this(arg.get("eventID", 0L),
804 arg.get("leaseID", null, Uuid.class),
805 arg.get("tmpl", null, Template.class),
806 arg.get("transitions", 0),
807 ((RO)arg.getReader()).listener,
808 arg.get("handback", null),
809 arg.get("leaseExpiration", 0L),
810 true
811 );
812 seqNo = arg.get("seqNo", 0L);
813 }
814
815 @ReadInput
816 static ReadObject getReader(){
817 return new RO();
818 }
819
820 private static class RO implements ReadObject {
821
822 RemoteEventListener listener;
823
824 @Override
825 public void read(ObjectInput input) throws IOException, ClassNotFoundException {
826
827 MarshalledInstance mi = (MarshalledInstance) input.readObject();
828 try {
829 listener = (RemoteEventListener) mi.get(false);
830 } catch (Throwable e) {
831 if (e instanceof Error &&
832 ThrowableConstants.retryable(e) ==
833 ThrowableConstants.BAD_OBJECT)
834 {
835 throw (Error) e;
836 }
837 EVENT_LOGGER.log(Level.WARNING,
838 "failed to recover event listener", e);
839 }
840 }
841
842 }
843
844
845
846 public EventReg(long eventID, Uuid leaseID, Template tmpl,
847 int transitions, RemoteEventListener listener,
848 Object handback, long leaseExpiration, boolean newNotify) {
849 if (listener == null) throw new NullPointerException("Listener cannot be null");
850 this.eventID = eventID;
851 this.leaseID = leaseID;
852 this.tmpl = tmpl;
853 this.transitions = transitions;
854 this.seqNo = 0;
855 this.listener = listener;
856 this.handback = handback;
857 this.leaseExpiration = leaseExpiration;
858 this.newNotify = newNotify;
859 }
860
861 long incrementAndGetSeqNo(){
862 return ++seqNo;
863 }
864
865 long getSeqNo(){
866 return seqNo;
867 }
868
869 @Override
870 public int hashCode() {
871 int hash = 7;
872 hash = 97 * hash + (int) (this.eventID ^ (this.eventID >>> 32));
873 hash = 97 * hash + (this.leaseID != null ? this.leaseID.hashCode() : 0);
874 hash = 97 * hash + this.transitions;
875 hash = 97 * hash + (this.handback != null ? this.handback.hashCode() : 0);
876 return hash;
877 }
878
879 @Override
880 public boolean equals(Object o){
881 if (this == o) return true;
882 if (!(o instanceof EventReg)) return false;
883 EventReg that = (EventReg) o;
884 if (this.eventID != that.eventID) return false;
885 if (this.transitions != that.transitions) return false;
886 if (!this.leaseID.equals(that.leaseID)) return false;
887 return this.handback.equals(that.handback);
888 }
889
890
891
892
893
894
895 public int compareTo(Object obj) {
896 if (equals(obj)) return 0;
897 EventReg reg = (EventReg)obj;
898 if (getLeaseExpiration() < reg.getLeaseExpiration() ||
899 (getLeaseExpiration() == reg.getLeaseExpiration() &&
900 eventID < reg.eventID))
901 return -1;
902 return 1;
903 }
904
905
906
907
908
909 void prepareListener(ProxyPreparer preparer) {
910 if (listener != null) {
911 try {
912 listener =
913 (RemoteEventListener) preparer.prepareProxy(listener);
914 } catch (Exception e) {
915 if (EVENT_LOGGER.isLoggable(Level.WARNING)) {
916 logThrow(EVENT_LOGGER, Level.WARNING,
917 getClass().getName(),
918 "prepareListener",
919 "failed to prepare event listener {0}",
920 new Object[]{ listener },
921 e);
922 }
923 listener = null;
924 }
925 seqNo += Integer.MAX_VALUE;
926 }
927 }
928
929
930
931
932 private void writeObject(ObjectOutputStream stream)
933 throws IOException
934 {
935 stream.defaultWriteObject();
936 stream.writeObject(new AtomicMarshalledInstance(listener));
937 }
938
939
940
941
942 private void readObject(ObjectInputStream stream)
943 throws IOException, ClassNotFoundException
944 {
945 stream.defaultReadObject();
946 MarshalledInstance mi = (MarshalledInstance) stream.readObject();
947 try {
948 listener = (RemoteEventListener) mi.get(false);
949 } catch (Throwable e) {
950 if (e instanceof Error &&
951 ThrowableConstants.retryable(e) ==
952 ThrowableConstants.BAD_OBJECT)
953 {
954 throw (Error) e;
955 }
956 EVENT_LOGGER.log(Level.WARNING,
957 "failed to recover event listener", e);
958 }
959 }
960
961
962
963
964 synchronized long getLeaseExpiration() {
965 return leaseExpiration;
966 }
967
968
969
970
971 synchronized void setLeaseExpiration(long leaseExpiration) {
972 this.leaseExpiration = leaseExpiration;
973 }
974 }
975
976
977
978
979
980
981
982 private static interface LogRecord extends Serializable {
983 void apply(RegistrarImpl regImpl);
984 }
985
986
987
988
989
990
991
992 private static class SvcRegisteredLogObj implements LogRecord {
993
994 private static final long serialVersionUID = 2L;
995
996
997
998
999
1000
1001 private final SvcReg reg;
1002
1003
1004 public SvcRegisteredLogObj(SvcReg reg) {
1005 this.reg = reg;
1006 }
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018 public void apply(RegistrarImpl regImpl) {
1019 regImpl.concurrentObj.writeLock();
1020 try {
1021 SvcReg oldReg =
1022 (SvcReg)regImpl.serviceByID.get(reg.item.getServiceID());
1023 if (oldReg != null)
1024 regImpl.deleteService(oldReg, 0);
1025 regImpl.addService(reg);
1026 } finally {
1027 regImpl.concurrentObj.writeUnlock();
1028 }
1029 }
1030 }
1031
1032
1033
1034
1035
1036
1037
1038 private static class AttrsAddedLogObj implements LogRecord {
1039
1040 private static final long serialVersionUID = 2L;
1041
1042
1043
1044
1045
1046
1047 private final ServiceID serviceID;
1048
1049
1050
1051
1052
1053 private final Uuid leaseID;
1054
1055
1056
1057
1058
1059 private final EntryRep[] attrSets;
1060
1061
1062 public AttrsAddedLogObj(ServiceID serviceID,
1063 Uuid leaseID,
1064 EntryRep[] attrSets)
1065 {
1066 this.serviceID = serviceID;
1067 this.leaseID = leaseID;
1068 this.attrSets = attrSets;
1069 }
1070
1071
1072
1073
1074
1075
1076
1077
1078 public void apply(RegistrarImpl regImpl) {
1079 regImpl.concurrentObj.writeLock();
1080 try {
1081 regImpl.addAttributesDo(serviceID, leaseID, attrSets);
1082 } catch (UnknownLeaseException e) {
1083
1084 throw new AssertionError("an UnknownLeaseException should"
1085 + " never occur during recovery");
1086 } finally {
1087 regImpl.concurrentObj.writeUnlock();
1088 }
1089 }
1090 }
1091
1092
1093
1094
1095
1096
1097
1098
1099 private static class AttrsModifiedLogObj implements LogRecord {
1100
1101 private static final long serialVersionUID = 2L;
1102
1103
1104
1105
1106
1107
1108 private final ServiceID serviceID;
1109
1110
1111
1112
1113
1114 private final Uuid leaseID;
1115
1116
1117
1118
1119 private final EntryRep[] attrSetTmpls;
1120
1121
1122
1123
1124
1125 private final EntryRep[] attrSets;
1126
1127
1128 public AttrsModifiedLogObj(ServiceID serviceID,
1129 Uuid leaseID,
1130 EntryRep[] attrSetTmpls,
1131 EntryRep[] attrSets)
1132 {
1133 this.serviceID = serviceID;
1134 this.leaseID = leaseID;
1135 this.attrSetTmpls = attrSetTmpls;
1136 this.attrSets = attrSets;
1137 }
1138
1139
1140
1141
1142
1143
1144
1145
1146 public void apply(RegistrarImpl regImpl) {
1147 regImpl.concurrentObj.writeLock();
1148 try {
1149 regImpl.modifyAttributesDo(serviceID, leaseID,
1150 attrSetTmpls, attrSets);
1151 } catch (UnknownLeaseException e) {
1152
1153 throw new AssertionError("an UnknownLeaseException should"
1154 + " never occur during recovery");
1155 } finally {
1156 regImpl.concurrentObj.writeUnlock();
1157 }
1158 }
1159 }
1160
1161
1162
1163
1164
1165
1166
1167 private static class AttrsSetLogObj implements LogRecord {
1168
1169 private static final long serialVersionUID = 2L;
1170
1171
1172
1173
1174
1175
1176 private final ServiceID serviceID;
1177
1178
1179
1180
1181
1182 private final Uuid leaseID;
1183
1184
1185
1186
1187
1188 private final EntryRep[] attrSets;
1189
1190
1191 public AttrsSetLogObj(ServiceID serviceID,
1192 Uuid leaseID,
1193 EntryRep[] attrSets)
1194 {
1195 this.serviceID = serviceID;
1196 this.leaseID = leaseID;
1197 this.attrSets = attrSets;
1198 }
1199
1200
1201
1202
1203
1204
1205
1206
1207 public void apply(RegistrarImpl regImpl) {
1208 regImpl.concurrentObj.writeLock();
1209 try {
1210 regImpl.setAttributesDo(serviceID, leaseID, attrSets);
1211 } catch (UnknownLeaseException e) {
1212
1213 } finally {
1214 regImpl.concurrentObj.writeUnlock();
1215 }
1216 }
1217 }
1218
1219
1220
1221
1222
1223
1224
1225 private static class EventRegisteredLogObj implements LogRecord {
1226
1227 private static final long serialVersionUID = 2L;
1228
1229
1230
1231
1232
1233
1234 private final EventReg eventReg;
1235
1236
1237 public EventRegisteredLogObj(EventReg eventReg) {
1238 this.eventReg = eventReg;
1239 }
1240
1241
1242
1243
1244
1245
1246
1247
1248 public void apply(RegistrarImpl regImpl) {
1249 regImpl.concurrentObj.writeLock();
1250 try{
1251 eventReg.prepareListener(regImpl.recoveredListenerPreparer);
1252 regImpl.addEvent(eventReg);
1253 regImpl.eventID++;
1254 } finally {
1255 regImpl.concurrentObj.writeUnlock();
1256 }
1257 }
1258 }
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268 private static class ServiceLeaseCancelledLogObj implements LogRecord {
1269
1270 private static final long serialVersionUID = 2L;
1271
1272
1273
1274
1275
1276
1277 private final ServiceID serviceID;
1278
1279
1280
1281
1282
1283 private final Uuid leaseID;
1284
1285
1286 public ServiceLeaseCancelledLogObj(ServiceID serviceID, Uuid leaseID) {
1287 this.serviceID = serviceID;
1288 this.leaseID = leaseID;
1289 }
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299 public void apply(RegistrarImpl regImpl) {
1300 regImpl.concurrentObj.writeLock();
1301 try {
1302 regImpl.cancelServiceLeaseDo(serviceID, leaseID);
1303 } catch (UnknownLeaseException e) {
1304
1305 } finally {
1306 regImpl.concurrentObj.writeUnlock();
1307 }
1308 }
1309 }
1310
1311
1312
1313
1314
1315
1316
1317 private static class ServiceLeaseRenewedLogObj implements LogRecord {
1318
1319 private static final long serialVersionUID = 2L;
1320
1321
1322
1323
1324
1325
1326 private final ServiceID serviceID;
1327
1328
1329
1330
1331
1332 private final Uuid leaseID;
1333
1334
1335
1336
1337
1338 private final long leaseExpTime;
1339
1340
1341 public ServiceLeaseRenewedLogObj(ServiceID serviceID,
1342 Uuid leaseID,
1343 long leaseExpTime)
1344 {
1345 this.serviceID = serviceID;
1346 this.leaseID = leaseID;
1347 this.leaseExpTime = leaseExpTime;
1348 }
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358 public void apply(RegistrarImpl regImpl) {
1359 regImpl.renewServiceLeaseAbs(serviceID, leaseID, leaseExpTime);
1360 }
1361 }
1362
1363
1364
1365
1366
1367
1368
1369 private static class EventLeaseCancelledLogObj implements LogRecord {
1370
1371 private static final long serialVersionUID = 2L;
1372
1373
1374
1375
1376
1377
1378 private final long eventID;
1379
1380
1381
1382
1383
1384 private final Uuid leaseID;
1385
1386
1387 public EventLeaseCancelledLogObj(long eventID, Uuid leaseID) {
1388 this.eventID = eventID;
1389 this.leaseID = leaseID;
1390 }
1391
1392
1393
1394
1395
1396
1397
1398
1399 public void apply(RegistrarImpl regImpl) {
1400 regImpl.concurrentObj.writeLock();
1401 try {
1402 regImpl.cancelEventLeaseDo(eventID, leaseID);
1403 } catch (UnknownLeaseException e) {
1404
1405 } finally {
1406 regImpl.concurrentObj.writeUnlock();
1407 }
1408 }
1409 }
1410
1411
1412
1413
1414
1415
1416
1417 private static class EventLeaseRenewedLogObj implements LogRecord {
1418
1419 private static final long serialVersionUID = 2L;
1420
1421
1422
1423
1424
1425
1426 private final long eventID;
1427
1428
1429
1430
1431
1432 private final Uuid leaseID;
1433
1434
1435
1436
1437
1438 private final long leaseExpTime;
1439
1440
1441 public EventLeaseRenewedLogObj(long eventID,
1442 Uuid leaseID,
1443 long leaseExpTime)
1444 {
1445 this.eventID = eventID;
1446 this.leaseID = leaseID;
1447 this.leaseExpTime = leaseExpTime;
1448 }
1449
1450
1451
1452
1453
1454
1455
1456
1457 public void apply(RegistrarImpl regImpl) {
1458 regImpl.renewEventLeaseAbs(eventID, leaseID, leaseExpTime);
1459 }
1460 }
1461
1462
1463
1464
1465
1466
1467
1468 private static class LeasesRenewedLogObj implements LogRecord {
1469
1470 private static final long serialVersionUID = 2L;
1471
1472
1473
1474
1475
1476
1477 private final Object[] regIDs;
1478
1479
1480
1481
1482
1483 private final Uuid[] leaseIDs;
1484
1485
1486
1487
1488
1489 private final long[] leaseExpTimes;
1490
1491
1492 public LeasesRenewedLogObj(Object[] regIDs,
1493 Uuid[] leaseIDs,
1494 long[] leaseExpTimes)
1495 {
1496 this.regIDs = regIDs;
1497 this.leaseIDs = leaseIDs;
1498 this.leaseExpTimes = leaseExpTimes;
1499 }
1500
1501
1502
1503
1504
1505
1506
1507 public void apply(RegistrarImpl regImpl) {
1508 regImpl.renewLeasesAbs(regIDs, leaseIDs, leaseExpTimes);
1509 }
1510 }
1511
1512
1513
1514
1515
1516
1517
1518 private static class LeasesCancelledLogObj implements LogRecord {
1519
1520 private static final long serialVersionUID = 2L;
1521
1522
1523
1524
1525
1526
1527 private final Object[] regIDs;
1528
1529
1530
1531
1532
1533 private final Uuid[] leaseIDs;
1534
1535
1536 public LeasesCancelledLogObj(Object[] regIDs, Uuid[] leaseIDs) {
1537 this.regIDs = regIDs;
1538 this.leaseIDs = leaseIDs;
1539 }
1540
1541
1542
1543
1544
1545
1546
1547 public void apply(RegistrarImpl regImpl) {
1548
1549
1550
1551 regImpl.concurrentObj.writeLock();
1552 try {
1553 regImpl.cancelLeasesDo(regIDs, leaseIDs);
1554 } finally {
1555 regImpl.concurrentObj.writeUnlock();
1556 }
1557 }
1558 }
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574 private static class UnicastPortSetLogObj implements LogRecord {
1575
1576 private static final long serialVersionUID = 2L;
1577
1578
1579
1580
1581
1582
1583 private final int newPort;
1584
1585
1586 public UnicastPortSetLogObj(int newPort) {
1587 this.newPort = newPort;
1588 }
1589
1590
1591
1592
1593
1594
1595
1596 public void apply(RegistrarImpl regImpl) {
1597 regImpl.unicastPort = newPort;
1598 }
1599 }
1600
1601
1602
1603
1604
1605
1606
1607 private static class LookupGroupsChangedLogObj implements LogRecord {
1608
1609 private static final long serialVersionUID = 2L;
1610
1611
1612
1613
1614
1615
1616 private final String[] groups;
1617
1618
1619 public LookupGroupsChangedLogObj(String[] groups) {
1620 this.groups = groups;
1621 }
1622
1623
1624
1625
1626
1627
1628
1629 public void apply(RegistrarImpl regImpl) {
1630 regImpl.lookupGroups = groups;
1631 }
1632 }
1633
1634
1635
1636
1637
1638
1639
1640 private static class LookupLocatorsChangedLogObj implements LogRecord {
1641
1642 private static final long serialVersionUID = 2L;
1643
1644
1645
1646
1647 private transient LookupLocator[] locators;
1648
1649
1650 public LookupLocatorsChangedLogObj(LookupLocator[] locators) {
1651 this.locators = locators;
1652 }
1653
1654
1655
1656
1657
1658
1659
1660 public void apply(RegistrarImpl regImpl) {
1661 try {
1662 regImpl.lookupLocators = prepareLocators(
1663 locators, regImpl.recoveredLocatorPreparer, true);
1664 } catch (RemoteException e) {
1665 throw new AssertionError(e);
1666 }
1667 }
1668
1669
1670
1671
1672 private void writeObject(ObjectOutputStream stream)
1673 throws IOException
1674 {
1675 stream.defaultWriteObject();
1676 marshalLocators(locators, stream);
1677 }
1678
1679
1680
1681
1682
1683 private void readObject(ObjectInputStream stream)
1684 throws IOException, ClassNotFoundException
1685 {
1686 stream.defaultReadObject();
1687 locators = unmarshalLocators(stream);
1688 }
1689 }
1690
1691
1692
1693
1694
1695
1696
1697 private static class MemberGroupsChangedLogObj implements LogRecord {
1698
1699 private static final long serialVersionUID = 2L;
1700
1701
1702
1703
1704
1705
1706 private final String[] groups;
1707
1708
1709 public MemberGroupsChangedLogObj(String[] groups) {
1710 this.groups = groups;
1711 }
1712
1713
1714
1715
1716
1717
1718
1719 public void apply(RegistrarImpl regImpl) {
1720 regImpl.concurrentObj.writeLock();
1721 try {
1722 regImpl.memberGroups = groups;
1723 } finally {
1724 regImpl.concurrentObj.writeUnlock();
1725 }
1726 }
1727 }
1728
1729
1730
1731
1732
1733
1734
1735 private static class LookupAttributesChangedLogObj implements LogRecord {
1736
1737 private static final long serialVersionUID = 1L;
1738
1739
1740
1741
1742 private transient Entry[] attrs;
1743
1744
1745 public LookupAttributesChangedLogObj(Entry[] attrs) {
1746 this.attrs = attrs;
1747 }
1748
1749
1750
1751
1752
1753
1754
1755 public void apply(RegistrarImpl regImpl) {
1756 regImpl.lookupAttrs = attrs;
1757 }
1758
1759
1760
1761
1762 private void writeObject(ObjectOutputStream stream)
1763 throws IOException
1764 {
1765 stream.defaultWriteObject();
1766 marshalAttributes(attrs, stream);
1767 }
1768
1769
1770
1771
1772
1773 private void readObject(ObjectInputStream stream)
1774 throws IOException, ClassNotFoundException
1775 {
1776 stream.defaultReadObject();
1777 attrs = unmarshalAttributes(stream);
1778 }
1779 }
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863 private static class LocalLogHandler extends LogHandler {
1864 private final RegistrarImpl reggie;
1865
1866 public LocalLogHandler(RegistrarImpl reggie) {
1867 this.reggie = reggie;
1868 }
1869
1870
1871 public void snapshot(OutputStream out) throws IOException {
1872 reggie.concurrentObj.readLock();
1873 try {
1874 reggie.takeSnapshot(out);
1875 } finally {
1876 reggie.concurrentObj.readUnlock();
1877 }
1878 }
1879
1880
1881 public void recover(InputStream in)
1882 throws IOException, ClassNotFoundException
1883 {
1884 reggie.concurrentObj.writeLock();
1885 try {
1886 reggie.recoverSnapshot(in);
1887 } finally {
1888 reggie.concurrentObj.writeUnlock();
1889 }
1890 }
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914 public void applyUpdate(Object logRecObj) {
1915 ((LogRecord)logRecObj).apply(reggie);
1916 }
1917 }
1918
1919
1920 private static abstract class ItemIter {
1921
1922 public final long now = System.currentTimeMillis();
1923
1924 public boolean dupsPossible = false;
1925
1926 protected final Template tmpl;
1927
1928 protected SvcReg reg;
1929
1930
1931 protected ItemIter(Template tmpl) {
1932 this.tmpl = tmpl;
1933 }
1934
1935
1936 public boolean hasNext() {
1937 return reg != null;
1938 }
1939
1940
1941 public Item next() {
1942 if (reg == null)
1943 throw new NoSuchElementException();
1944 Item item = reg.item;
1945 step();
1946 return item;
1947 }
1948
1949
1950 public SvcReg nextReg() {
1951 if (reg == null)
1952 throw new NoSuchElementException();
1953 SvcReg cur = reg;
1954 step();
1955 return cur;
1956 }
1957
1958
1959 protected abstract void step();
1960 }
1961
1962
1963 private static class AllItemIter extends ItemIter {
1964
1965 private final Iterator<SvcReg> iter;
1966
1967
1968 public AllItemIter(Iterator<SvcReg> it) {
1969 super(null);
1970 iter = it;
1971 step();
1972 }
1973
1974
1975 @Override
1976 protected void step() {
1977 while (iter.hasNext()) {
1978 reg = (SvcReg)iter.next();
1979 if (reg.leaseExpiration > now)
1980 return;
1981 }
1982 reg = null;
1983 }
1984 }
1985
1986
1987 private static class SvcIterator extends ItemIter {
1988
1989 private final Iterator<SvcReg> services;
1990
1991
1992
1993
1994
1995 public SvcIterator(Template tmpl, Iterator<SvcReg> it) {
1996 super(tmpl);
1997 services = it;
1998 step();
1999 }
2000
2001
2002 protected final void step() {
2003 if (tmpl.serviceTypesLength() > 1) {
2004 while (services.hasNext()) {
2005 reg = (SvcReg) services.next();
2006 if (reg.leaseExpiration > now &&
2007 matchType(tmpl.serviceTypes(), reg.item.serviceType) &&
2008 matchAttributes(tmpl, reg.item))
2009 return;
2010 }
2011 } else {
2012 while (services.hasNext()) {
2013 reg = (SvcReg) services.next();
2014 if (reg.leaseExpiration > now &&
2015 matchAttributes(tmpl, reg.item))
2016 return;
2017 }
2018 }
2019 reg = null;
2020 }
2021 }
2022
2023
2024 private static class AttrItemIter extends ItemIter {
2025
2026 private final List<SvcReg> svcs;
2027
2028 private int svcidx;
2029
2030
2031
2032
2033
2034
2035 public AttrItemIter(Template tmpl, List<SvcReg> svcs) {
2036 super(tmpl);
2037 this.svcs = svcs;
2038 if (svcs != null) {
2039 svcidx = svcs.size();
2040 step();
2041 }
2042 }
2043
2044
2045 protected void step() {
2046 while (--svcidx >= 0) {
2047 reg = svcs.get(svcidx);
2048 if (reg.leaseExpiration > now
2049 && matchAttributes(tmpl, reg.item)) {
2050 return;
2051 }
2052 }
2053 reg = null;
2054 }
2055 }
2056
2057
2058 private class ClassItemIter extends ItemIter {
2059
2060 private final EntryClass eclass;
2061
2062 private int classidx;
2063
2064 private Iterator<List<SvcReg>> iter;
2065
2066 private List<SvcReg> svcs;
2067
2068 private int svcidx = 0;
2069
2070
2071
2072
2073
2074
2075 public ClassItemIter(Template tmpl) {
2076 super(tmpl);
2077 dupsPossible = true;
2078 eclass = tmpl.attributeSetTemplType();
2079 classidx = entryClasses.size();
2080 step();
2081 }
2082
2083
2084 protected void step() {
2085 do {
2086 while (--svcidx >= 0) {
2087 reg = svcs.get(svcidx);
2088 if (reg.leaseExpiration > now &&
2089 matchAttributes(tmpl, reg.item))
2090 return;
2091 }
2092 } while (stepValue());
2093 reg = null;
2094 }
2095
2096
2097
2098
2099
2100 private boolean stepValue() {
2101 while (true) {
2102 if (iter != null && iter.hasNext()) {
2103 svcs = iter.next();
2104 svcidx = svcs.size();
2105 return true;
2106 }
2107 if (!stepClass())
2108 return false;
2109 if (iter == null)
2110 return true;
2111 }
2112 }
2113
2114
2115
2116
2117
2118
2119
2120 private boolean stepClass() {
2121 while (--classidx >= 0) {
2122 EntryClass cand = entryClasses.get(classidx);
2123 if (!eclass.isAssignableFrom(cand))
2124 continue;
2125 if (cand.getNumFields() > 0) {
2126 cand = getDefiningClass(cand, cand.getNumFields() - 1);
2127 Map<Object,List<SvcReg>>[] attrMaps = serviceByAttr.get(cand);
2128 iter = attrMaps[attrMaps.length - 1].values().iterator();
2129 } else {
2130 iter = null;
2131 svcs = serviceByEmptyAttr.get(cand);
2132 svcidx = svcs.size();
2133 }
2134 return true;
2135 }
2136 return false;
2137 }
2138 }
2139
2140
2141 private static class IDItemIter extends ItemIter {
2142
2143
2144 public IDItemIter(Template tmpl, SvcReg reg) {
2145 super(tmpl);
2146 if (reg != null &&
2147 (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item))) {
2148 reg = null;
2149 }
2150 this.reg = reg;
2151 }
2152
2153
2154 protected void step() {
2155 reg = null;
2156 }
2157 }
2158
2159
2160 private static final class EventTask implements Callable<Boolean>, Comparable<EventTask> {
2161
2162
2163 private final EventReg reg;
2164
2165 private final long seqNo;
2166
2167 private final ServiceID sid;
2168
2169 private final Item item;
2170
2171 private final int transition;
2172
2173 private final RegistrarProxy proxy;
2174 private final Registrar registrar;
2175
2176 private final long now;
2177
2178 private final RemoteEventListener listener;
2179 private final boolean newNotify;
2180
2181
2182 public EventTask(EventReg reg,
2183 ServiceID sid,
2184 Item item,
2185 int transition,
2186 RegistrarProxy proxy,
2187 Registrar registrar,
2188 long now)
2189 {
2190 this.reg = reg;
2191 this.listener = reg.listener;
2192 seqNo = reg.incrementAndGetSeqNo();
2193 this.sid = sid;
2194 this.item = item;
2195 this.transition = transition;
2196 this.proxy = proxy;
2197 this.registrar = registrar;
2198 this.now = now;
2199 this.newNotify = reg.newNotify;
2200 }
2201
2202
2203 @Override
2204 public Boolean call() throws Exception {
2205 if (EVENT_LOGGER.isLoggable(Level.FINE)) {
2206 EVENT_LOGGER.log(
2207 Level.FINE,
2208 "notifying listener {0} of event {1}",
2209 new Object[]{ listener, Long.valueOf(reg.eventID) });
2210 }
2211 try {
2212 if (!newNotify){
2213 listener.notify(
2214 new RegistrarEvent(proxy,
2215 reg.eventID,
2216 seqNo,
2217 (MarshalledObject) reg.handback,
2218 sid,
2219 transition,
2220 item
2221 )
2222 );
2223 } else {
2224 listener.notify(
2225 new RegistrarEvent(proxy,
2226 reg.eventID,
2227 seqNo,
2228 (MarshalledInstance) reg.handback,
2229 sid,
2230 transition,
2231 item
2232 )
2233 );
2234 }
2235 return Boolean.TRUE;
2236 } catch (Throwable e) {
2237 switch (ThrowableConstants.retryable(e)) {
2238 case ThrowableConstants.BAD_OBJECT:
2239 if (e instanceof Error) {
2240 EVENT_LOGGER.log(
2241 Levels.HANDLED, "exception sending event", e);
2242 throw (Error) e;
2243 }
2244 case ThrowableConstants.BAD_INVOCATION:
2245 case ThrowableConstants.UNCATEGORIZED:
2246
2247
2248
2249 EVENT_LOGGER.log(Level.INFO, "exception sending event", e);
2250 try {
2251 registrar.cancelEventLease(reg.eventID, reg.leaseID);
2252 } catch (UnknownLeaseException ee) {
2253 EVENT_LOGGER.log(
2254 Levels.HANDLED,
2255 "exception canceling event lease",
2256 e);
2257 } catch (RemoteException ee) {
2258 EVENT_LOGGER.log(
2259 Levels.HANDLED,
2260 "The server has been shutdown",
2261 e);
2262 }
2263 }
2264 return Boolean.FALSE;
2265 }
2266 }
2267
2268
2269
2270
2271
2272
2273
2274
2275 @Override
2276 public int compareTo(EventTask o) {
2277 if (this.now < o.now) return -1;
2278 if (this.now > o.now) return 1;
2279 if (this.seqNo < o.seqNo) return -1;
2280 if (this.seqNo > o.seqNo) return 1;
2281 return 0;
2282 }
2283 }
2284
2285
2286 private static final class DecodeRequestTask implements Runnable {
2287
2288 private final DatagramPacket datagram;
2289
2290 private final Discovery decoder;
2291 private final RegistrarImpl reggie;
2292
2293 private final Set<AddressTask> runningTasks;
2294
2295 public DecodeRequestTask(
2296 DatagramPacket datagram, Discovery decoder, RegistrarImpl reggie, Set<AddressTask> runningTasks)
2297 {
2298 this.datagram = datagram;
2299 this.decoder = decoder;
2300 this.reggie = reggie;
2301 this.runningTasks = runningTasks;
2302 }
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312 public void run() {
2313 MulticastRequest req;
2314 try {
2315 req = decoder.decodeMulticastRequest(
2316 datagram,
2317 reggie.multicastRequestConstraints.getUnfulfilledConstraints(),
2318 reggie.multicastRequestSubjectChecker, true);
2319 } catch (Exception e) {
2320 if (!(e instanceof InterruptedIOException) &&
2321 DISCOVERY_LOGGER.isLoggable(Levels.HANDLED))
2322 {
2323 logThrow(DISCOVERY_LOGGER, Levels.HANDLED,
2324 getClass().getName(),
2325 "run",
2326 "exception decoding multicast request from {0}:{1}",
2327 new Object[]{
2328 datagram.getAddress(),
2329 Integer.valueOf(datagram.getPort()) },
2330 e);
2331 }
2332 return;
2333 }
2334 String[] groups = req.getGroups();
2335 if ((groups.length == 0 || overlap(reggie.memberGroups, groups)) &&
2336 indexOf(req.getServiceIDs(), reggie.myServiceID) < 0)
2337 {
2338 try {
2339 req.checkConstraints();
2340 } catch (Exception e) {
2341 if (!(e instanceof InterruptedIOException) &&
2342 DISCOVERY_LOGGER.isLoggable(Levels.HANDLED))
2343 {
2344 logThrow(DISCOVERY_LOGGER, Levels.HANDLED,
2345 getClass().getName(),
2346 "run",
2347 "exception decoding multicast request from {0}:{1}",
2348 new Object[]{
2349 datagram.getAddress(),
2350 Integer.valueOf(datagram.getPort()) },
2351 e);
2352 }
2353 return;
2354 }
2355 AddressTask task =
2356 new AddressTask(req.getHost(), req.getPort(), reggie);
2357 if (runningTasks.add(task)) {
2358 try {
2359 task.run();
2360 } finally {
2361 runningTasks.remove(task);
2362 }
2363 }
2364 }
2365 }
2366 }
2367
2368
2369 private static final class AddressTask implements Runnable, Comparable<AddressTask> {
2370
2371
2372 private final String host;
2373
2374 private final int port;
2375
2376 private final RegistrarImpl reggie;
2377
2378 private final int hash;
2379
2380
2381
2382 public AddressTask(
2383 String host, int port, RegistrarImpl reggie)
2384 {
2385 this.reggie = reggie;
2386 this.host = host;
2387 this.port = port;
2388 int hash = 3;
2389 hash = 37 * hash + (this.host != null ? this.host.hashCode() : 0);
2390 hash = 37 * hash + this.port;
2391 this.hash = hash;
2392 }
2393
2394
2395 @Override
2396 public int hashCode() {
2397 return hash;
2398 }
2399
2400
2401 public boolean equals(Object obj) {
2402 if (!(obj instanceof AddressTask))
2403 return false;
2404 AddressTask ua = (AddressTask)obj;
2405 return host.equals(ua.host) && port == ua.port;
2406 }
2407
2408
2409 public void run() {
2410 InetAddress[] addr = new InetAddress[]{};
2411 try {
2412 try {
2413 addr = InetAddress.getAllByName(host);
2414 if (addr == null)
2415 addr = new InetAddress[]{};
2416 } catch (UnknownHostException e) {
2417 if (DISCOVERY_LOGGER.isLoggable(Level.INFO)) {
2418 logThrow(DISCOVERY_LOGGER, Level.INFO,
2419 getClass().getName(),
2420 "run",
2421 "failed to resolve host {0};"
2422 + " connection will still be attempted",
2423 new Object[]{ host },
2424 e);
2425 }
2426 }
2427 long deadline = DiscoveryConstraints.process(
2428 reggie.rawUnicastDiscoveryConstraints).getConnectionDeadline(
2429 Long.MAX_VALUE);
2430 long now = System.currentTimeMillis();
2431 if (deadline <= now)
2432 throw new SocketTimeoutException("timeout expired before"
2433 + " connection attempt");
2434 long timeLeft = deadline - now;
2435 int timeout = timeLeft >= Integer.MAX_VALUE ?
2436 Integer.MAX_VALUE : (int) timeLeft;
2437
2438 int len = addr.length;
2439 if (len == 0) {
2440 attemptResponse(
2441 new InetSocketAddress(host, port), timeout);
2442 return;
2443 }
2444 for (int i = 0; i < len; i++) {
2445 try {
2446 attemptResponse(
2447 new InetSocketAddress(addr[i], port), timeout);
2448 return;
2449 } catch (Exception e) {
2450 if (DISCOVERY_LOGGER.isLoggable(Levels.HANDLED)) {
2451 logThrow(DISCOVERY_LOGGER, Levels.HANDLED, getClass().getName(),
2452 "run", "exception responding to {0}:{1}",
2453 new Object[] {addr[i], Integer.valueOf(port)}
2454 , e);
2455 }
2456 }
2457 timeLeft = deadline - System.currentTimeMillis();
2458 timeout = timeLeft >= Integer.MAX_VALUE ?
2459 Integer.MAX_VALUE : (int) timeLeft;
2460 if (timeLeft <= 0)
2461 throw new SocketTimeoutException("timeout expired"
2462 + " before successful response");
2463 }
2464 } catch (Exception e) {
2465 if (DISCOVERY_LOGGER.isLoggable(Level.INFO)) {
2466 logThrow(DISCOVERY_LOGGER, Level.INFO,
2467 getClass().getName(),
2468 "run",
2469 "failed to respond to {0} on port {1}",
2470 new Object[]{Arrays.asList(addr), Integer.valueOf(port)},
2471 e);
2472 }
2473 }
2474 }
2475
2476
2477 private void attemptResponse(InetSocketAddress addr, int timeout)
2478 throws Exception
2479 {
2480 Socket s = reggie.socketFactory.createSocket();
2481 try {
2482 s.connect(addr, timeout);
2483 reggie.respond(s);
2484 } finally {
2485 try {
2486 s.close();
2487 } catch (IOException e) {
2488 DISCOVERY_LOGGER.log(Levels.HANDLED, "exception closing socket", e);
2489 }
2490 }
2491 }
2492
2493 @Override
2494 public int compareTo(AddressTask o) {
2495 int hostCompare = host.compareTo(o.host);
2496 if ( hostCompare == -1) return -1;
2497 if ( hostCompare == 1) return 1;
2498 if (port < o.port) return -1;
2499 if (port > o.port) return 1;
2500 return 0;
2501 }
2502 }
2503
2504
2505 private static final class SocketTask implements Runnable {
2506
2507
2508 public final Socket socket;
2509 public final RegistrarImpl reggie;
2510
2511
2512 public SocketTask(Socket socket, RegistrarImpl reggie) {
2513 this.socket = socket;
2514 this.reggie = reggie;
2515 }
2516
2517
2518 public void run() {
2519 try {
2520 reggie.respond(socket);
2521 } catch (Exception e) {
2522 if (DISCOVERY_LOGGER.isLoggable(Levels.HANDLED)) {
2523 logThrow(DISCOVERY_LOGGER, Levels.HANDLED,
2524 getClass().getName(),
2525 "run",
2526 "exception handling unicast discovery from {0}:{1}",
2527 new Object[] {
2528 socket.getInetAddress(),
2529 Integer.valueOf(socket.getPort())}
2530 ,
2531 e);
2532 }
2533 }
2534 }
2535 }
2536
2537
2538 private static class ServiceExpire implements Runnable {
2539 final RegistrarImpl reggie;
2540
2541 public ServiceExpire(RegistrarImpl reggie) {
2542 this.reggie = reggie;
2543 }
2544
2545 public void run() {
2546 try {
2547 reggie.concurrentObj.writeLock();
2548 } catch (ConcurrentLockException e) {
2549 return;
2550 }
2551 try {
2552 while (!Thread.currentThread().isInterrupted()) {
2553 long now = System.currentTimeMillis();
2554 while (true) {
2555 SvcReg reg = reggie.serviceByTime.first();
2556 reggie.minSvcExpiration = reg.leaseExpiration;
2557 if (reggie.minSvcExpiration > now)
2558 break;
2559 reggie.deleteService(reg, now);
2560 reggie.addLogRecord(new ServiceLeaseCancelledLogObj(
2561 reg.item.getServiceID(), reg.leaseID));
2562 if (LOGGER.isLoggable(Level.FINE)) {
2563 LOGGER.log(
2564 Level.FINE,
2565 "expired service registration {0}",
2566 new Object[]{ reg.item.getServiceID() });
2567 }
2568 }
2569 try {
2570 reggie.serviceNotifier.await(reggie.minSvcExpiration - now, TimeUnit.MILLISECONDS);
2571 } catch (InterruptedException ex) {
2572 Thread.currentThread().interrupt();
2573 return;
2574 }
2575 }
2576 } finally {
2577 reggie.concurrentObj.writeUnlock();
2578 }
2579 }
2580 }
2581
2582
2583 private static class EventExpire implements Runnable {
2584 private final RegistrarImpl reggie;
2585
2586 public EventExpire(RegistrarImpl reggie) {
2587 this.reggie = reggie;
2588 }
2589
2590 public void run() {
2591 try {
2592 reggie.concurrentObj.writeLock();
2593 } catch (ConcurrentLockException e) {
2594 return;
2595 }
2596 try {
2597 while (!Thread.currentThread().isInterrupted()) {
2598 long now = System.currentTimeMillis();
2599 reggie.minEventExpiration = Long.MAX_VALUE;
2600 for (EventReg reg = reggie.eventByTime.poll();
2601 reg != null; reg = reggie.eventByTime.poll()) {
2602 if (reg.getLeaseExpiration() > now) {
2603 reggie.minEventExpiration = reg.getLeaseExpiration();
2604 break;
2605 }
2606 reggie.deleteEvent(reg);
2607 if (EVENT_LOGGER.isLoggable(Level.FINE)) {
2608 EVENT_LOGGER.log(
2609 Level.FINE,
2610 "expired event registration {0} for {1}",
2611 new Object[]{ reg.leaseID, reg.listener });
2612 }
2613 }
2614 try {
2615 reggie.eventNotifier.await(reggie.minEventExpiration - now, TimeUnit.MILLISECONDS);
2616 } catch (InterruptedException ex) {
2617 Thread.currentThread().interrupt();
2618 return;
2619 }
2620 }
2621 } finally {
2622 reggie.concurrentObj.writeUnlock();
2623 }
2624 }
2625 }
2626
2627
2628
2629
2630
2631
2632 private static class Destroy implements Runnable {
2633 private final RegistrarImpl reggie;
2634
2635 public Destroy(RegistrarImpl reggie) {
2636 this.reggie = reggie;
2637 }
2638
2639 public void run() {
2640 long now = System.currentTimeMillis();
2641 long endTime = now + reggie.unexportTimeout;
2642 if (endTime < 0)
2643 endTime = Long.MAX_VALUE;
2644 boolean unexported = false;
2645
2646 while(!unexported && (now < endTime)) {
2647 unexported = reggie.serverExporter.unexport(false);
2648 if (!unexported) {
2649 try {
2650 final long sleepTime =
2651 Math.min(reggie.unexportWait, endTime - now);
2652 Thread.currentThread().sleep(sleepTime);
2653 now = System.currentTimeMillis();
2654 } catch (InterruptedException e) {
2655 Thread.currentThread().interrupt();
2656 LOGGER.log(
2657 Levels.HANDLED, "exception during unexport wait", e);
2658 }
2659 }
2660 }
2661
2662 if(!unexported) {
2663 reggie.serverExporter.unexport(true);
2664 }
2665
2666
2667 reggie.serviceExpirer.interrupt();
2668 reggie.eventExpirer.interrupt();
2669 reggie.unicaster.interrupt();
2670 reggie.multicaster.interrupt();
2671 reggie.announcer.interrupt();
2672 reggie.snapshotter.interrupt();
2673 reggie.eventNotifierExec.shutdown();
2674 List<Runnable> cancelledTasks = reggie.discoveryResponseExec.shutdownNow();
2675 reggie.joiner.terminate();
2676 reggie.discoer.terminate();
2677 try {
2678 reggie.serviceExpirer.join();
2679 reggie.eventExpirer.join();
2680 reggie.unicaster.join();
2681 reggie.multicaster.join();
2682 reggie.announcer.join();
2683 reggie.snapshotter.join();
2684 } catch (InterruptedException e) {
2685 }
2686 reggie.closeRequestSockets(cancelledTasks);
2687 if (reggie.log != null) {
2688 reggie.log.deletePersistentStore();
2689 SNAPSHOT_LOGGER.finer("deleted persistence directory");
2690 }
2691 if (reggie.activationID != null) {
2692 try {
2693 ActivationGroup.inactive(reggie.activationID, reggie.serverExporter);
2694 } catch (Exception e) {
2695 LOGGER.log(Level.INFO, "exception going inactive", e);
2696 }
2697 }
2698 if (reggie.lifeCycle != null) {
2699 reggie.lifeCycle.unregister(reggie);
2700 }
2701 if (reggie.loginContext != null) {
2702 try {
2703 reggie.loginContext.logout();
2704 } catch (LoginException e) {
2705 LOGGER.log(Level.INFO, "logout failed", e);
2706 }
2707 }
2708 LOGGER.info("Reggie shutdown completed");
2709 }
2710 }
2711
2712
2713 private static class Multicast implements Runnable, Interruptable {
2714 private final RegistrarImpl reggie;
2715
2716 private final InetAddress requestAddr;
2717
2718 private final MulticastSocket socket;
2719
2720 private final List<NetworkInterface> failedInterfaces;
2721
2722 private final Set<AddressTask> runningTasks;
2723
2724 private volatile boolean interrupted = false;
2725
2726
2727
2728
2729
2730 public Multicast(RegistrarImpl reggie) throws IOException {
2731 this.runningTasks = new ConcurrentSkipListSet<AddressTask>();
2732 this.reggie = reggie;
2733 List<NetworkInterface> failedInterfaces = new ArrayList<NetworkInterface>();
2734 if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
2735 {
2736 requestAddr = null;
2737 socket = null;
2738 this.failedInterfaces = failedInterfaces;
2739 return;
2740 }
2741 requestAddr = Constants.getRequestAddress();
2742 socket = new MulticastSocket(Constants.discoveryPort);
2743 if (reggie.multicastInterfaces != null) {
2744 Level failureLogLevel = reggie.multicastInterfacesSpecified ?
2745 Level.WARNING : Levels.HANDLED;
2746 int l = reggie.multicastInterfaces.length;
2747 for (int i = 0; i < l; i++) {
2748 NetworkInterface nic = reggie.multicastInterfaces[i];
2749 try {
2750 socket.setNetworkInterface(nic);
2751 socket.joinGroup(requestAddr);
2752 } catch (IOException e) {
2753 failedInterfaces.add(nic);
2754 if (DISCOVERY_LOGGER.isLoggable(failureLogLevel)) {
2755 logThrow(DISCOVERY_LOGGER, failureLogLevel,
2756 getClass().getName(),
2757 "<init>",
2758 "exception enabling {0}",
2759 new Object[]{ nic },
2760 e);
2761 }
2762 }
2763 }
2764 } else {
2765 try {
2766 socket.joinGroup(requestAddr);
2767 } catch (IOException e) {
2768 failedInterfaces.add(null);
2769 DISCOVERY_LOGGER.log(
2770 Level.WARNING,
2771 "exception enabling default interface", e);
2772 }
2773 }
2774 this.failedInterfaces = failedInterfaces;
2775 }
2776
2777 public void run() {
2778 if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
2779 {
2780 return;
2781 }
2782 byte[] buf = new byte[
2783 reggie.multicastRequestConstraints.getMulticastMaxPacketSize(
2784 DEFAULT_MAX_PACKET_SIZE)];
2785 DatagramPacket dgram = new DatagramPacket(buf, buf.length);
2786 long retryTime =
2787 System.currentTimeMillis() + reggie.multicastInterfaceRetryInterval;
2788 while (!interrupted) {
2789 try {
2790 int timeout = 0;
2791 if (!failedInterfaces.isEmpty()) {
2792 timeout =
2793 (int) (retryTime - System.currentTimeMillis());
2794 if (timeout <= 0) {
2795 retryFailedInterfaces();
2796 if (failedInterfaces.isEmpty()) {
2797 timeout = 0;
2798 } else {
2799 timeout = reggie.multicastInterfaceRetryInterval;
2800 retryTime =
2801 System.currentTimeMillis() + timeout;
2802 }
2803 }
2804 }
2805 socket.setSoTimeout(timeout);
2806 dgram.setLength(buf.length);
2807 try {
2808 socket.receive(dgram);
2809 } catch (NullPointerException e) {
2810 break;
2811 }
2812
2813 int pv;
2814 try {
2815 pv = ByteBuffer.wrap(dgram.getData(),
2816 dgram.getOffset(),
2817 dgram.getLength()).getInt();
2818 } catch (BufferUnderflowException e) {
2819 throw new DiscoveryProtocolException(null, e);
2820 }
2821 reggie.multicastRequestConstraints.checkProtocolVersion(pv);
2822 reggie.discoveryResponseExec.execute(
2823 new DecodeRequestTask(
2824 dgram,
2825 reggie.getDiscovery(pv),
2826 reggie,
2827 runningTasks
2828 )
2829 );
2830
2831 buf = new byte[buf.length];
2832 dgram = new DatagramPacket(buf, buf.length);
2833
2834 } catch (SocketTimeoutException e) {
2835
2836 } catch (InterruptedIOException e) {
2837 break;
2838 } catch (Exception e) {
2839 if (interrupted) {
2840 break;
2841 }
2842 DISCOVERY_LOGGER.log(Levels.HANDLED,
2843 "exception receiving multicast request", e);
2844 }
2845 }
2846 socket.close();
2847 }
2848
2849 public void interrupt() {
2850
2851 interrupted = true;
2852 if (socket != null) socket.close();
2853 }
2854
2855
2856
2857
2858
2859
2860
2861 private void retryFailedInterfaces() {
2862 for (Iterator<NetworkInterface> i = failedInterfaces.iterator(); i.hasNext(); ) {
2863 NetworkInterface nic = i.next();
2864 try {
2865 if (nic != null) {
2866 socket.setNetworkInterface(nic);
2867 }
2868 socket.joinGroup(requestAddr);
2869 i.remove();
2870
2871 Level l = reggie.multicastInterfacesSpecified ?
2872 Level.INFO : Level.FINE;
2873 if (DISCOVERY_LOGGER.isLoggable(l)) {
2874 if (nic != null) {
2875 DISCOVERY_LOGGER.log(l, "enabled {0}", new Object[]{ nic });
2876 } else {
2877 DISCOVERY_LOGGER.log(l, "enabled default interface");
2878 }
2879 }
2880 } catch (IOException e) {
2881
2882 }
2883 }
2884 }
2885 }
2886
2887
2888 private static class Unicast implements Runnable, Interruptable {
2889 private static final Boolean arbitraryPort;
2890
2891 static {
2892 arbitraryPort = AccessController.doPrivileged(
2893 new PrivilegedAction<Boolean>(){
2894
2895 @Override
2896 public Boolean run() {
2897 return Boolean.valueOf(
2898 Boolean.getBoolean(
2899 "net.jini.core.lookup.ServiceRegistrar.portAbitraryIfInUse"
2900 )
2901 );
2902 }
2903
2904 }
2905 );
2906 }
2907 private final RegistrarImpl reggie;
2908
2909 private final ServerSocket listen;
2910
2911 public final int port;
2912
2913 private volatile boolean interrupted = false;
2914
2915
2916
2917
2918
2919 public Unicast(RegistrarImpl reggie, int port) throws IOException {
2920 this.reggie = reggie;
2921 ServerSocket listen = null;
2922 boolean ephemeral = false;
2923 if (port == 0) {
2924 try {
2925 listen = reggie.serverSocketFactory.createServerSocket(Constants.discoveryPort);
2926 port = Constants.discoveryPort;
2927 } catch (IOException e) {
2928 DISCOVERY_LOGGER.log(
2929 Levels.HANDLED, "failed to bind to default port", e);
2930 }
2931 }
2932 if (listen == null) {
2933 try {
2934 listen = reggie.serverSocketFactory.createServerSocket(port);
2935 } catch (IOException e){
2936 DISCOVERY_LOGGER.log(Level.INFO, "failed to bind to port " + port, e);
2937 if (arbitraryPort){
2938 listen = reggie.serverSocketFactory.createServerSocket(0);
2939 ephemeral = true;
2940 } else {
2941 throw e;
2942 }
2943 }
2944 }
2945 port = listen.getLocalPort();
2946 DISCOVERY_LOGGER.log(Level.INFO, "Reggie Unicast Discovery listening on port {0}", port);
2947 this.listen = listen;
2948 this.port = port;
2949 if (ephemeral) reggie.unicastPort = port;
2950 }
2951
2952 public void run() {
2953 while (!interrupted) {
2954 try {
2955 Socket socket = listen.accept();
2956 if (interrupted) {
2957 try {
2958 socket.close();
2959 } catch (IOException e) {
2960 DISCOVERY_LOGGER.log(
2961 Levels.HANDLED, "exception closing socket", e);
2962 }
2963 break;
2964 }
2965 reggie.discoveryResponseExec.execute(new SocketTask(socket, reggie));
2966 } catch (InterruptedIOException e) {
2967 break;
2968 } catch (Exception e) {
2969 DISCOVERY_LOGGER.log(
2970 Levels.HANDLED, "exception listening on socket", e);
2971 }
2972
2973 }
2974 try {
2975 listen.close();
2976 } catch (IOException e) {
2977 DISCOVERY_LOGGER.log(
2978 Levels.HANDLED, "exception closing server socket", e);
2979 }
2980 }
2981
2982
2983
2984
2985
2986
2987 public void interrupt() {
2988 interrupted = true;
2989 AccessController.doPrivileged( new PrivilegedAction(){
2990 public Object run(){
2991 try {
2992 Socket s = reggie.socketFactory.createSocket(LocalHostLookup.getLocalHost(), port);
2993 s.close();
2994 } catch (IOException e) {
2995 } finally {
2996 return null;
2997 }
2998 }
2999 });
3000 }
3001 }
3002
3003
3004 private static class Announce implements Runnable {
3005 private final RegistrarImpl reggie;
3006
3007 private final MulticastSocket socket;
3008
3009 private DatagramPacket[] dataPackets = null;
3010
3011 private LookupLocator lastLocator;
3012
3013 private String[] lastGroups;
3014
3015
3016
3017
3018
3019 public Announce(RegistrarImpl reggie) throws IOException {
3020 this.reggie = reggie;
3021 if (reggie.multicastInterfaces == null || reggie.multicastInterfaces.length > 0)
3022 {
3023 socket = new MulticastSocket();
3024 if (Constants.GLOBAL_ANNOUNCE){
3025 socket.setTimeToLive(
3026 reggie.multicastAnnouncementConstraints.getMulticastTimeToLive(
3027 DEFAULT_MULTICAST_TTL));
3028 } else {
3029 socket.setTimeToLive(
3030 reggie.multicastAnnouncementConstraints.getMulticastTimeToLive(
3031 DEFAULT_MULTICAST_TTL));
3032
3033 }
3034 } else {
3035 socket = null;
3036 }
3037 }
3038
3039 public void run() {
3040 Thread currentThread = Thread.currentThread();
3041 synchronized (currentThread){
3042 if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
3043 {
3044 return;
3045 }
3046 try {
3047 while (!currentThread.isInterrupted() && announce(reggie.memberGroups)) {
3048 currentThread.wait(reggie.multicastAnnouncementInterval);
3049 }
3050 } catch (InterruptedException e) {
3051 currentThread.interrupt();
3052 return;
3053 } finally {
3054 if (reggie.memberGroups.length > 0)
3055 announce(new String[0]);
3056 socket.close();
3057 }
3058 }
3059 }
3060
3061
3062
3063
3064
3065
3066 private boolean announce(String[] groups) {
3067 if (dataPackets == null || !lastLocator.equals(reggie.myLocator) ||
3068 !Arrays.equals(lastGroups, groups))
3069 {
3070 List packets = new ArrayList();
3071 Discovery disco;
3072 try {
3073 disco = reggie.getDiscovery(reggie.multicastAnnouncementConstraints
3074 .chooseProtocolVersion());
3075 } catch (DiscoveryProtocolException e) {
3076 throw new AssertionError(e);
3077 }
3078 LookupLocator myLocator = reggie.myLocator;
3079 EncodeIterator ei = disco.encodeMulticastAnnouncement(
3080 new MulticastAnnouncement(reggie.announcementSeqNo.getAndIncrement(),
3081 myLocator.getHost(),
3082 myLocator.getPort(),
3083 groups,
3084 reggie.myServiceID),
3085 reggie.multicastAnnouncementConstraints
3086 .getMulticastMaxPacketSize(DEFAULT_MAX_PACKET_SIZE),
3087 reggie.multicastAnnouncementConstraints
3088 .getUnfulfilledConstraints());
3089 while (ei.hasNext()) {
3090 try {
3091 packets.addAll(Arrays.asList(ei.next()));
3092 } catch (Exception e) {
3093 DISCOVERY_LOGGER.log( (e instanceof
3094 UnsupportedConstraintException)
3095 ? Levels.HANDLED : Level.INFO,
3096 "exception encoding multicast"
3097 + " announcement", e);
3098 }
3099 }
3100 lastLocator = myLocator;
3101 lastGroups = groups;
3102 dataPackets = (DatagramPacket[]) packets.toArray(
3103 new DatagramPacket[packets.size()]);
3104 }
3105 try {
3106 send(dataPackets);
3107 } catch (InterruptedIOException e) {
3108 return false;
3109 } catch (IOException e) {
3110 DISCOVERY_LOGGER.log(
3111 Level.INFO, "exception sending multicast announcement", e);
3112 }
3113 return true;
3114 }
3115
3116
3117
3118
3119
3120 private void send(DatagramPacket[] packets)
3121 throws InterruptedIOException
3122 {
3123 if (reggie.multicastInterfaces != null) {
3124 Level failureLogLevel = reggie.multicastInterfacesSpecified ?
3125 Level.WARNING : Levels.HANDLED;
3126 int l = reggie.multicastInterfaces.length;
3127 for (int i = 0; i < l; i++) {
3128 send(packets, reggie.multicastInterfaces[i], failureLogLevel);
3129 }
3130 } else {
3131 send(packets, null, Level.WARNING);
3132 }
3133 }
3134
3135
3136
3137
3138
3139
3140
3141 private void send(DatagramPacket[] packets,
3142 NetworkInterface nic,
3143 Level failureLogLevel)
3144 throws InterruptedIOException
3145 {
3146 if (nic != null) {
3147 try {
3148 socket.setNetworkInterface(nic);
3149 } catch (SocketException e) {
3150 if (DISCOVERY_LOGGER.isLoggable(failureLogLevel)) {
3151 logThrow(DISCOVERY_LOGGER, failureLogLevel,
3152 getClass().getName(),
3153 "send",
3154 "exception setting {0}",
3155 new Object[]{ nic },
3156 e);
3157 }
3158 return;
3159 }
3160 }
3161 for (int i = 0; i < packets.length; i++) {
3162 try {
3163 socket.send(packets[i]);
3164 } catch (InterruptedIOException e) {
3165 throw e;
3166 } catch (IOException e) {
3167 if (nic != null) {
3168 if (DISCOVERY_LOGGER.isLoggable(failureLogLevel)) {
3169 logThrow(DISCOVERY_LOGGER, failureLogLevel,
3170 getClass().getName(),
3171 "send",
3172 "exception sending packet on {0}",
3173 new Object[]{ nic },
3174 e);
3175 }
3176 } else {
3177 DISCOVERY_LOGGER.log(
3178 failureLogLevel,
3179 "exception sending packet on default interface",
3180 e);
3181 }
3182 }
3183 }
3184 }
3185 }
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231 private static class Snapshot implements Runnable {
3232 RegistrarImpl reggie;
3233
3234 public Snapshot(RegistrarImpl reggie) {
3235 this.reggie = reggie;
3236 }
3237
3238 public void run() {
3239 if (reggie.log == null) {
3240 return;
3241 }
3242 reggie.concurrentObj.writeLock();
3243 try {
3244 while (!Thread.currentThread().isInterrupted()) {
3245 try {
3246 reggie.snapshotNotifier.await();
3247 } catch (InterruptedException ex) {
3248 Thread.currentThread().interrupt();
3249 return;
3250 }
3251 try {
3252 reggie.log.snapshot();
3253 reggie.logFileSize.set(0);
3254 } catch (Exception e) {
3255
3256 if (Thread.currentThread().isInterrupted())
3257 return;
3258 LOGGER.log(Level.WARNING, "snapshot failed", e);
3259 }
3260 }
3261 } finally {
3262 reggie.concurrentObj.writeUnlock();
3263 }
3264 }
3265 }
3266
3267
3268 public Object getServiceProxy() throws NoSuchObjectException {
3269 concurrentObj.readLock();
3270 try {
3271 return proxy;
3272 } finally {
3273 concurrentObj.readUnlock();
3274 }
3275 }
3276
3277
3278 public Object getProxy() {
3279 concurrentObj.readLock();
3280 try {
3281 return myRef;
3282 } finally {
3283 concurrentObj.readUnlock();
3284 }
3285 }
3286
3287
3288 public TrustVerifier getProxyVerifier() throws NoSuchObjectException {
3289 concurrentObj.readLock();
3290 try {
3291 return new ProxyVerifier(myRef, myServiceID);
3292 } finally {
3293 concurrentObj.readUnlock();
3294 }
3295 }
3296
3297
3298 public ServiceRegistration register(Item nitem, long leaseDuration)
3299 throws NoSuchObjectException
3300 {
3301 concurrentObj.writeLock();
3302 try {
3303 ServiceRegistration reg = registerDo(nitem, leaseDuration);
3304 if (LOGGER.isLoggable(Level.FINE)) {
3305 LOGGER.log(
3306 Level.FINE,
3307 "registered instance of {0} as {1}",
3308 new Object[]{
3309 nitem.serviceType.getName(), reg.getServiceID() });
3310 }
3311 return reg;
3312 } finally {
3313 concurrentObj.writeUnlock();
3314 }
3315 }
3316
3317
3318 public MarshalledWrapper lookup(Template tmpl) throws NoSuchObjectException
3319 {
3320 concurrentObj.readLock();
3321 try {
3322 return lookupDo(tmpl);
3323 } finally {
3324 concurrentObj.readUnlock();
3325 }
3326 }
3327
3328
3329 public Matches lookup(Template tmpl, int maxMatches)
3330 throws NoSuchObjectException
3331 {
3332 concurrentObj.readLock();
3333 try {
3334 return lookupDo(tmpl, maxMatches);
3335 } finally {
3336 concurrentObj.readUnlock();
3337 }
3338 }
3339
3340
3341 public Object [] lookUp(Template tmpl, int maxProxys)
3342 throws NoSuchObjectException
3343 {
3344 concurrentObj.readLock();
3345 try {
3346 return lookupDo(tmpl, maxProxys).getProxys();
3347 } finally {
3348 concurrentObj.readUnlock();
3349 }
3350 }
3351
3352
3353 public EventRegistration notify(Template tmpl,
3354 int transitions,
3355 RemoteEventListener listener,
3356 MarshalledObject handback,
3357 long leaseDuration)
3358 throws RemoteException
3359 {
3360 concurrentObj.writeLock();
3361 try {
3362 EventRegistration reg = notifyDo(
3363 tmpl, transitions, listener, handback, leaseDuration, false);
3364 if (EVENT_LOGGER.isLoggable(Level.FINE)) {
3365 EVENT_LOGGER.log(
3366 Level.FINE,
3367 "registered event listener {0} as {1}",
3368 new Object[]{
3369 listener,
3370 ((ReferentUuid) reg.getLease()).getReferentUuid()
3371 });
3372 }
3373 return reg;
3374 } finally {
3375 concurrentObj.writeUnlock();
3376 }
3377 }
3378
3379
3380 public EventRegistration notiFy(Template tmpl,
3381 int transitions,
3382 RemoteEventListener listener,
3383 MarshalledInstance handback,
3384 long leaseDuration)
3385 throws RemoteException
3386 {
3387 concurrentObj.writeLock();
3388 try {
3389 EventRegistration reg = notifyDo(
3390 tmpl, transitions, listener, handback, leaseDuration, true);
3391 if (EVENT_LOGGER.isLoggable(Level.FINE)) {
3392 EVENT_LOGGER.log(
3393 Level.FINE,
3394 "registered event listener {0} as {1}",
3395 new Object[]{
3396 listener,
3397 ((ReferentUuid) reg.getLease()).getReferentUuid()
3398 });
3399 }
3400 return reg;
3401 } finally {
3402 concurrentObj.writeUnlock();
3403 }
3404 }
3405
3406
3407 public EntryClassBase[] getEntryClasses(Template tmpl)
3408 throws NoSuchObjectException
3409 {
3410 concurrentObj.readLock();
3411 try {
3412 return getEntryClassesDo(tmpl);
3413 } finally {
3414 concurrentObj.readUnlock();
3415 }
3416 }
3417
3418
3419 public Object[] getFieldValues(Template tmpl, int setIndex, int field)
3420 throws NoSuchObjectException
3421 {
3422 concurrentObj.readLock();
3423 try {
3424 return getFieldValuesDo(tmpl, setIndex, field);
3425 } finally {
3426 concurrentObj.readUnlock();
3427 }
3428 }
3429
3430
3431 public ServiceTypeBase[] getServiceTypes(Template tmpl, String prefix)
3432 throws NoSuchObjectException
3433 {
3434 concurrentObj.readLock();
3435 try {
3436 return getServiceTypesDo(tmpl, prefix);
3437 } finally {
3438 concurrentObj.readUnlock();
3439 }
3440 }
3441
3442
3443 public LookupLocator getLocator() throws NoSuchObjectException {
3444 concurrentObj.readLock();
3445 try {
3446 return myLocator;
3447 } finally {
3448 concurrentObj.readUnlock();
3449 }
3450 }
3451
3452
3453 public Object getAdmin() throws NoSuchObjectException {
3454 concurrentObj.readLock();
3455 try {
3456 return AdminProxy.getInstance(myRef, myServiceID);
3457 } finally {
3458 concurrentObj.readUnlock();
3459 }
3460 }
3461
3462
3463 public void addAttributes(ServiceID serviceID,
3464 Uuid leaseID,
3465 EntryRep[] attrSets)
3466 throws NoSuchObjectException, UnknownLeaseException
3467 {
3468 concurrentObj.writeLock();
3469 try {
3470 if (serviceID.equals(myServiceID))
3471 throw new SecurityException("privileged service id");
3472 addAttributesDo(serviceID, leaseID, attrSets);
3473 addLogRecord(new AttrsAddedLogObj(serviceID, leaseID, attrSets));
3474 } finally {
3475 concurrentObj.writeUnlock();
3476 }
3477 }
3478
3479
3480 public void modifyAttributes(ServiceID serviceID,
3481 Uuid leaseID,
3482 EntryRep[] attrSetTmpls,
3483 EntryRep[] attrSets)
3484 throws NoSuchObjectException, UnknownLeaseException
3485 {
3486 concurrentObj.writeLock();
3487 try {
3488 if (serviceID.equals(myServiceID))
3489 throw new SecurityException("privileged service id");
3490 modifyAttributesDo(serviceID, leaseID, attrSetTmpls, attrSets);
3491 addLogRecord(new AttrsModifiedLogObj(serviceID, leaseID,
3492 attrSetTmpls, attrSets));
3493 } finally {
3494 concurrentObj.writeUnlock();
3495 }
3496 }
3497
3498
3499 public void setAttributes(ServiceID serviceID,
3500 Uuid leaseID,
3501 EntryRep[] attrSets)
3502 throws NoSuchObjectException, UnknownLeaseException
3503 {
3504 concurrentObj.writeLock();
3505 try {
3506 if (serviceID.equals(myServiceID))
3507 throw new SecurityException("privileged service id");
3508 setAttributesDo(serviceID, leaseID, attrSets);
3509 addLogRecord(new AttrsSetLogObj(serviceID, leaseID, attrSets));
3510 } finally {
3511 concurrentObj.writeUnlock();
3512 }
3513 }
3514
3515
3516 public void cancelServiceLease(ServiceID serviceID, Uuid leaseID)
3517 throws NoSuchObjectException, UnknownLeaseException
3518 {
3519 concurrentObj.writeLock();
3520 try {
3521 cancelServiceLeaseDo(serviceID, leaseID);
3522 addLogRecord(new ServiceLeaseCancelledLogObj(serviceID, leaseID));
3523 if (LOGGER.isLoggable(Level.FINE)) {
3524 LOGGER.log(
3525 Level.FINE,
3526 "cancelled service registration {0}",
3527 new Object[]{ serviceID });
3528 }
3529 } finally {
3530 concurrentObj.writeUnlock();
3531 }
3532 }
3533
3534
3535 public long renewServiceLease(ServiceID serviceID,
3536 Uuid leaseID,
3537 long renewDuration)
3538 throws NoSuchObjectException, UnknownLeaseException
3539 {
3540 concurrentObj.priorityWriteLock();
3541 try {
3542 return renewServiceLeaseDo(serviceID, leaseID, renewDuration);
3543
3544 } finally {
3545 concurrentObj.writeUnlock();
3546 }
3547 }
3548
3549
3550 public void cancelEventLease(long eventID, Uuid leaseID)
3551 throws NoSuchObjectException, UnknownLeaseException
3552 {
3553 concurrentObj.writeLock();
3554 try {
3555 cancelEventLeaseDo(eventID, leaseID);
3556 addLogRecord(new EventLeaseCancelledLogObj(eventID, leaseID));
3557 if (EVENT_LOGGER.isLoggable(Level.FINE)) {
3558 EVENT_LOGGER.log(
3559 Level.FINE,
3560 "cancelled event registration {0}",
3561 new Object[]{ leaseID });
3562 }
3563 } finally {
3564 concurrentObj.writeUnlock();
3565 }
3566 }
3567
3568
3569 public long renewEventLease(long eventID, Uuid leaseID, long renewDuration)
3570 throws NoSuchObjectException, UnknownLeaseException
3571 {
3572 concurrentObj.priorityWriteLock();
3573 try {
3574 return renewEventLeaseDo(eventID, leaseID, renewDuration);
3575
3576 } finally {
3577 concurrentObj.writeUnlock();
3578 }
3579 }
3580
3581
3582 public RenewResults renewLeases(Object[] regIDs,
3583 Uuid[] leaseIDs,
3584 long[] renewDurations)
3585 throws NoSuchObjectException
3586 {
3587 concurrentObj.priorityWriteLock();
3588 try {
3589 return renewLeasesDo(regIDs, leaseIDs, renewDurations);
3590
3591 } finally {
3592 concurrentObj.writeUnlock();
3593 }
3594 }
3595
3596
3597 public Exception[] cancelLeases(Object[] regIDs, Uuid[] leaseIDs)
3598 throws NoSuchObjectException
3599 {
3600 concurrentObj.writeLock();
3601 try {
3602 Exception[] exceptions = cancelLeasesDo(regIDs, leaseIDs);
3603 addLogRecord(new LeasesCancelledLogObj(regIDs, leaseIDs));
3604 if (LOGGER.isLoggable(Level.FINE)) {
3605 for (int i = 0; i < regIDs.length; i++) {
3606 if (exceptions != null && exceptions[i] != null) {
3607 continue;
3608 }
3609 if (regIDs[i] instanceof ServiceID) {
3610 LOGGER.log(
3611 Level.FINE,
3612 "cancelled service registration {0}",
3613 new Object[]{ regIDs[i] });
3614 } else {
3615 EVENT_LOGGER.log(
3616 Level.FINE,
3617 "cancelled event registration {0}",
3618 new Object[]{ leaseIDs[i] });
3619 }
3620 }
3621 }
3622 return exceptions;
3623 } finally {
3624 concurrentObj.writeUnlock();
3625 }
3626 }
3627
3628
3629 public Entry[] getLookupAttributes() throws NoSuchObjectException {
3630 concurrentObj.readLock();
3631 try {
3632
3633 return lookupAttrs;
3634 } finally {
3635 concurrentObj.readUnlock();
3636 }
3637 }
3638
3639
3640 public void addLookupAttributes(Entry[] attrSets) throws RemoteException {
3641 concurrentObj.writeLock();
3642 try {
3643 EntryRep[] attrs = EntryRep.toEntryRep(attrSets, true);
3644 addAttributesDo(myServiceID, myLeaseID, attrs);
3645 joiner.addAttributes(attrSets);
3646 lookupAttrs = joiner.getAttributes();
3647 addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs));
3648 } catch (UnknownLeaseException e) {
3649 throw new AssertionError("Self-registration never expires");
3650 } finally {
3651 concurrentObj.writeUnlock();
3652 }
3653 }
3654
3655
3656 public void modifyLookupAttributes(Entry[] attrSetTemplates,
3657 Entry[] attrSets)
3658 throws RemoteException
3659 {
3660 concurrentObj.writeLock();
3661 try {
3662 EntryRep[] tmpls = EntryRep.toEntryRep(attrSetTemplates, false);
3663 EntryRep[] attrs = EntryRep.toEntryRep(attrSets, false);
3664 modifyAttributesDo(myServiceID, myLeaseID, tmpls, attrs);
3665 joiner.modifyAttributes(attrSetTemplates, attrSets, true);
3666 lookupAttrs = joiner.getAttributes();
3667 addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs));
3668 } catch (UnknownLeaseException e) {
3669 throw new AssertionError("Self-registration never expires");
3670 } finally {
3671 concurrentObj.writeUnlock();
3672 }
3673 }
3674
3675
3676 public String[] getLookupGroups() throws NoSuchObjectException {
3677 concurrentObj.readLock();
3678 try {
3679
3680 return lookupGroups;
3681 } finally {
3682 concurrentObj.readUnlock();
3683 }
3684 }
3685
3686
3687 public void addLookupGroups(String[] groups) throws NoSuchObjectException {
3688 concurrentObj.writeLock();
3689 try {
3690 DiscoveryGroupManagement dgm = (DiscoveryGroupManagement) discoer;
3691 try {
3692 dgm.addGroups(groups);
3693 } catch (IOException e) {
3694 throw new RuntimeException(e.toString());
3695 }
3696 lookupGroups = dgm.getGroups();
3697 addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
3698 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3699 DISCOVERY_LOGGER.log(
3700 Level.CONFIG,
3701 "added lookup groups {0}",
3702 new Object[]{ Arrays.asList(groups) });
3703 }
3704 } finally {
3705 concurrentObj.writeUnlock();
3706 }
3707 }
3708
3709
3710 public void removeLookupGroups(String[] groups)
3711 throws NoSuchObjectException
3712 {
3713 concurrentObj.writeLock();
3714 try {
3715 DiscoveryGroupManagement dgm = (DiscoveryGroupManagement) discoer;
3716 dgm.removeGroups(groups);
3717 lookupGroups = dgm.getGroups();
3718 addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
3719 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3720 DISCOVERY_LOGGER.log(
3721 Level.CONFIG,
3722 "removed lookup groups {0}",
3723 new Object[]{ Arrays.asList(groups) });
3724 }
3725 } finally {
3726 concurrentObj.writeUnlock();
3727 }
3728 }
3729
3730
3731 public void setLookupGroups(String[] groups) throws NoSuchObjectException {
3732 concurrentObj.writeLock();
3733 try {
3734 DiscoveryGroupManagement dgm = (DiscoveryGroupManagement) discoer;
3735 try {
3736 dgm.setGroups(groups);
3737 } catch (IOException e) {
3738 throw new RuntimeException(e.toString());
3739 }
3740 lookupGroups = dgm.getGroups();
3741 addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
3742 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3743 DISCOVERY_LOGGER.log(
3744 Level.CONFIG,
3745 "set lookup groups {0}",
3746 new Object[]{
3747 (groups != null) ? Arrays.asList(groups) : null });
3748 }
3749 } finally {
3750 concurrentObj.writeUnlock();
3751 }
3752 }
3753
3754
3755 public LookupLocator[] getLookupLocators() throws NoSuchObjectException {
3756 concurrentObj.readLock();
3757 try {
3758
3759 return lookupLocators;
3760 } finally {
3761 concurrentObj.readUnlock();
3762 }
3763 }
3764
3765
3766 public void addLookupLocators(LookupLocator[] locators)
3767 throws RemoteException
3768 {
3769 locators = prepareLocators(locators, locatorPreparer, false);
3770 concurrentObj.writeLock();
3771 try {
3772 DiscoveryLocatorManagement dlm =
3773 (DiscoveryLocatorManagement) discoer;
3774 dlm.addLocators(locators);
3775 lookupLocators = dlm.getLocators();
3776 addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
3777 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3778 DISCOVERY_LOGGER.log(
3779 Level.CONFIG,
3780 "added lookup locators {0}",
3781 new Object[]{ Arrays.asList(locators) });
3782 }
3783 } finally {
3784 concurrentObj.writeUnlock();
3785 }
3786 }
3787
3788
3789 public void removeLookupLocators(LookupLocator[] locators)
3790 throws RemoteException
3791 {
3792 locators = prepareLocators(locators, locatorPreparer, false);
3793 concurrentObj.writeLock();
3794 try {
3795 DiscoveryLocatorManagement dlm =
3796 (DiscoveryLocatorManagement) discoer;
3797 dlm.removeLocators(locators);
3798 lookupLocators = dlm.getLocators();
3799 addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
3800 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3801 DISCOVERY_LOGGER.log(
3802 Level.CONFIG,
3803 "removed lookup locators {0}",
3804 new Object[]{ Arrays.asList(locators) });
3805 }
3806 } finally {
3807 concurrentObj.writeUnlock();
3808 }
3809 }
3810
3811
3812 public void setLookupLocators(LookupLocator[] locators)
3813 throws RemoteException
3814 {
3815 locators = prepareLocators(locators, locatorPreparer, false);
3816 concurrentObj.writeLock();
3817 try {
3818 DiscoveryLocatorManagement dlm =
3819 (DiscoveryLocatorManagement) discoer;
3820 dlm.setLocators(locators);
3821 lookupLocators = dlm.getLocators();
3822 addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
3823 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3824 DISCOVERY_LOGGER.log(
3825 Level.CONFIG,
3826 "set lookup locators {0}",
3827 new Object[]{ Arrays.asList(locators) });
3828 }
3829 } finally {
3830 concurrentObj.writeUnlock();
3831 }
3832 }
3833
3834
3835 public void addMemberGroups(String[] groups) throws NoSuchObjectException {
3836 concurrentObj.writeLock();
3837 try {
3838 for (int i = 0; i < groups.length; i++) {
3839 if (indexOf(memberGroups, groups[i]) < 0)
3840 memberGroups = (String[])arrayAdd(memberGroups, groups[i]);
3841 }
3842 synchronized (announcer) {
3843 announcer.notify();
3844 }
3845 addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
3846 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3847 DISCOVERY_LOGGER.log(
3848 Level.CONFIG,
3849 "added member groups {0}",
3850 new Object[]{ Arrays.asList(groups) });
3851 }
3852 } finally {
3853 concurrentObj.writeUnlock();
3854 }
3855 }
3856
3857
3858 public void removeMemberGroups(String[] groups)
3859 throws NoSuchObjectException
3860 {
3861 concurrentObj.writeLock();
3862 try {
3863 for (int i = 0; i < groups.length; i++) {
3864 int j = indexOf(memberGroups, groups[i]);
3865 if (j >= 0)
3866 memberGroups = (String[])arrayDel(memberGroups, j);
3867 }
3868 synchronized (announcer) {
3869 announcer.notify();
3870 }
3871 addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
3872 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3873 DISCOVERY_LOGGER.log(
3874 Level.CONFIG,
3875 "removed member groups {0}",
3876 new Object[]{ Arrays.asList(groups) });
3877 }
3878 } finally {
3879 concurrentObj.writeUnlock();
3880 }
3881 }
3882
3883
3884 public String[] getMemberGroups() throws NoSuchObjectException {
3885 concurrentObj.readLock();
3886 try {
3887
3888 return memberGroups;
3889 } finally {
3890 concurrentObj.readUnlock();
3891 }
3892 }
3893
3894
3895 public void setMemberGroups(String[] groups) throws NoSuchObjectException {
3896 concurrentObj.writeLock();
3897 try {
3898 memberGroups = (String[])removeDups(groups);
3899 addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
3900 synchronized (announcer) {
3901 announcer.notify();
3902 }
3903 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3904 DISCOVERY_LOGGER.log(
3905 Level.CONFIG,
3906 "set member groups {0}",
3907 new Object[]{ Arrays.asList(groups) });
3908 }
3909 } finally {
3910 concurrentObj.writeUnlock();
3911 }
3912 }
3913
3914
3915 public int getUnicastPort() throws NoSuchObjectException {
3916 concurrentObj.readLock();
3917 try {
3918 return unicastPort;
3919 } finally {
3920 concurrentObj.readUnlock();
3921 }
3922 }
3923
3924
3925 public void setUnicastPort(int port) throws IOException,RemoteException {
3926 concurrentObj.writeLock();
3927 try {
3928 if (port == unicastPort)
3929 return;
3930 if ((port == 0 && unicast.port == Constants.discoveryPort) ||
3931 port == unicast.port)
3932 {
3933 unicastPort = port;
3934 addLogRecord(new UnicastPortSetLogObj(port));
3935 return;
3936 }
3937
3938 unicast = new Unicast(this, port);
3939 Thread newUnicaster = new InterruptedStatusThread( unicast , "unicast request");
3940 newUnicaster.setDaemon(false);
3941
3942 unicaster.interrupt();
3943 try {
3944 unicaster.join();
3945 } catch (InterruptedException e) {
3946 Thread.currentThread().interrupt();
3947 }
3948
3949 unicaster = newUnicaster;
3950 unicaster.start();
3951 unicastPort = port;
3952 myLocator = (proxy instanceof RemoteMethodControl) ?
3953 new ConstrainableLookupLocator(
3954 myLocator.getHost(), unicast.port, null) :
3955 new LookupLocator(myLocator.getHost(), unicast.port);
3956 synchronized (announcer) {
3957 announcer.notify();
3958 }
3959 addLogRecord(new UnicastPortSetLogObj(port));
3960 if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
3961 DISCOVERY_LOGGER.log(
3962 Level.CONFIG,
3963 "changed unicast discovery port to {0}",
3964 new Object[]{ Integer.valueOf(unicast.port) });
3965 }
3966 } finally {
3967 concurrentObj.writeUnlock();
3968 }
3969 }
3970
3971
3972 public void destroy() throws RemoteException {
3973 concurrentObj.priorityWriteLock();
3974 try {
3975 LOGGER.info("starting Reggie shutdown");
3976
3977 if (activationID != null) {
3978 try {
3979 activationSystem.unregisterObject(activationID);
3980 } catch (ActivationException e) {
3981 LOGGER.log(Levels.HANDLED,
3982 "exception unregistering activation ID", e);
3983 } catch (RemoteException e) {
3984 LOGGER.log(Level.WARNING,
3985 "aborting Reggie shutdown", e);
3986 throw e;
3987 }
3988 }
3989 Thread destroy = new Thread(new Destroy(this), "destroy");
3990 destroy.setDaemon(false);
3991 destroy.start();
3992 } finally {
3993 concurrentObj.writeUnlock();
3994 }
3995 }
3996
3997
3998
3999
4000
4001 private static Object[] arrayAdd(Object[] array, Object elt) {
4002 int len = array.length;
4003 Object[] narray =
4004 (Object[])Array.newInstance(array.getClass().getComponentType(),
4005 len + 1);
4006 System.arraycopy(array, 0, narray, 0, len);
4007 narray[len] = elt;
4008 return narray;
4009 }
4010
4011
4012
4013
4014
4015 private static Object[] arrayDel(Object[] array, int i) {
4016 int len = array.length - 1;
4017 Object[] narray =
4018 (Object[])Array.newInstance(array.getClass().getComponentType(),
4019 len);
4020 System.arraycopy(array, 0, narray, 0, i);
4021 System.arraycopy(array, i + 1, narray, i, len - i);
4022 return narray;
4023 }
4024
4025
4026 private static int indexOf(Object[] array, Object elt) {
4027 return indexOf(array, array.length, elt);
4028 }
4029
4030
4031 private static int indexOf(Object[] array, int len, Object elt) {
4032 for (int i = 0; i < len; i++) {
4033 if (elt.equals(array[i]))
4034 return i;
4035 }
4036 return -1;
4037 }
4038
4039
4040 private static boolean isEmpty(Object[] array) {
4041 return (array == null || array.length == 0);
4042 }
4043
4044
4045 private static boolean overlap(Object[] arr1, Object[] arr2) {
4046 for (int i = arr1.length; --i >= 0; ) {
4047 if (indexOf(arr2, arr1[i]) >= 0)
4048 return true;
4049 }
4050 return false;
4051 }
4052
4053
4054 private static boolean allNull(Object[] array) {
4055 for (int i = array.length; --i >= 0; ) {
4056 if (array[i] != null)
4057 return false;
4058 }
4059 return true;
4060 }
4061
4062
4063 private static boolean allNull(List array) {
4064 for (int i = array.size(); --i >= 0; ) {
4065 if (array.get(i) != null)
4066 return false;
4067 }
4068 return true;
4069 }
4070
4071
4072 private static Object[] removeDups(Object[] arr) {
4073 for (int i = arr.length; --i >= 0; ) {
4074 if (indexOf(arr, i, arr[i]) >= 0)
4075 arr = arrayDel(arr, i);
4076 }
4077 return arr;
4078 }
4079
4080
4081 private static EntryRep[] deleteSet(Item item, int i) {
4082 item.setAttributeSets((EntryRep[]) arrayDel(item.getAttributeSets(), i));
4083 return item.getAttributeSets();
4084 }
4085
4086
4087
4088
4089
4090
4091 private static Item copyItem(Item item) {
4092 EntryRep[] attrSets = item.getAttributeSets();
4093 for (int i = attrSets.length; --i >= 0; ) {
4094 attrSets[i] = new EntryRep(attrSets[i], true);
4095 }
4096 return new Item(item.getServiceID(), null, null, item.service, attrSets, item.getProxy() );
4097 }
4098
4099
4100
4101
4102
4103
4104 private static EntryClass getDefiningClass(EntryClass eclass, int fldidx) {
4105 while (true) {
4106 EntryClass sup = eclass.getSuperclass();
4107 if (sup.getNumFields() <= fldidx)
4108 return eclass;
4109 eclass = sup;
4110 }
4111 }
4112
4113
4114 private void addServiceByTypes(ServiceType type, SvcReg reg) {
4115 Map<ServiceID,SvcReg> map = serviceByTypeName.get(type.getName());
4116 if (map == null) {
4117 map = new HashMap<ServiceID,SvcReg>();
4118 serviceByTypeName.put(type.getName(), map);
4119 }
4120 map.put(reg.item.getServiceID(), reg);
4121 ServiceType[] ifaces = type.getInterfaces();
4122 for (int i = ifaces.length; --i >= 0; ) {
4123 addServiceByTypes(ifaces[i], reg);
4124 }
4125 ServiceType sup = type.getSuperclass();
4126 if (sup != null)
4127 addServiceByTypes(sup, reg);
4128 }
4129
4130
4131 private void deleteServiceFromTypes(ServiceType type, SvcReg reg)
4132 {
4133 Map<ServiceID,SvcReg> map = serviceByTypeName.get(type.getName());
4134 if (map != null) {
4135 map.remove(reg.item.getServiceID());
4136 if ((map.isEmpty()) && !type.equals(objectServiceType))
4137 serviceByTypeName.remove(type.getName());
4138 ServiceType[] ifaces = type.getInterfaces();
4139 for (int j = ifaces.length; --j >= 0; ) {
4140 deleteServiceFromTypes(ifaces[j], reg);
4141 }
4142 ServiceType sup = type.getSuperclass();
4143 if (sup != null)
4144 deleteServiceFromTypes(sup, reg);
4145 }
4146 }
4147
4148
4149
4150
4151
4152 private static boolean matchItem(Template tmpl, Item item) {
4153 return ((tmpl.serviceID() == null ||
4154 tmpl.serviceID().equals(item.getServiceID())) &&
4155 matchType(tmpl.serviceTypes(), item.serviceType) &&
4156 matchAttributes(tmpl, item));
4157 }
4158
4159
4160 private static boolean matchType(ServiceType[] types, ServiceType type) {
4161 if (types != null) {
4162 for (int i = types.length; --i >= 0; ) {
4163 if (!types[i].isAssignableFrom(type))
4164 return false;
4165 }
4166 }
4167 return true;
4168 }
4169
4170
4171
4172
4173
4174 private static boolean matchEntry(EntryRep tmpl, EntryRep entry) {
4175 return entry.matchEntry(tmpl);
4176 }
4177
4178
4179
4180
4181
4182 private static boolean matchAttributes(Template tmpl, Item item) {
4183 EntryRep[] tmpls = tmpl.attributeSetTemplates();
4184 if (tmpls != null) {
4185 EntryRep[] entries = item.getAttributeSets();
4186 outer:
4187 for (int i = tmpls.length; --i >= 0; ) {
4188 EntryRep etmpl = tmpls[i];
4189 for (int j = entries.length; --j >= 0; ) {
4190 if (matchEntry(etmpl, entries[j]))
4191 continue outer;
4192 }
4193 return false;
4194 }
4195 }
4196 return true;
4197 }
4198
4199
4200
4201
4202
4203 private static boolean attrMatch(EntryRep[] tmpls, EntryRep attrSet) {
4204 boolean good = true;
4205 if (tmpls != null) {
4206 for (int i = tmpls.length; --i >= 0; ) {
4207 EntryRep tmpl = tmpls[i];
4208 if (matchEntry(tmpl, attrSet)) {
4209 if (tmpl.eclass.isAssignableFrom(attrSet.eclass) &&
4210 !tmpl.eclass.equals(attrSet.eclass))
4211 return true;
4212 good = false;
4213 }
4214 }
4215 }
4216 return good;
4217 }
4218
4219
4220
4221
4222
4223 private static boolean hasAttr(SvcReg reg,
4224 EntryClass eclass,
4225 int fldidx,
4226 Object value)
4227 {
4228 EntryRep[] sets = reg.item.getAttributeSets();
4229 for (int i = sets.length; --i >= 0; ) {
4230 EntryRep set = sets[i];
4231 if (eclass.isAssignableFrom(set.eclass) &&
4232 ((value == null && set.fields().get(fldidx) == null) ||
4233 (value != null && value.equals(set.fields().get(fldidx)))))
4234 return true;
4235 }
4236 return false;
4237 }
4238
4239
4240
4241
4242
4243 private static boolean hasEmptyAttr(SvcReg reg, EntryClass eclass) {
4244 EntryRep[] sets = reg.item.getAttributeSets();
4245 for (int i = sets.length; --i >= 0; ) {
4246 if (eclass.equals(sets[i].eclass))
4247 return true;
4248 }
4249 return false;
4250 }
4251
4252
4253
4254
4255
4256
4257 private static void addTypes(List types,
4258 List codebases,
4259 ServiceType[] bases,
4260 String prefix,
4261 ServiceType type,
4262 String codebase)
4263 {
4264 if (types.contains(type))
4265 return;
4266 if (bases != null) {
4267 for (int i = bases.length; --i >= 0; ) {
4268 if (type.isAssignableFrom(bases[i]))
4269 return;
4270 }
4271 }
4272 if (prefix == null || type.getName().startsWith(prefix)) {
4273 types.add(type);
4274 codebases.add(codebase);
4275 return;
4276 }
4277 ServiceType[] ifs = type.getInterfaces();
4278 for (int i = ifs.length; --i >= 0; ) {
4279 addTypes(types, codebases, bases, prefix, ifs[i], codebase);
4280 }
4281 ServiceType sup = type.getSuperclass();
4282 if (sup != null)
4283 addTypes(types, codebases, bases, prefix, sup, codebase);
4284 }
4285
4286
4287 private static long limitDuration(long leaseDuration, long limit) {
4288 if (leaseDuration == Lease.ANY || leaseDuration > limit)
4289 leaseDuration = limit;
4290 else if (leaseDuration < 0)
4291 throw new IllegalArgumentException("negative lease duration");
4292 return leaseDuration;
4293 }
4294
4295
4296
4297
4298
4299 private static void marshalAttributes(Entry[] attrs,
4300 ObjectOutputStream out)
4301 throws IOException
4302 {
4303 int len = attrs.length;
4304 for (int i=0; i < len; i++) {
4305 out.writeObject(new MarshalledInstance(attrs[i]));
4306 }
4307 out.writeObject(null);
4308 }
4309
4310
4311
4312
4313
4314
4315 private static Entry[] unmarshalAttributes(ObjectInputStream in)
4316 throws IOException, ClassNotFoundException
4317 {
4318 List<Entry> attributes = new LinkedList<Entry>();
4319 MarshalledInstance mi = null;
4320 while ((mi = (MarshalledInstance) in.readObject()) != null) {
4321 try {
4322 attributes.add((Entry) mi.get(false));
4323 } catch (Throwable e) {
4324 if (e instanceof Error &&
4325 ThrowableConstants.retryable(e) ==
4326 ThrowableConstants.BAD_OBJECT)
4327 {
4328 throw (Error) e;
4329 }
4330 LOGGER.log(
4331 Level.WARNING, "failed to recover LUS attribute", e);
4332 }
4333 }
4334 Entry[] attrs = new Entry[attributes.size()];
4335 return attributes.toArray(attrs);
4336
4337 }
4338
4339
4340
4341
4342
4343
4344 private static void marshalLocators(LookupLocator[] locators,
4345 ObjectOutputStream out)
4346 throws IOException
4347 {
4348 int len = locators.length;
4349 for (int i = 0; i < len; i++) {
4350 out.writeObject(new MarshalledInstance(locators[i]));
4351 }
4352 out.writeObject(null);
4353 }
4354
4355
4356
4357
4358
4359
4360 private static LookupLocator[] unmarshalLocators(ObjectInputStream in)
4361 throws IOException, ClassNotFoundException
4362 {
4363 List<LookupLocator> l = new LinkedList<LookupLocator>();
4364 MarshalledInstance mi;
4365 while ((mi = (MarshalledInstance) in.readObject()) != null) {
4366 try {
4367 l.add((LookupLocator) mi.get(false));
4368 } catch (Throwable e) {
4369 if (e instanceof Error &&
4370 ThrowableConstants.retryable(e) ==
4371 ThrowableConstants.BAD_OBJECT)
4372 {
4373 throw (Error) e;
4374 }
4375 LOGGER.log(
4376 Level.WARNING, "failed to recover lookup locator", e);
4377 }
4378 }
4379 return l.toArray(new LookupLocator[l.size()]);
4380 }
4381
4382
4383
4384
4385
4386
4387
4388
4389 private static LookupLocator[] prepareLocators(LookupLocator[] locators,
4390 ProxyPreparer preparer,
4391 boolean tolerateFailures)
4392 throws RemoteException
4393 {
4394 int len = locators.length;
4395 List<LookupLocator> l = new ArrayList<LookupLocator>(len);
4396 for (int i = 0; i < len; i++) {
4397 try {
4398 l.add((LookupLocator) preparer.prepareProxy(locators[i]));
4399 } catch (Exception e) {
4400 if (!tolerateFailures) {
4401 if (e instanceof RemoteException) {
4402 throw (RemoteException) e;
4403 } else {
4404 throw (RuntimeException) e;
4405 }
4406 }
4407 if (DISCOVERY_LOGGER.isLoggable(Level.WARNING)) {
4408 logThrow(DISCOVERY_LOGGER, Level.WARNING,
4409 RegistrarImpl.class.getName(),
4410 "prepareLocators",
4411 "failed to prepare lookup locator {0}",
4412 new Object[]{ locators[i] },
4413 e);
4414 }
4415 }
4416 }
4417 return l.toArray(new LookupLocator[l.size()]);
4418 }
4419
4420
4421
4422
4423 private static void logThrow(Logger logger,
4424 Level level,
4425 String className,
4426 String methodName,
4427 String message,
4428 Object[] args,
4429 Throwable thrown)
4430 {
4431 java.util.logging.LogRecord lr =
4432 new java.util.logging.LogRecord(level, message);
4433 lr.setLoggerName(logger.getName());
4434 lr.setSourceClassName(className);
4435 lr.setSourceMethodName(methodName);
4436 lr.setParameters(args);
4437 lr.setThrown(thrown);
4438 logger.log(lr);
4439 }
4440
4441
4442
4443
4444
4445
4446
4447
4448
4449
4450
4451 private void addService(SvcReg reg) {
4452 serviceByID.put(reg.item.getServiceID(), reg);
4453 serviceByTime.add(reg);
4454 addServiceByTypes(reg.item.serviceType, reg);
4455 EntryRep[] entries = reg.item.getAttributeSets();
4456 for (int i = entries.length; --i >= 0; ) {
4457 addAttrs(reg, entries[i]);
4458 }
4459 computeMaxLeases();
4460 }
4461
4462
4463
4464
4465
4466
4467
4468
4469
4470 private void deleteService(SvcReg reg, long now) {
4471 Item item = reg.item;
4472 generateEvents(item, null, now);
4473 serviceByID.remove(item.getServiceID());
4474 serviceByTime.remove(reg);
4475 deleteServiceFromTypes(item.serviceType, reg);
4476 EntryRep[] entries = item.getAttributeSets();
4477 for (int i = entries.length; --i >= 0; ) {
4478 deleteAttrs(reg, entries[i], false);
4479 }
4480 computeMaxLeases();
4481 }
4482
4483
4484
4485
4486
4487
4488
4489
4490
4491
4492 private void addEvent(EventReg reg) {
4493 if (reg.listener == null)
4494 return;
4495 EntryRep[] tmpls = reg.tmpl.attributeSetTemplates();
4496 if (tmpls != null) {
4497 for (int i = tmpls.length; --i >= 0; ) {
4498 EntryClass eclass = tmpls[i].eclass;
4499 eclass.setNumTemplates(eclass.getNumTemplates() + 1);
4500 }
4501 }
4502 Long id = Long.valueOf(reg.eventID);
4503 eventByID.put(id, reg);
4504 eventByTime.offer(reg);
4505 eventTaskMap.put(reg, eventNotifierExec.newSerialExecutor(new PriorityBlockingQueue()) );
4506 if (reg.tmpl.serviceID() != null) {
4507 Object val = subEventByService.get(reg.tmpl.serviceID());
4508 if (val == null)
4509 val = reg;
4510 else if (val instanceof EventReg)
4511 val = new EventReg[]{(EventReg)val, reg};
4512 else
4513 val = arrayAdd((EventReg[])val, reg);
4514 subEventByService.put(reg.tmpl.serviceID(), val);
4515 } else {
4516 subEventByID.put(id, reg);
4517 }
4518 computeMaxLeases();
4519 }
4520
4521
4522
4523
4524
4525
4526
4527 private void deleteEvent(EventReg reg) {
4528 EntryRep[] tmpls = reg.tmpl.attributeSetTemplates();
4529 if (tmpls != null) {
4530 for (int i = tmpls.length; --i >= 0; ) {
4531 EntryClass eclass = tmpls[i].eclass;
4532 eclass.setNumTemplates(eclass.getNumTemplates() - 1);
4533 }
4534 }
4535 Long id = Long.valueOf(reg.eventID);
4536 eventByID.remove(id);
4537 eventByTime.remove(reg);
4538 eventTaskMap.remove(reg);
4539 if (reg.tmpl.serviceID() != null) {
4540 Object val = subEventByService.get(reg.tmpl.serviceID());
4541 if (val == reg) {
4542 subEventByService.remove(reg.tmpl.serviceID());
4543 } else {
4544 Object[] array = (EventReg[])val;
4545 array = arrayDel(array, indexOf(array, reg));
4546 if (array.length == 1)
4547 val = array[0];
4548 else
4549 val = array;
4550 subEventByService.put(reg.tmpl.serviceID(), val);
4551 }
4552 } else {
4553 subEventByID.remove(id);
4554 }
4555 computeMaxLeases();
4556 }
4557
4558
4559
4560
4561
4562
4563
4564 private void addAttrs(SvcReg reg, EntryRep entry) {
4565 EntryClass eclass = entry.eclass;
4566 addInstance(eclass);
4567 List fields = entry.fields();
4568 if ( fields.size() > 0) {
4569
4570 for (int i = fields.size(); --i >= 0; ) {
4571 eclass = getDefiningClass(eclass, i);
4572 addAttr(reg, eclass, i, fields.get(i));
4573 }
4574 return;
4575 }
4576 List regs = serviceByEmptyAttr.get(eclass);
4577 if (regs == null) {
4578 regs = new ArrayList(2);
4579 regs.add(reg);
4580 serviceByEmptyAttr.put(eclass, regs);
4581 } else if (!regs.contains(reg)) {
4582 regs.add(reg);
4583 }
4584 }
4585
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595 private void deleteAttrs(SvcReg reg, EntryRep entry, boolean checkDups) {
4596 EntryClass eclass = entry.eclass;
4597 deleteInstance(eclass);
4598 List fields = entry.fields();
4599 if ( fields.isEmpty()) {
4600 List regs = serviceByEmptyAttr.get(eclass);
4601 if (regs == null || (checkDups && hasEmptyAttr(reg, eclass)))
4602 return;
4603 int idx = regs.indexOf(reg);
4604 if (idx >= 0) {
4605 regs.remove(idx);
4606 if (regs.isEmpty())
4607 serviceByEmptyAttr.remove(eclass);
4608 }
4609 return;
4610 }
4611
4612 for (int fldidx = fields.size(); --fldidx >= 0; ) {
4613 eclass = getDefiningClass(eclass, fldidx);
4614 Map<Object,List<SvcReg>>[] attrMaps = serviceByAttr.get(eclass);
4615 if (attrMaps == null ||
4616 attrMaps[fldidx] == null ||
4617 (checkDups && hasAttr(reg, eclass, fldidx, fields.get(fldidx))))
4618 continue;
4619 Map<Object,List<SvcReg>> map = attrMaps[fldidx];
4620 Object value = fields.get(fldidx);
4621 List<SvcReg> regs = map.get(value);
4622 if (regs == null)
4623 continue;
4624 int idx = regs.indexOf(reg);
4625 if (idx < 0)
4626 continue;
4627 regs.remove(idx);
4628 if (!regs.isEmpty())
4629 continue;
4630 map.remove(value);
4631 if (!map.isEmpty())
4632 continue;
4633 attrMaps[fldidx] = null;
4634 if (allNull(attrMaps))
4635 serviceByAttr.remove(eclass);
4636 }
4637 }
4638
4639
4640
4641
4642
4643 private void updateAttrs(SvcReg reg, EntryRep entry, List values)
4644 {
4645 EntryClass eclass = entry.eclass;
4646
4647 for (int fldidx = values.size(); --fldidx >= 0; ) {
4648 Object oval = entry.fields().get(fldidx);
4649 Object nval = values.get(fldidx);
4650 if (nval != null && !nval.equals(oval)) {
4651 eclass = getDefiningClass(eclass, fldidx);
4652 Map<Object,List<SvcReg>> map = addAttr(reg, eclass, fldidx, nval);
4653 entry.fields().set(fldidx, nval);
4654 if (hasAttr(reg, eclass, fldidx, oval))
4655 continue;
4656 List regs = map.get(oval);
4657 regs.remove(regs.indexOf(reg));
4658 if (regs.isEmpty())
4659 map.remove(oval);
4660 }
4661 }
4662 }
4663
4664
4665
4666
4667
4668
4669 private Map<Object,List<SvcReg>> addAttr(SvcReg reg,
4670 EntryClass eclass,
4671 int fldidx,
4672 Object value)
4673 {
4674 Map<Object,List<SvcReg>>[] attrMaps = serviceByAttr.get(eclass);
4675 if (attrMaps == null) {
4676 attrMaps = new Map[eclass.getNumFields()];
4677 serviceByAttr.put(eclass, attrMaps);
4678 }
4679 Map<Object,List<SvcReg>> map = attrMaps[fldidx];
4680 if (map == null) {
4681 map = new HashMap(11);
4682 attrMaps[fldidx] = map;
4683 }
4684 List<SvcReg> regs = map.get(value);
4685 if (regs == null) {
4686 regs = new ArrayList(3);
4687 map.put(value, regs);
4688 } else if (regs.contains(reg))
4689 return map;
4690 regs.add(reg);
4691 return map;
4692 }
4693
4694
4695
4696
4697
4698 private void addInstance(EntryClass eclass) {
4699 int idx = entryClasses.indexOf(eclass);
4700 if (idx < 0) {
4701 entryClasses.add(eclass);
4702 idx = entryClasses.size() - 1;
4703 }
4704 eclass = (EntryClass) entryClasses.get(idx);
4705 eclass.setNumInstances(eclass.getNumInstances() + 1);
4706 }
4707
4708
4709
4710
4711
4712 private void deleteInstance(EntryClass eclass) {
4713 int idx = entryClasses.indexOf(eclass);
4714 eclass = (EntryClass) entryClasses.get(idx);
4715 int num = eclass.getNumInstances() - 1;
4716 if (num == 0)
4717 entryClasses.remove(idx);
4718 eclass.setNumInstances(num);
4719 }
4720
4721
4722 private ItemIter matchingItems(Template tmpl) {
4723 if (tmpl.serviceID() != null)
4724 return new IDItemIter(tmpl, serviceByID.get(tmpl.serviceID()));
4725 if (tmpl.serviceTypesLength() > 0){
4726 Map<ServiceID,SvcReg> map = serviceByTypeName.get(
4727 tmpl.serviceTypeAtIndex(0).getName());
4728 Iterator<SvcReg> services = map != null ? map.values().iterator() :
4729 Collections.EMPTY_LIST.iterator();
4730 return new SvcIterator(tmpl, services);
4731 }
4732 EntryRep[] sets = tmpl.attributeSetTemplates();
4733 if (isEmpty(sets))
4734 return new AllItemIter(serviceByID.values().iterator());
4735 for (int i = sets.length; --i >= 0; ) {
4736 List fields = sets[i].fields();
4737 if (fields.isEmpty()) {
4738 EntryClass eclass = getEmptyEntryClass(sets[i].eclass);
4739 if (eclass != null)
4740 return new AttrItemIter(tmpl, serviceByEmptyAttr.get(eclass));
4741 } else {
4742
4743 for (int j = fields.size(); --j >= 0; ) {
4744 if (fields.get(j) != null){
4745 EntryRep set = tmpl.attributeSetTemplateAtIndex(i);
4746 Map<Object,List<SvcReg>>[] attrMaps =
4747 serviceByAttr.get(getDefiningClass(set.eclass,j));
4748 List<SvcReg> svcs = null;
4749 if (attrMaps != null && attrMaps[j] != null) {
4750 svcs = attrMaps[j].get(set.fields().get(j));
4751 }
4752 return new AttrItemIter(tmpl, svcs);
4753 }
4754 }
4755 }
4756 }
4757 return new ClassItemIter(tmpl);
4758 }
4759
4760
4761
4762
4763
4764
4765 private EntryClass getEmptyEntryClass(EntryClass eclass) {
4766 EntryClass match = null;
4767 for (int i = entryClasses.size(); --i >= 0; ) {
4768 EntryClass cand = (EntryClass)entryClasses.get(i);
4769 if (eclass.isAssignableFrom(cand)) {
4770 if (cand.getNumFields() != 0 || match != null)
4771 return null;
4772 match = cand;
4773 }
4774 }
4775 return match;
4776 }
4777
4778
4779 private List<SvcReg> matchingServices(ServiceType[] types) {
4780 List<SvcReg> matches = new LinkedList<SvcReg>();
4781 if (isEmpty(types)) {
4782 Map<ServiceID,SvcReg> map = serviceByTypeName.get(objectServiceType.getName());
4783 matches.addAll(map.values());
4784 } else {
4785 Map map = (Map) serviceByTypeName.get(types[0].getName());
4786 if (map != null)
4787 matches.addAll(map.values());
4788 if (types.length > 1) {
4789 for (Iterator<SvcReg> it = matches.iterator(); it.hasNext(); ) {
4790 SvcReg reg = it.next();
4791 if (!matchType(types, reg.item.serviceType))
4792 it.remove();
4793 }
4794 }
4795 }
4796 return matches;
4797 }
4798
4799
4800 private String pickCodebase(EntryClass eclass, long now)
4801 throws ClassNotFoundException
4802 {
4803 if (eclass.getNumFields() == 0)
4804 return pickCodebase(eclass,
4805 serviceByEmptyAttr.get(eclass),
4806 now);
4807 int fldidx = eclass.getNumFields() - 1;
4808 Map<Object,List<SvcReg>>[] attrMaps =
4809 serviceByAttr.get(getDefiningClass(eclass, fldidx));
4810 for (Iterator<List<SvcReg>> iter = attrMaps[fldidx].values().iterator();
4811 iter.hasNext(); )
4812 {
4813 try {
4814 return pickCodebase(eclass, iter.next(), now);
4815 } catch (ClassNotFoundException e) {
4816 }
4817 }
4818 throw new ClassNotFoundException();
4819 }
4820
4821
4822 private String pickCodebase(EntryClass eclass, List<SvcReg> svcs, long now)
4823 throws ClassNotFoundException
4824 {
4825 for (int i = svcs.size(); --i >= 0; ) {
4826 SvcReg reg = svcs.get(i);
4827 if (reg.leaseExpiration <= now)
4828 continue;
4829 EntryRep[] sets = reg.item.getAttributeSets();
4830 for (int j = sets.length; --j >= 0; ) {
4831 if (eclass.equals(sets[j].eclass))
4832 return sets[j].codebase;
4833 }
4834 }
4835 throw new ClassNotFoundException();
4836 }
4837
4838
4839
4840
4841
4842
4843
4844
4845
4846 private void computeMaxLeases() {
4847 if (inRecovery)
4848 return;
4849 maxServiceLease =
4850 Math.max(minMaxServiceLease,
4851 minRenewalInterval *
4852 (serviceByID.size() +
4853 ((eventByID.size() * minMaxServiceLease) /
4854 minMaxEventLease)));
4855 maxEventLease = Math.max(minMaxEventLease,
4856 ((maxServiceLease * minMaxEventLease) /
4857 minMaxServiceLease));
4858 }
4859
4860
4861 private void respondHttps(Socket socket) throws Exception {
4862 try {
4863 try {
4864 socket.setTcpNoDelay(true);
4865 socket.setKeepAlive(true);
4866 } catch (SocketException e) {
4867 if (DISCOVERY_LOGGER.isLoggable(Levels.HANDLED))
4868 DISCOVERY_LOGGER.log(Levels.HANDLED,
4869 "problem setting socket options", e);
4870 }
4871 socket.setSoTimeout(
4872 unicastDiscoveryConstraints.getUnicastSocketTimeout(
4873 DEFAULT_SOCKET_TIMEOUT));
4874
4875 httpsDiscovery.handleUnicastDiscovery(
4876 new UnicastResponse(myLocator.getHost(),
4877 myLocator.getPort(),
4878 memberGroups,
4879 proxy),
4880 socket,
4881 unicastDiscoveryConstraints.getUnfulfilledConstraints(),
4882 unicastDiscoverySubjectChecker,
4883 Collections.EMPTY_LIST);
4884 } finally {
4885 try {
4886 socket.close();
4887 } catch (IOException e) {
4888 DISCOVERY_LOGGER.log(Levels.HANDLED, "exception closing socket", e);
4889 }
4890 }
4891 }
4892
4893
4894 private void respond(Socket socket) throws Exception {
4895 try {
4896 try {
4897 socket.setTcpNoDelay(true);
4898 socket.setKeepAlive(true);
4899 } catch (SocketException e) {
4900 if (DISCOVERY_LOGGER.isLoggable(Levels.HANDLED))
4901 DISCOVERY_LOGGER.log(Levels.HANDLED,
4902 "problem setting socket options", e);
4903 }
4904 socket.setSoTimeout(
4905 unicastDiscoveryConstraints.getUnicastSocketTimeout(
4906 DEFAULT_SOCKET_TIMEOUT));
4907 int pv = new DataInputStream(socket.getInputStream()).readInt();
4908 unicastDiscoveryConstraints.checkProtocolVersion(pv);
4909 getDiscovery(pv).handleUnicastDiscovery(
4910 new UnicastResponse(myLocator.getHost(),
4911 myLocator.getPort(),
4912 memberGroups,
4913 proxy),
4914 socket,
4915 unicastDiscoveryConstraints.getUnfulfilledConstraints(),
4916 unicastDiscoverySubjectChecker,
4917 Collections.EMPTY_LIST);
4918 } finally {
4919 try {
4920 socket.close();
4921 } catch (IOException e) {
4922 DISCOVERY_LOGGER.log(Levels.HANDLED, "exception closing socket", e);
4923 }
4924 }
4925 }
4926
4927
4928 private Discovery getDiscovery(int version)
4929 throws DiscoveryProtocolException
4930 {
4931 switch (version) {
4932 case Discovery.PROTOCOL_VERSION_1:
4933 return Discovery.getProtocol1();
4934 case Discovery.PROTOCOL_VERSION_2:
4935 return protocol2;
4936 default:
4937 throw new DiscoveryProtocolException(
4938 "unsupported protocol version: " + version);
4939 }
4940 }
4941
4942
4943 private void closeRequestSockets(List<Runnable> tasks) {
4944 for (int i = tasks.size(); --i >= 0; ) {
4945 Runnable obj = tasks.get(i);
4946 if (obj instanceof SocketTask) {
4947 try {
4948 ((SocketTask)obj).socket.close();
4949 } catch (IOException e) {
4950 }
4951 }
4952 }
4953 }
4954
4955
4956
4957
4958 private static final class Initializer{
4959 LifeCycle lifeCycle;
4960 ServerSocketFactory serverSocketFactory;
4961 int persistenceSnapshotThreshold;
4962 SocketFactory socketFactory;
4963 ProxyPreparer recoveredListenerPreparer;
4964 float persistenceSnapshotWeight;
4965 ProxyPreparer recoveredLocatorPreparer;
4966 boolean inRecovery;
4967 ActivationID activationID;
4968 ActivationSystem activationSystem;
4969 Exporter serverExporter;
4970 String[] lookupGroups = DiscoveryGroupManagement.NO_GROUPS;
4971 LookupLocator[] lookupLocators = {};
4972 String[] memberGroups = {""};
4973 int unicastPort = 0;
4974 Entry[] lookupAttrs;
4975 DiscoveryManagement discoer;
4976 ProxyPreparer listenerPreparer;
4977 ProxyPreparer locatorPreparer;
4978 long minMaxEventLease;
4979 long minMaxServiceLease;
4980 long minRenewalInterval;
4981 long multicastAnnouncementInterval;
4982 int multicastInterfaceRetryInterval;
4983 NetworkInterface[] multicastInterfaces;
4984 boolean multicastInterfacesSpecified;
4985 UuidGenerator resourceIdGenerator;
4986 UuidGenerator serviceIdGenerator;
4987 long unexportTimeout;
4988 long unexportWait;
4989 ServiceType objectServiceType;
4990 ClientSubjectChecker unicastDiscoverySubjectChecker;
4991 Discovery protocol2;
4992 InvocationConstraints rawUnicastDiscoveryConstraints;
4993 DiscoveryConstraints multicastRequestConstraints;
4994 DiscoveryConstraints multicastAnnouncementConstraints;
4995 DiscoveryConstraints unicastDiscoveryConstraints;
4996 ClientSubjectChecker multicastRequestSubjectChecker;
4997 LoginContext loginContext;
4998 String persistenceDirectory = null;
4999 boolean persistent;
5000 String unicastDiscoveryHost;
5001 Configuration config;
5002 AccessControlContext context;
5003 ExecutorService executor;
5004 ScheduledExecutorService scheduledExecutor;
5005 String certFactoryType;
5006 String certPathEncoding;
5007 byte [] encodedCerts;
5008 private int httpsUnicastPort;
5009 boolean enableHttpsUnicast;
5010 Discovery httpsDiscovery;
5011 String codebase;
5012
5013
5014
5015 Initializer ( Configuration config, ActivationID activationID,
5016 boolean persistent, LifeCycle lifeCycle, LoginContext loginContext)
5017 throws IOException, ConfigurationException, ActivationException {
5018 if (activationID != null && !persistent) {
5019 throw new IllegalArgumentException();
5020 }
5021 this.lifeCycle = lifeCycle;
5022 this.loginContext = loginContext;
5023 this.persistent = persistent;
5024 this.config = config;
5025 context = AccessController.getContext();
5026 ProxyPreparer p = new BasicProxyPreparer();
5027
5028 this.serverSocketFactory = (ServerSocketFactory) config.getEntry(
5029 COMPONENT, "serverSocketFactory", ServerSocketFactory.class,
5030 ServerSocketFactory.getDefault(), Configuration.NO_DATA);
5031 this.socketFactory = (SocketFactory) config.getEntry(
5032 COMPONENT, "socketFactory", SocketFactory.class,
5033 SocketFactory.getDefault(), Configuration.NO_DATA);
5034
5035
5036 if (persistent) {
5037 this.persistenceSnapshotThreshold = Config.getIntEntry(
5038 config, COMPONENT, "persistenceSnapshotThreshold",
5039 200, 0, Integer.MAX_VALUE);
5040 this.persistenceDirectory = (String) config.getEntry(
5041 COMPONENT, "persistenceDirectory", String.class);
5042 this.recoveredListenerPreparer = (ProxyPreparer) Config.getNonNullEntry(
5043 config, COMPONENT, "recoveredListenerPreparer",
5044 ProxyPreparer.class, p);
5045 this.recoveredLocatorPreparer = (ProxyPreparer) Config.getNonNullEntry(
5046 config, COMPONENT, "recoveredLocatorPreparer",
5047 ProxyPreparer.class, p);
5048 this.persistenceSnapshotWeight = Config.getFloatEntry(
5049 config, COMPONENT, "persistenceSnapshotWeight",
5050 10, 0F, Float.MAX_VALUE);
5051 }
5052
5053
5054 if (activationID != null) {
5055 ProxyPreparer activationIdPreparer = (ProxyPreparer)
5056 Config.getNonNullEntry(
5057 config, COMPONENT, "activationIdPreparer",
5058 ProxyPreparer.class, new BasicProxyPreparer());
5059 ProxyPreparer activationSystemPreparer = (ProxyPreparer)
5060 Config.getNonNullEntry(
5061 config, COMPONENT, "activationSystemPreparer",
5062 ProxyPreparer.class, new BasicProxyPreparer());
5063
5064 this.activationID = (ActivationID)
5065 activationIdPreparer.prepareProxy(activationID);
5066 this.activationSystem = (ActivationSystem)
5067 activationSystemPreparer.prepareProxy(
5068 net.jini.activation.ActivationGroup.getSystem());
5069
5070 this.serverExporter = (Exporter) Config.getNonNullEntry(
5071 config, COMPONENT, "serverExporter", Exporter.class,
5072 new ActivationExporter(
5073 this.activationID,
5074 new BasicJeriExporter(
5075 TcpServerEndpoint.getInstance(0),
5076 new AtomicILFactory(null, null, Registrar.class.getClassLoader()))),
5077 this.activationID);
5078 } else {
5079 this.activationID = null;
5080 activationSystem = null;
5081
5082 serverExporter = (Exporter) Config.getNonNullEntry(
5083 config, COMPONENT, "serverExporter", Exporter.class,
5084 new BasicJeriExporter(
5085 TcpServerEndpoint.getInstance(0),
5086 new AtomicILFactory(null, null, Registrar.class.getClassLoader())));
5087 }
5088
5089
5090 Entry[] initialLookupAttributes = (Entry[]) config.getEntry(
5091 COMPONENT, "initialLookupAttributes", Entry[].class,
5092 new Entry[0]);
5093 this.lookupGroups = (String[]) config.getEntry(
5094 COMPONENT, "initialLookupGroups", String[].class,
5095 DiscoveryGroupManagement.NO_GROUPS);
5096 this.lookupLocators = (LookupLocator[]) config.getEntry(
5097 COMPONENT, "initialLookupLocators", LookupLocator[].class,
5098 lookupLocators);
5099 this.memberGroups = (String[]) config.getEntry(
5100 COMPONENT, "initialMemberGroups", String[].class,
5101 memberGroups);
5102 if (memberGroups == null) {
5103 throw new ConfigurationException(
5104 "member groups cannot be ALL_GROUPS (null)");
5105 }
5106 memberGroups = (String[]) removeDups(memberGroups);
5107 this.unicastPort = Config.getIntEntry(
5108 config, COMPONENT, "initialUnicastDiscoveryPort",
5109 0, 0, 0xFFFF);
5110
5111 this.httpsUnicastPort = Config.getIntEntry(
5112 config, COMPONENT, "httpsUnicastDiscoveryPort",
5113 443, 0, 0xFFFF);
5114 this.enableHttpsUnicast = config.getEntry(COMPONENT,
5115 "enableHttpsUnicast" ,Boolean.class , Boolean.FALSE);
5116 if (initialLookupAttributes != null &&
5117 initialLookupAttributes.length > 0)
5118 {
5119 List l = new ArrayList(Arrays.asList(baseAttrs));
5120 l.addAll(Arrays.asList(initialLookupAttributes));
5121 this.lookupAttrs = (Entry[]) l.toArray(new Entry[l.size()]);
5122 } else {
5123 lookupAttrs = baseAttrs;
5124 }
5125
5126
5127 MethodConstraints discoveryConstraints =
5128 (MethodConstraints) config.getEntry(COMPONENT,
5129 "discoveryConstraints",
5130 MethodConstraints.class, null);
5131 if (discoveryConstraints == null) {
5132 discoveryConstraints =
5133 new BasicMethodConstraints(InvocationConstraints.EMPTY);
5134 }
5135 try {
5136 this.discoer = (DiscoveryManagement) config.getEntry(
5137 COMPONENT, "discoveryManager", DiscoveryManagement.class);
5138 } catch (NoSuchEntryException e) {
5139 discoer = new LookupDiscoveryManager(
5140 DiscoveryGroupManagement.NO_GROUPS, null, null, config);
5141 }
5142 this.listenerPreparer = (ProxyPreparer) Config.getNonNullEntry(
5143 config, COMPONENT, "listenerPreparer", ProxyPreparer.class, p);
5144 this.locatorPreparer = (ProxyPreparer) Config.getNonNullEntry(
5145 config, COMPONENT, "locatorPreparer", ProxyPreparer.class, p);
5146 this.minMaxEventLease = Config.getLongEntry(
5147 config, COMPONENT, "minMaxEventLease",
5148 1000 * 60 * 30 , 1, MAX_LEASE);
5149 this.minMaxServiceLease = Config.getLongEntry(
5150 config, COMPONENT, "minMaxServiceLease",
5151 1000 * 60 * 5 , 1, MAX_LEASE);
5152 this.minRenewalInterval = Config.getLongEntry(
5153 config, COMPONENT, "minRenewalInterval",
5154 100, 0, MAX_RENEW);
5155 this.multicastAnnouncementInterval = Config.getLongEntry(
5156 config, COMPONENT, "multicastAnnouncementInterval",
5157 1000 * 60 * 2 , 1, Long.MAX_VALUE);
5158
5159 this.multicastInterfaceRetryInterval = Config.getIntEntry(
5160 config, COMPONENT, "multicastInterfaceRetryInterval",
5161 1000 * 60 * 5 , 1, Integer.MAX_VALUE);
5162 try {
5163 this.multicastInterfaces = (NetworkInterface[]) config.getEntry(
5164 COMPONENT, "multicastInterfaces", NetworkInterface[].class);
5165 this.multicastInterfacesSpecified = true;
5166 } catch (NoSuchEntryException e) {
5167 Enumeration en = NetworkInterface.getNetworkInterfaces();
5168 List l = (en != null) ?
5169 Collections.list(en) : Collections.EMPTY_LIST;
5170 multicastInterfaces = (NetworkInterface[])
5171 l.toArray(new NetworkInterface[l.size()]);
5172 multicastInterfacesSpecified = false;
5173 }
5174 if (multicastInterfaces == null) {
5175 DISCOVERY_LOGGER.config("using system default interface for multicast");
5176 } else if (multicastInterfaces.length == 0) {
5177 if (multicastInterfacesSpecified) {
5178 DISCOVERY_LOGGER.config("multicast disabled");
5179 } else {
5180 DISCOVERY_LOGGER.severe("no network interfaces detected");
5181 }
5182 } else if (DISCOVERY_LOGGER.isLoggable(Level.CONFIG)) {
5183 DISCOVERY_LOGGER.log(Level.CONFIG, "multicasting on interfaces {0}",
5184 new Object[]{ Arrays.asList(multicastInterfaces) });
5185 }
5186
5187 try {
5188 this.multicastRequestSubjectChecker =
5189 (ClientSubjectChecker) Config.getNonNullEntry(
5190 config, COMPONENT, "multicastRequestSubjectChecker",
5191 ClientSubjectChecker.class);
5192 } catch (NoSuchEntryException e) {
5193
5194 }
5195 UuidGenerator u = new UuidGenerator();
5196 this.resourceIdGenerator = (UuidGenerator) Config.getNonNullEntry(
5197 config, COMPONENT, "resourceIdGenerator", UuidGenerator.class,
5198 u);
5199 this.serviceIdGenerator = (UuidGenerator) Config.getNonNullEntry(
5200 config, COMPONENT, "serviceIdGenerator", UuidGenerator.class,
5201 u);
5202
5203 double blocking_coefficient = 0.7;
5204 int numberOfCores = Runtime.getRuntime().availableProcessors();
5205 int poolSizeLimit = (int) (numberOfCores / ( 1 - blocking_coefficient));
5206 this.scheduledExecutor = Config.getNonNullEntry(
5207 config,
5208 COMPONENT,
5209 "eventNotifierExecutor",
5210 ScheduledExecutorService.class,
5211 new ScheduledThreadPoolExecutor(
5212 poolSizeLimit,
5213 new NamedThreadFactory("Reggie_Event_Notifier", false)
5214 )
5215 );
5216
5217 this.executor = Config.getNonNullEntry(
5218 config,
5219 COMPONENT,
5220 "discoveryResponseExecutor",
5221 ExecutorService.class,
5222 new ThreadPoolExecutor(
5223 poolSizeLimit,
5224 poolSizeLimit,
5225 15L,
5226 TimeUnit.MINUTES,
5227 new LinkedBlockingQueue(),
5228 new NamedThreadFactory("Reggie_Discovery_Response", false)
5229 )
5230 );
5231 this.codebase = Config.getNonNullEntry(config, COMPONENT,
5232 "Codebase_Annotation", String.class, "");
5233 this.certFactoryType = Config.getNonNullEntry(config, COMPONENT,
5234 "Codebase_CertFactoryType", String.class, "X.509");
5235 this.certPathEncoding = Config.getNonNullEntry(config, COMPONENT,
5236 "Codebase_CertPathEncoding", String.class, "PkiPath");
5237 this.encodedCerts = Config.getNonNullEntry(config, COMPONENT,
5238 "Codebase_Certs", byte[].class, new byte[0]);
5239 this.unexportTimeout = Config.getLongEntry(
5240 config, COMPONENT, "unexportTimeout", 20000L,
5241 0, Long.MAX_VALUE);
5242 this.unexportWait = Config.getLongEntry(
5243 config, COMPONENT, "unexportWait", 10000L,
5244 0, Long.MAX_VALUE);
5245 try {
5246 unicastDiscoveryHost = (String) Config.getNonNullEntry(
5247 config, COMPONENT, "unicastDiscoveryHost", String.class);
5248 } catch (NoSuchEntryException e) {
5249
5250 unicastDiscoveryHost =
5251 LocalHostLookup.getLocalHost().getCanonicalHostName();
5252 }
5253 try {
5254 this.unicastDiscoverySubjectChecker =
5255 (ClientSubjectChecker) Config.getNonNullEntry(
5256 config, COMPONENT, "unicastDiscoverySubjectChecker",
5257 ClientSubjectChecker.class);
5258 } catch (NoSuchEntryException e) {
5259
5260 }
5261
5262
5263 this.objectServiceType = new ServiceType(Object.class, null, null);
5264 this.protocol2 = Discovery.getProtocol2(null);
5265
5266
5267 this.rawUnicastDiscoveryConstraints = discoveryConstraints.getConstraints(
5268 DiscoveryConstraints.unicastDiscoveryMethod);
5269 this.multicastRequestConstraints = DiscoveryConstraints.process(
5270 discoveryConstraints.getConstraints(
5271 DiscoveryConstraints.multicastRequestMethod));
5272 this.multicastAnnouncementConstraints = DiscoveryConstraints.process(
5273 discoveryConstraints.getConstraints(
5274 DiscoveryConstraints.multicastAnnouncementMethod));
5275 this.unicastDiscoveryConstraints = DiscoveryConstraints.process(
5276 this.rawUnicastDiscoveryConstraints);
5277 }
5278 }
5279
5280 public void start() throws Exception {
5281 if (constructionException != null) throw constructionException;
5282 concurrentObj.writeLock();
5283 try {
5284 if (log != null) {
5285 inRecovery = true;
5286 log.recover();
5287 inRecovery = false;
5288 }
5289 if (enableHttpsUnicast){
5290 httpsDiscovery = Discovery.getUnicastHttps(null);
5291 }
5292
5293 if (myServiceID == null) {
5294 myServiceID = newServiceID();
5295 }
5296 computeMaxLeases();
5297
5298 AccessController.doPrivileged(new PrivilegedExceptionAction<Object>(){
5299
5300 public Object run() throws Exception {
5301 myRef = (Registrar) serverExporter.export(RegistrarImpl.this);
5302 proxy = RegistrarProxy.getInstance(myRef, myServiceID);
5303 String uri;
5304 if (enableHttpsUnicast){
5305 uri = "https://" + unicastDiscoveryHost + ":"+ httpsUnicastPort;
5306 myLocator = (proxy instanceof RemoteMethodControl) ?
5307 new ConstrainableLookupLocator(uri, null) :
5308 new LookupLocator(uri);
5309 } else {
5310 myLocator = (proxy instanceof RemoteMethodControl) ?
5311 new ConstrainableLookupLocator(
5312 unicastDiscoveryHost, unicast.port, null) :
5313 new LookupLocator(unicastDiscoveryHost, unicast.port);
5314 }
5315
5316 Item item = new Item(new ServiceItem(myServiceID,
5317 proxy,
5318 lookupAttrs));
5319 SvcReg reg = new SvcReg(item, myLeaseID, Long.MAX_VALUE);
5320 addService(reg);
5321 if (log != null) {
5322 log.snapshot();
5323 }
5324
5325 try {
5326 DiscoveryGroupManagement dgm = (DiscoveryGroupManagement) discoer;
5327 String[] groups = dgm.getGroups();
5328 if (groups == null || groups.length > 0) {
5329 throw new ConfigurationException(
5330 "discoveryManager must be initially configured with " +
5331 "no groups");
5332 }
5333 DiscoveryLocatorManagement dlm =
5334 (DiscoveryLocatorManagement) discoer;
5335 if (dlm.getLocators().length > 0) {
5336 throw new ConfigurationException(
5337 "discoveryManager must be initially configured with " +
5338 "no locators");
5339 }
5340 dgm.setGroups(lookupGroups);
5341 dlm.setLocators(lookupLocators);
5342 } catch (ClassCastException e) {
5343 throw new ConfigurationException(null, e);
5344 }
5345 joiner = new JoinManager(proxy, lookupAttrs, myServiceID,
5346 discoer, null, config);
5347
5348
5349 serviceExpirer.start();
5350 eventExpirer.start();
5351 unicaster.start();
5352 multicaster.start();
5353 announcer.start();
5354 eventNotifierExec.start();
5355
5356
5357
5358
5359
5360
5361
5362 Runtime.getRuntime().addShutdownHook(new Thread( new Runnable() {
5363 public void run() {
5364 try {
5365 announcer.interrupt();
5366 announcer.join();
5367 } catch (Throwable t) {
5368 logThrow(LOGGER, Level.FINEST, getClass().getName(),
5369 "run", "exception shutting announcer down",
5370 new Object[]{}, t);
5371 }
5372 }
5373 }));
5374
5375 snapshotter.start();
5376 if (LOGGER.isLoggable(Level.INFO)) {
5377 LOGGER.log(Level.INFO, "started Reggie: {0}, {1}, {2}",
5378 new Object[]{ myServiceID,
5379 Arrays.asList(memberGroups),
5380 myLocator });
5381 }
5382 return null;
5383 }
5384
5385 }, context);
5386 } catch (PrivilegedActionException ex) {
5387 throw ex.getException();
5388 } finally {
5389
5390 config = null;
5391 unicastDiscoveryHost = null;
5392 concurrentObj.writeUnlock();
5393 }
5394
5395 }
5396
5397
5398 private ServiceRegistration registerDo(Item nitem, long leaseDuration)
5399 {
5400 if (nitem.service == null)
5401 throw new NullPointerException("null service");
5402 if (myServiceID.equals(nitem.getServiceID()))
5403 throw new IllegalArgumentException("reserved service id");
5404 if (nitem.getAttributeSets() == null)
5405 nitem.setAttributeSets(emptyAttrs);
5406 else
5407 nitem.setAttributeSets((EntryRep[]) removeDups(nitem.getAttributeSets()));
5408 leaseDuration = limitDuration(leaseDuration, maxServiceLease);
5409 long now = System.currentTimeMillis();
5410 if (nitem.getServiceID() == null) {
5411
5412 Map<ServiceID,SvcReg> svcs = serviceByTypeName.get(nitem.serviceType.getName());
5413 if (svcs != null) {
5414 for (Iterator<SvcReg> it = svcs.values().iterator(); it.hasNext(); ) {
5415 SvcReg reg = it.next();
5416 if (nitem.service.equals(reg.item.service)) {
5417 nitem.setServiceID(reg.item.getServiceID());
5418 deleteService(reg, now);
5419 break;
5420 }
5421 }
5422 }
5423 if (nitem.getServiceID() == null)
5424 nitem.setServiceID(newServiceID());
5425 } else {
5426
5427 SvcReg reg = serviceByID.get(nitem.getServiceID());
5428 if (reg != null)
5429 deleteService(reg, now);
5430 }
5431 Util.checkRegistrantServiceID(nitem.getServiceID(), LOGGER, Level.FINE);
5432 SvcReg reg = new SvcReg(nitem, newLeaseID(), now + leaseDuration);
5433 addService(reg);
5434 generateEvents(null, nitem, now);
5435 addLogRecord(new SvcRegisteredLogObj(reg));
5436
5437 if (reg.leaseExpiration < minSvcExpiration) {
5438 minSvcExpiration = reg.leaseExpiration;
5439 serviceNotifier.signal();
5440 }
5441 return Registration.getInstance(
5442 myRef,
5443 ServiceLease.getInstance(
5444 myRef,
5445 myServiceID,
5446 nitem.getServiceID(),
5447 reg.leaseID,
5448 reg.leaseExpiration));
5449 }
5450
5451
5452
5453
5454
5455
5456
5457
5458 private MarshalledWrapper lookupDo(Template tmpl)
5459 {
5460 if (tmpl.serviceTypesLength() == 0 || tmpl.serviceID() != null)
5461 {
5462 ItemIter iter = matchingItems(tmpl);
5463 if (iter.hasNext())
5464 return iter.next().service;
5465 return null;
5466
5467 }
5468 List<SvcReg> services = matchingServices(tmpl.serviceTypes());
5469 long now = System.currentTimeMillis();
5470 int slen = services.size();
5471 if (slen == 0)
5472 return null;
5473 int srand = random.nextInt(Integer.MAX_VALUE) % slen;
5474 for (int i = 0; i < slen; i++) {
5475 SvcReg reg = services.get((i + srand) % slen);
5476 if (reg.leaseExpiration > now && matchAttributes(tmpl, reg.item))
5477 return reg.item.service;
5478 }
5479 return null;
5480 }
5481
5482
5483
5484
5485
5486
5487
5488
5489
5490
5491 private Matches lookupDo(Template tmpl, int maxMatches)
5492 {
5493 if (maxMatches < 0)
5494 throw new IllegalArgumentException("negative maxMatches");
5495 int totalMatches = 0;
5496 List matches = null;
5497 ItemIter iter = matchingItems(tmpl);
5498 if (maxMatches > 0 || iter.dupsPossible)
5499 matches = new LinkedList();
5500 if (iter.dupsPossible) {
5501 while (iter.hasNext()) {
5502 Item item = iter.next();
5503 if (!matches.contains(item))
5504 matches.add(item);
5505 }
5506 totalMatches = matches.size();
5507 if (maxMatches > 0) {
5508 for (int i = matches.size(); --i >= maxMatches; )
5509 matches.remove(i);
5510 for (int i = matches.size(); --i >= 0; ) {
5511 matches.set(i, copyItem((Item)matches.get(i)));
5512 }
5513 } else {
5514 matches = null;
5515 }
5516 } else {
5517 while (iter.hasNext()) {
5518 Item item = iter.next();
5519 totalMatches++;
5520 if (--maxMatches >= 0)
5521 matches.add(copyItem(item));
5522 }
5523 }
5524 return new Matches(matches, totalMatches);
5525 }
5526
5527
5528
5529
5530
5531
5532
5533 private EventRegistration notifyDo(Template tmpl,
5534 int transitions,
5535 RemoteEventListener listener,
5536 Object handback,
5537 long leaseDuration,
5538 boolean newNotify)
5539 throws RemoteException
5540 {
5541 if (transitions == 0 ||
5542 transitions !=
5543 (transitions & (ServiceRegistrar.TRANSITION_MATCH_NOMATCH |
5544 ServiceRegistrar.TRANSITION_NOMATCH_MATCH |
5545 ServiceRegistrar.TRANSITION_MATCH_MATCH)))
5546 throw new IllegalArgumentException("invalid transitions");
5547 if (listener == null)
5548 throw new NullPointerException("listener");
5549 listener =
5550 (RemoteEventListener) listenerPreparer.prepareProxy(listener);
5551 leaseDuration = limitDuration(leaseDuration, maxEventLease);
5552 long now = System.currentTimeMillis();
5553 EventReg reg = new EventReg(eventID, newLeaseID(), tmpl, transitions,
5554 listener, handback, now + leaseDuration, newNotify);
5555 eventID++;
5556 addEvent(reg);
5557 addLogRecord(new EventRegisteredLogObj(reg));
5558
5559 if (reg.getLeaseExpiration() < minEventExpiration) {
5560 minEventExpiration = reg.getLeaseExpiration();
5561 eventNotifier.signal();
5562 }
5563 return new EventRegistration(
5564 reg.eventID,
5565 proxy,
5566 EventLease.getInstance(
5567 myRef,
5568 myServiceID,
5569 reg.eventID,
5570 reg.leaseID, reg.getLeaseExpiration()),
5571 reg.getSeqNo());
5572 }
5573
5574
5575
5576
5577
5578
5579
5580 private EntryClassBase[] getEntryClassesDo(Template tmpl)
5581 {
5582 List<EntryClass> classes = new LinkedList<EntryClass>();
5583 List<String> codebases = new LinkedList<String>();
5584 if (tmpl.serviceID() == null &&
5585 tmpl.serviceTypesLength()==0 &&
5586 tmpl.attributeSetTemplatesLength()==0) {
5587 long now = System.currentTimeMillis();
5588 for (int i = entryClasses.size(); --i >= 0; ) {
5589 EntryClass eclass = entryClasses.get(i);
5590 try {
5591 codebases.add(pickCodebase(eclass, now));
5592 classes.add(eclass);
5593 } catch (ClassNotFoundException e) {
5594 }
5595 }
5596 } else {
5597 EntryRep[] tmplAttrSetTmpl = tmpl.attributeSetTemplates();
5598 for (ItemIter iter = matchingItems(tmpl); iter.hasNext(); ) {
5599 Item item = iter.next();
5600 EntryRep[] attrSetTmpl = item.getAttributeSets();
5601 for (int i = attrSetTmpl.length; --i >= 0; ) {
5602 EntryRep attrSet = attrSetTmpl[i];
5603 if (attrMatch(tmplAttrSetTmpl, attrSet) &&
5604 !classes.contains(attrSet.eclass)) {
5605 classes.add(attrSet.eclass);
5606 codebases.add(attrSet.codebase);
5607 }
5608 }
5609 }
5610 }
5611 if (classes.isEmpty())
5612 return null;
5613 EntryClassBase[] vals = new EntryClassBase[classes.size()];
5614 for (int i = vals.length; --i >= 0; ) {
5615 vals[i] = new EntryClassBase(
5616 classes.get(i).getReplacement(),
5617 codebases.get(i));
5618 }
5619 return vals;
5620 }
5621
5622
5623
5624
5625
5626
5627
5628
5629 private Object[] getFieldValuesDo(Template tmpl, int setidx, int fldidx)
5630 {
5631 List values = new LinkedList();
5632 EntryRep etmpl = tmpl.attributeSetTemplateAtIndex(setidx);
5633 boolean allNull = false;
5634 if (tmpl.serviceID() == null &&
5635 tmpl.serviceTypesLength() == 0 &&
5636 tmpl.attributeSetTemplatesLength() == 1)
5637 {
5638 allNull = allNull(etmpl.fields());
5639 }
5640 if (allNull) {
5641 long now = System.currentTimeMillis();
5642 EntryClass eclass = getDefiningClass(etmpl.eclass, fldidx);
5643 boolean checkAttr = !eclass.equals(etmpl.eclass);
5644 Map<Object,List<SvcReg>>[] attrMaps = serviceByAttr.get(eclass);
5645 if (attrMaps != null && attrMaps[fldidx] != null) {
5646 for (Iterator<Map.Entry<Object,List<SvcReg>>> iter = attrMaps[fldidx].entrySet().iterator();
5647 iter.hasNext(); )
5648 {
5649 Map.Entry<Object,List<SvcReg>> ent = iter.next();
5650 List<SvcReg> regs = ent.getValue();
5651 Object value = ent.getKey();
5652 for (int i = regs.size(); --i >= 0; ) {
5653 SvcReg reg = regs.get(i);
5654 if (reg.leaseExpiration > now &&
5655 (!checkAttr ||
5656 hasAttr(reg, etmpl.eclass, fldidx, value))) {
5657 values.add(value);
5658 break;
5659 }
5660 }
5661 }
5662 }
5663 } else {
5664 for (ItemIter iter = matchingItems(tmpl); iter.hasNext(); ) {
5665 Item item = iter.next();
5666 EntryRep [] attributeSets = item.getAttributeSets();
5667 for (int j = attributeSets.length; --j >= 0; ) {
5668 if (matchEntry(etmpl, attributeSets[j])) {
5669 Object value = attributeSets[j].fields().get(fldidx);
5670 if (!values.contains(value)) values.add(value);
5671 }
5672 }
5673 }
5674 }
5675 if (values.isEmpty())
5676 return null;
5677 return values.toArray();
5678 }
5679
5680
5681
5682
5683
5684
5685
5686
5687 private ServiceTypeBase[] getServiceTypesDo(Template tmpl, String prefix)
5688 {
5689 List<ServiceType> classes = new LinkedList<ServiceType>();
5690 List<String> codebases = new LinkedList<String>();
5691 if (tmpl.serviceID() == null && tmpl.attributeSetTemplatesLength() == 0) {
5692 List services = matchingServices(tmpl.serviceTypes());
5693 for (Iterator it = services.iterator(); it.hasNext(); ) {
5694 Item item = ((SvcReg)it.next()).item;
5695 addTypes(classes, codebases, tmpl.serviceTypes(), prefix,
5696 item.serviceType, item.codebase);
5697 }
5698 } else {
5699 for (ItemIter iter = matchingItems(tmpl); iter.hasNext(); ) {
5700 Item item = iter.next();
5701 addTypes(classes, codebases, tmpl.serviceTypes(), prefix,
5702 item.serviceType, item.codebase);
5703 }
5704 }
5705 if (classes.isEmpty())
5706 return null;
5707 ServiceTypeBase[] vals = new ServiceTypeBase[classes.size()];
5708 for (int i = vals.length; --i >= 0; ) {
5709 vals[i] = new ServiceTypeBase(
5710 classes.get(i).getReplacement(),
5711 codebases.get(i));
5712 }
5713 return vals;
5714 }
5715
5716
5717
5718
5719
5720
5721
5722 private void addAttributesDo(ServiceID serviceID,
5723 Uuid leaseID,
5724 EntryRep[] attrSets)
5725 throws UnknownLeaseException
5726 {
5727 long now = System.currentTimeMillis();
5728 SvcReg reg = checkLease(serviceID, leaseID, now);
5729 Item pre = (Item)reg.item.clone();
5730 EntryRep[] sets = reg.item.getAttributeSets();
5731 int i = 0;
5732
5733 for (int j = 0; j < attrSets.length; j++) {
5734 EntryRep set = attrSets[j];
5735 if (indexOf(sets, set) < 0 && indexOf(attrSets, j, set) < 0) {
5736 attrSets[i++] = set;
5737 addAttrs(reg, set);
5738 }
5739 }
5740 if (i > 0) {
5741 int len = sets.length;
5742 EntryRep[] nsets = new EntryRep[len + i];
5743 System.arraycopy(sets, 0, nsets, 0, len);
5744 System.arraycopy(attrSets, 0, nsets, len, i);
5745 reg.item.setAttributeSets(nsets);
5746 }
5747 generateEvents(pre, reg.item, now);
5748 }
5749
5750
5751
5752
5753
5754
5755
5756
5757 private void modifyAttributesDo(ServiceID serviceID,
5758 Uuid leaseID,
5759 EntryRep[] attrSetTmpls,
5760 EntryRep[] attrSets)
5761 throws UnknownLeaseException
5762 {
5763 if (attrSetTmpls.length != attrSets.length)
5764 throw new IllegalArgumentException(
5765 "attribute set length mismatch");
5766 for (int i = attrSets.length; --i >= 0; ) {
5767 if (attrSets[i] != null &&
5768 !attrSets[i].eclass.isAssignableFrom(attrSetTmpls[i].eclass))
5769 throw new IllegalArgumentException(
5770 "attribute set type mismatch");
5771 }
5772 long now = System.currentTimeMillis();
5773 SvcReg reg = checkLease(serviceID, leaseID, now);
5774 Item pre = (Item)reg.item.clone();
5775 EntryRep[] preSets = pre.getAttributeSets();
5776 EntryRep[] sets = reg.item.getAttributeSets();
5777 for (int i = preSets.length; --i >= 0; ) {
5778 EntryRep preSet = preSets[i];
5779 EntryRep set = sets[i];
5780 for (int j = attrSetTmpls.length; --j >= 0; ) {
5781 if (matchEntry(attrSetTmpls[j], preSet)) {
5782 EntryRep attrs = attrSets[j];
5783 if (attrs == null) {
5784 sets = deleteSet(reg.item, i);
5785 deleteAttrs(reg, set, true);
5786 break;
5787 } else {
5788 updateAttrs(reg, set, attrs.fields());
5789 }
5790 }
5791 }
5792 }
5793 for (int i = sets.length; --i >= 0; ) {
5794 EntryRep set = sets[i];
5795 if (indexOf(sets, i, set) >= 0) {
5796 sets = deleteSet(reg.item, i);
5797 deleteInstance(set.eclass);
5798 }
5799 }
5800 reg.item.setAttributeSets(sets);
5801 generateEvents(pre, reg.item, now);
5802 }
5803
5804
5805
5806
5807
5808
5809
5810 private void setAttributesDo(ServiceID serviceID,
5811 Uuid leaseID,
5812 EntryRep[] attrSets)
5813 throws UnknownLeaseException
5814 {
5815 if (attrSets == null)
5816 attrSets = emptyAttrs;
5817 else
5818 attrSets = (EntryRep[])removeDups(attrSets);
5819 long now = System.currentTimeMillis();
5820 SvcReg reg = checkLease(serviceID, leaseID, now);
5821 Item pre = (Item)reg.item.clone();
5822 EntryRep[] entries = reg.item.getAttributeSets();
5823 for (int i = entries.length; --i >= 0; ) {
5824 deleteAttrs(reg, entries[i], false);
5825 }
5826 reg.item.setAttributeSets(attrSets);
5827 for (int i = attrSets.length; --i >= 0; ) {
5828 addAttrs(reg, attrSets[i]);
5829 }
5830 generateEvents(pre, reg.item, now);
5831 }
5832
5833
5834 private void cancelServiceLeaseDo(ServiceID serviceID, Uuid leaseID)
5835 throws UnknownLeaseException
5836 {
5837 if (serviceID.equals(myServiceID))
5838 throw new SecurityException("privileged service id");
5839 long now = System.currentTimeMillis();
5840 SvcReg reg = checkLease(serviceID, leaseID, now);
5841 deleteService(reg, now);
5842
5843 if (reg.leaseExpiration == minSvcExpiration)
5844 serviceNotifier.signal();
5845 }
5846
5847
5848 private long renewServiceLeaseDo(ServiceID serviceID,
5849 Uuid leaseID,
5850 long renewDuration)
5851 throws UnknownLeaseException
5852 {
5853 long now = System.currentTimeMillis();
5854 long renewExpiration = renewServiceLeaseInt(serviceID, leaseID,
5855 renewDuration, now);
5856 addLogRecord(new ServiceLeaseRenewedLogObj(serviceID, leaseID,
5857 renewExpiration));
5858 return renewExpiration - now;
5859 }
5860
5861 private SvcReg checkLease(ServiceID serviceID, Uuid leaseID, long now)
5862 throws UnknownLeaseException
5863 {
5864
5865 SvcReg reg = serviceByID.get(serviceID);
5866 if (reg == null) throw new UnknownLeaseException("No service recorded for ID: " + serviceID);
5867 if (!reg.leaseID.equals(leaseID)) throw new UnknownLeaseException("Incorrect lease ID: " + leaseID + " not equal to reg lease ID: " + reg.leaseID);
5868 if (reg.leaseExpiration <= now) throw new UnknownLeaseException("Lease expired");
5869 return reg;
5870 }
5871
5872 private EventReg checkEvent(Uuid leaseID, long eventID, long now)
5873 throws UnknownLeaseException
5874 {
5875 EventReg reg = eventByID.get(Long.valueOf(eventID));
5876 if (reg == null) throw new UnknownLeaseException("No event recorded for ID: " + eventID);
5877 if (!reg.leaseID.equals(leaseID)) throw new UnknownLeaseException("Incorrect lease ID: " + eventID + " not equal to reg lease ID: " + reg.leaseID);
5878 if (reg.getLeaseExpiration() <= now) throw new UnknownLeaseException("Lease expired");
5879 return reg;
5880 }
5881
5882
5883 private long renewServiceLeaseInt(ServiceID serviceID,
5884 Uuid leaseID,
5885 long renewDuration,
5886 long now)
5887 throws UnknownLeaseException
5888 {
5889 if (serviceID.equals(myServiceID))
5890 throw new SecurityException("privileged service id");
5891 if (renewDuration == Lease.ANY)
5892 renewDuration = maxServiceLease;
5893 else if (renewDuration < 0)
5894 throw new IllegalArgumentException("negative lease duration");
5895 SvcReg reg = checkLease(serviceID, leaseID, now);
5896 if (renewDuration > maxServiceLease &&
5897 renewDuration > reg.leaseExpiration - now)
5898 renewDuration = Math.max(reg.leaseExpiration - now,
5899 maxServiceLease);
5900 long renewExpiration = now + renewDuration;
5901
5902 serviceByTime.remove(reg);
5903 reg.leaseExpiration = renewExpiration;
5904 serviceByTime.add(reg);
5905
5906 if (renewExpiration < minSvcExpiration) {
5907 minSvcExpiration = renewExpiration;
5908 serviceNotifier.signal();
5909 }
5910 return renewExpiration;
5911 }
5912
5913
5914 private void renewServiceLeaseAbs(ServiceID serviceID,
5915 Uuid leaseID,
5916 long renewExpiration)
5917 {
5918 concurrentObj.writeLock();
5919 try {
5920 SvcReg reg = serviceByID.get(serviceID);
5921 if (reg == null || !reg.leaseID.equals(leaseID))
5922 return;
5923
5924 serviceByTime.remove(reg);
5925 reg.leaseExpiration = renewExpiration;
5926 serviceByTime.add(reg);
5927 } finally {
5928 concurrentObj.writeUnlock();
5929 }
5930 }
5931
5932
5933 private void cancelEventLeaseDo(long eventID, Uuid leaseID)
5934 throws UnknownLeaseException
5935 {
5936 long now = System.currentTimeMillis();
5937 EventReg reg = checkEvent(leaseID, eventID, now);
5938 deleteEvent(reg);
5939
5940 if (reg.getLeaseExpiration() == minEventExpiration)
5941 eventNotifier.signal();
5942 }
5943
5944
5945 private long renewEventLeaseDo(long eventID,
5946 Uuid leaseID,
5947 long renewDuration)
5948 throws UnknownLeaseException
5949 {
5950 long now = System.currentTimeMillis();
5951 long renewExpiration = renewEventLeaseInt(eventID, leaseID,
5952 renewDuration, now);
5953 addLogRecord(new EventLeaseRenewedLogObj(eventID, leaseID,
5954 renewExpiration));
5955 return renewExpiration - now;
5956 }
5957
5958 private long renewEventLeaseInt(long eventID,
5959 Uuid leaseID,
5960 long renewDuration,
5961 long now)
5962 throws UnknownLeaseException
5963 {
5964 if (renewDuration == Lease.ANY)
5965 renewDuration = maxEventLease;
5966 else if (renewDuration < 0)
5967 throw new IllegalArgumentException("negative lease duration");
5968 EventReg reg = checkEvent(leaseID, eventID, now);
5969 if (renewDuration > maxEventLease &&
5970 renewDuration > reg.getLeaseExpiration() - now)
5971 renewDuration = Math.max(reg.getLeaseExpiration() - now, maxEventLease);
5972 long renewExpiration = now + renewDuration;
5973
5974 eventByTime.remove(reg);
5975 reg.setLeaseExpiration(renewExpiration);
5976 eventByTime.offer(reg);
5977
5978 if (renewExpiration < minEventExpiration) {
5979 minEventExpiration = renewExpiration;
5980 eventNotifier.signal();
5981 }
5982 return renewExpiration;
5983 }
5984
5985
5986 private void renewEventLeaseAbs(long eventID,
5987 Uuid leaseID,
5988 long renewExpiration)
5989 {
5990 concurrentObj.writeLock();
5991 try {
5992 EventReg reg = eventByID.get(Long.valueOf(eventID));
5993 if (reg == null || !reg.leaseID.equals(leaseID))
5994 return;
5995
5996 eventByTime.remove(reg);
5997 reg.setLeaseExpiration(renewExpiration);
5998 eventByTime.offer(reg);
5999 } finally {
6000 concurrentObj.writeUnlock();
6001 }
6002 }
6003
6004
6005
6006
6007
6008
6009
6010 private RenewResults renewLeasesDo(Object[] regIDs,
6011 Uuid[] leaseIDs,
6012 long[] renewals)
6013 {
6014 long now = System.currentTimeMillis();
6015 Exception[] exceptions = null;
6016 int l = regIDs.length;
6017 for (int i = 0; i < l; i++) {
6018 Object id = regIDs[i];
6019 try {
6020 if (id instanceof ServiceID)
6021 renewals[i] = renewServiceLeaseInt((ServiceID)id,
6022 leaseIDs[i],
6023 renewals[i], now);
6024 else
6025 renewals[i] = renewEventLeaseInt(((Long)id).longValue(),
6026 leaseIDs[i], renewals[i],
6027 now);
6028 } catch (Exception e) {
6029 renewals[i] = -1;
6030 if (exceptions == null)
6031 exceptions = new Exception[]{e};
6032 else
6033 exceptions = (Exception[])arrayAdd(exceptions, e);
6034 }
6035 }
6036
6037 addLogRecord(new LeasesRenewedLogObj(regIDs, leaseIDs, renewals));
6038 for (int i = regIDs.length; --i >= 0; ) {
6039 if (renewals[i] >= 0)
6040 renewals[i] -= now;
6041 }
6042 return new RenewResults(renewals, exceptions);
6043 }
6044
6045
6046
6047
6048
6049 private void renewLeasesAbs(Object[] regIDs,
6050 Uuid[] leaseIDs,
6051 long[] renewExpirations)
6052 {
6053 for (int i = regIDs.length; --i >= 0; ) {
6054 long expiration = renewExpirations[i];
6055 if (expiration < 0)
6056 continue;
6057 Object id = regIDs[i];
6058 if (id instanceof ServiceID)
6059 renewServiceLeaseAbs((ServiceID)id, leaseIDs[i], expiration);
6060 else
6061 renewEventLeaseAbs(((Long)id).longValue(), leaseIDs[i],
6062 expiration);
6063 }
6064 }
6065
6066
6067
6068
6069
6070
6071
6072
6073
6074 private Exception[] cancelLeasesDo(Object[] regIDs, Uuid[] leaseIDs) {
6075 Exception[] exceptions = null;
6076 for (int i = regIDs.length; --i >= 0; ) {
6077 Object id = regIDs[i];
6078 try {
6079 if (id instanceof ServiceID)
6080 cancelServiceLeaseDo((ServiceID)id, leaseIDs[i]);
6081 else
6082 cancelEventLeaseDo(((Long)id).longValue(), leaseIDs[i]);
6083 } catch (Exception e) {
6084 if (exceptions == null)
6085 exceptions = new Exception[regIDs.length];
6086 exceptions[i] = e;
6087 }
6088 }
6089 return exceptions;
6090 }
6091
6092
6093
6094
6095
6096
6097 private void generateEvents(Item pre, Item post, long now) {
6098 if (inRecovery)
6099 return;
6100 ServiceID sid = (pre != null) ? pre.getServiceID() : post.getServiceID();
6101 Object val = subEventByService.get(sid);
6102 if (val instanceof EventReg) {
6103 generateEvent((EventReg)val, pre, post, sid, now);
6104 } else if (val instanceof EventReg[]) {
6105 EventReg[] regs = (EventReg[])val;
6106 for (int i = regs.length; --i >= 0; ) {
6107 generateEvent(regs[i], pre, post, sid, now);
6108 }
6109 }
6110 for (Iterator iter = subEventByID.values().iterator();
6111 iter.hasNext(); )
6112 {
6113 generateEvent((EventReg)iter.next(), pre, post, sid, now);
6114 }
6115 }
6116
6117
6118
6119
6120
6121
6122 private void generateEvent(EventReg reg,
6123 Item pre,
6124 Item post,
6125 ServiceID sid,
6126 long now)
6127 {
6128 if (reg.getLeaseExpiration() <= now)
6129 return;
6130 if ((reg.transitions &
6131 ServiceRegistrar.TRANSITION_NOMATCH_MATCH) != 0 &&
6132 (pre == null || !matchItem(reg.tmpl, pre)) &&
6133 (post != null && matchItem(reg.tmpl, post)))
6134 pendingEvent(reg, sid, post,
6135 ServiceRegistrar.TRANSITION_NOMATCH_MATCH, now);
6136 else if ((reg.transitions &
6137 ServiceRegistrar.TRANSITION_MATCH_NOMATCH) != 0 &&
6138 (pre != null && matchItem(reg.tmpl, pre)) &&
6139 (post == null || !matchItem(reg.tmpl, post)))
6140 pendingEvent(reg, sid, post,
6141 ServiceRegistrar.TRANSITION_MATCH_NOMATCH, now);
6142 else if ((reg.transitions &
6143 ServiceRegistrar.TRANSITION_MATCH_MATCH) != 0 &&
6144 (pre != null && matchItem(reg.tmpl, pre)) &&
6145 (post != null && matchItem(reg.tmpl, post)))
6146 pendingEvent(reg, sid, post,
6147 ServiceRegistrar.TRANSITION_MATCH_MATCH, now);
6148 }
6149
6150
6151 private void pendingEvent(EventReg reg, ServiceID sid, Item item, int transition, long now)
6152 {
6153 if (item != null)
6154 item = copyItem(item);
6155
6156 eventTaskMap.get(reg).submit(
6157 Security.withContext(
6158 new EventTask(reg, sid, item, transition, proxy, this, now),
6159 context
6160 )
6161 );
6162 }
6163
6164
6165 private ServiceID newServiceID() {
6166 Uuid uuid = serviceIdGenerator.generate();
6167 return new ServiceID(
6168 uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
6169 }
6170
6171
6172 private Uuid newLeaseID() {
6173 return resourceIdGenerator.generate();
6174 }
6175
6176
6177
6178
6179
6180
6181
6182
6183
6184
6185
6186
6187
6188
6189
6190
6191
6192
6193
6194
6195
6196
6197
6198
6199
6200
6201
6202
6203 private void takeSnapshot(OutputStream out) throws IOException {
6204 ObjectOutputStream stream = new ObjectOutputStream(out);
6205
6206 stream.writeUTF(getClass().getName());
6207 stream.writeInt(LOG_VERSION);
6208 stream.writeObject(myServiceID);
6209 stream.writeLong(eventID);
6210 stream.writeInt(unicastPort);
6211 stream.writeObject(memberGroups);
6212 stream.writeObject(lookupGroups);
6213 stream.writeLong(announcementSeqNo.get());
6214 marshalAttributes(lookupAttrs, stream);
6215 marshalLocators(lookupLocators, stream);
6216 for (Iterator iter = serviceByID.entrySet().iterator();
6217 iter.hasNext(); )
6218 {
6219 Map.Entry entry = (Map.Entry) iter.next();
6220 if (myServiceID != entry.getKey())
6221 stream.writeObject(entry.getValue());
6222 }
6223 stream.writeObject(null);
6224 for (Iterator iter = eventByID.values().iterator(); iter.hasNext(); )
6225 {
6226 stream.writeObject(iter.next());
6227 }
6228 stream.writeObject(null);
6229 stream.writeInt(httpsUnicastPort);
6230 stream.writeBoolean(enableHttpsUnicast);
6231 stream.flush();
6232 LOGGER.finer("wrote state snapshot");
6233 }
6234
6235
6236
6237
6238
6239
6240
6241
6242
6243
6244
6245
6246
6247
6248
6249
6250
6251
6252
6253
6254
6255
6256
6257
6258
6259
6260
6261
6262
6263
6264
6265
6266
6267
6268 private void recoverSnapshot(InputStream in)
6269 throws IOException, ClassNotFoundException
6270 {
6271 ObjectInputStream stream = new ObjectInputStream(in);
6272 if (!getClass().getName().equals(stream.readUTF()))
6273 throw new IOException("log from wrong implementation");
6274 int logVersion = stream.readInt();
6275 if (logVersion != LOG_VERSION)
6276 throw new IOException("wrong log format version");
6277 myServiceID = (ServiceID)stream.readObject();
6278 eventID = stream.readLong();
6279 unicastPort = stream.readInt();
6280 memberGroups = (String[])stream.readObject();
6281 lookupGroups = (String[])stream.readObject();
6282 announcementSeqNo.set( stream.readLong() + Integer.MAX_VALUE);
6283 lookupAttrs = unmarshalAttributes(stream);
6284 lookupLocators = prepareLocators(
6285 unmarshalLocators(stream), recoveredLocatorPreparer, true);
6286 recoverServiceRegistrations(stream, logVersion);
6287 recoverEventRegistrations(stream);
6288 httpsUnicastPort = stream.readInt();
6289 enableHttpsUnicast = stream.readBoolean();
6290 LOGGER.finer("recovered state from snapshot");
6291 }
6292
6293
6294 private void recoverServiceRegistrations(ObjectInputStream stream,
6295 int logVersion)
6296 throws IOException, ClassNotFoundException
6297 {
6298 SvcReg sReg;
6299 while ((sReg = (SvcReg)stream.readObject()) != null) {
6300 addService(sReg);
6301 }
6302 }
6303
6304
6305 private void recoverEventRegistrations(ObjectInputStream stream)
6306 throws IOException, ClassNotFoundException
6307 {
6308 EventReg eReg;
6309 while ((eReg = (EventReg)stream.readObject()) != null) {
6310 eReg.prepareListener(recoveredListenerPreparer);
6311 addEvent(eReg);
6312 }
6313 }
6314
6315
6316
6317
6318
6319
6320
6321
6322
6323
6324
6325
6326
6327
6328
6329
6330
6331
6332
6333
6334
6335
6336
6337
6338
6339
6340
6341
6342
6343
6344
6345
6346
6347
6348
6349
6350
6351 private void addLogRecord(LogRecord rec) {
6352 if (log == null) {
6353 return;
6354 }
6355 try {
6356 log.update(rec, true);
6357 if (LOGGER.isLoggable(Level.FINER)) {
6358 LOGGER.log(Level.FINER, "wrote log record {0}",
6359 new Object[]{ rec });
6360 }
6361 if (logFileSize.incrementAndGet() >= persistenceSnapshotThreshold) {
6362 int snapshotSize = serviceByID.size() + eventByID.size();
6363 if (logFileSize.get() >= persistenceSnapshotWeight * snapshotSize) {
6364 snapshotNotifier.signal();
6365 }
6366 }
6367 } catch (Exception e) {
6368 if (!Thread.currentThread().isInterrupted()) {
6369 LOGGER.log(Level.WARNING, "log update failed", e);
6370 }
6371 }
6372 }
6373 }