1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package net.jini.lookup;
20
21 import java.io.IOException;
22 import java.rmi.RemoteException;
23 import java.rmi.server.ExportException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.FutureTask;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.PriorityBlockingQueue;
41 import java.util.concurrent.RunnableFuture;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicLong;
47 import java.util.concurrent.locks.ReentrantReadWriteLock;
48 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
49 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
50 import java.util.function.BiConsumer;
51 import java.util.function.BiFunction;
52 import java.util.logging.Level;
53 import net.jini.config.ConfigurationException;
54 import net.jini.core.entry.Entry;
55 import net.jini.core.event.RemoteEvent;
56 import net.jini.core.event.RemoteEventListener;
57 import net.jini.core.event.UnknownEventException;
58 import net.jini.core.lookup.ServiceEvent;
59 import net.jini.core.lookup.ServiceID;
60 import net.jini.core.lookup.ServiceItem;
61 import net.jini.core.lookup.ServiceMatches;
62 import net.jini.core.lookup.ServiceRegistrar;
63 import net.jini.core.lookup.ServiceTemplate;
64 import net.jini.export.Exporter;
65 import net.jini.lookup.ServiceAttributesAccessor;
66 import net.jini.lookup.ServiceIDAccessor;
67 import net.jini.lookup.ServiceProxyAccessor;
68 import net.jini.io.MarshalledInstance;
69 import net.jini.jeri.AtomicILFactory;
70 import net.jini.jeri.BasicILFactory;
71 import net.jini.jeri.BasicJeriExporter;
72 import net.jini.jeri.tcp.TcpServerEndpoint;
73 import net.jini.security.TrustVerifier;
74 import net.jini.security.proxytrust.ServerProxyTrust;
75 import org.apache.river.concurrent.RC;
76 import org.apache.river.concurrent.Ref;
77 import org.apache.river.concurrent.Referrer;
78 import org.apache.river.lookup.entry.LookupAttributes;
79 import org.apache.river.proxy.BasicProxyTrustVerifier;
80 import org.apache.river.thread.DependencyLinker;
81 import org.apache.river.thread.ExtensibleExecutorService;
82 import org.apache.river.thread.FutureObserver;
83 import org.apache.river.thread.NamedThreadFactory;
84 import org.apache.river.thread.ObservableFutureTask;
85
86
87
88
89
90
91 final class LookupCacheImpl implements LookupCache {
92
93 private static final int ITEM_ADDED = 0;
94 private static final int ITEM_REMOVED = 2;
95 private static final int ITEM_CHANGED = 3;
96
97 private final LookupListener lookupListener;
98
99 private volatile Exporter lookupListenerExporter;
100
101 private volatile RemoteEventListener lookupListenerProxy;
102
103
104
105 private volatile ExecutorService cacheTaskMgr;
106 private volatile CacheTaskDependencyManager cacheTaskDepMgr;
107
108 private volatile ExecutorService incomingEventExecutor;
109
110 private volatile boolean bCacheTerminated = false;
111
112 private final ReadLock sItemListenersRead;
113 private final WriteLock sItemListenersWrite;
114 private final Collection<ServiceDiscoveryListener> sItemListeners;
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 private final ConcurrentMap<ServiceID, ServiceItemReg> serviceIdMap;
137
138
139
140 private final ConcurrentMap<ProxyReg, EventReg> eventRegMap;
141
142 private final ServiceTemplate tmpl;
143
144 private final ServiceItemFilter filter;
145
146 private final long leaseDuration;
147
148
149
150 private final long startTime;
151
152
153
154 private volatile ScheduledExecutorService serviceDiscardTimerTaskMgr;
155 private final ConcurrentMap<ServiceID, Future> serviceDiscardFutures;
156
157
158
159
160
161
162
163 private final AtomicLong taskSeqN;
164 private final ServiceDiscoveryManager sdm;
165 private final boolean useInsecureLookup;
166
167 LookupCacheImpl(ServiceTemplate tmpl, ServiceItemFilter filter,
168 ServiceDiscoveryListener sListener, long leaseDuration,
169 ServiceDiscoveryManager sdm, boolean useInsecureLookup)
170 throws RemoteException
171 {
172 this.useInsecureLookup = useInsecureLookup;
173 this.taskSeqN = new AtomicLong();
174 this.startTime = System.currentTimeMillis();
175 this.eventRegMap = new ConcurrentHashMap<ProxyReg, EventReg>();
176 this.serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItemReg>();
177 this.sItemListeners = new HashSet<ServiceDiscoveryListener>();
178 this.serviceDiscardFutures = RC.concurrentMap(new ConcurrentHashMap<Referrer<ServiceID>, Referrer<Future>>(), Ref.WEAK_IDENTITY, Ref.STRONG, 60000, 60000);
179 this.tmpl = tmpl.clone();
180 this.leaseDuration = leaseDuration;
181 this.filter = filter;
182 lookupListener = new LookupListener();
183 if (sListener != null) {
184 sItemListeners.add(sListener);
185 }
186 this.sdm = sdm;
187 ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
188 sItemListenersRead = rwl.readLock();
189 sItemListenersWrite = rwl.writeLock();
190 }
191
192 private ExecutorService eventNotificationExecutor;
193
194
195
196
197
198
199
200
201
202 private final class LookupListener implements RemoteEventListener,
203 ServerProxyTrust {
204
205 RemoteEventListener export() throws ExportException {
206 return (RemoteEventListener) lookupListenerExporter.export(this);
207 }
208
209 @Override
210 public void notify(RemoteEvent evt) throws UnknownEventException,
211 java.rmi.RemoteException {
212 if (!(evt instanceof ServiceEvent)) {
213 throw new UnknownEventException("ServiceEvent required,not: " + evt.toString());
214 }
215 notifyServiceMap((ServiceEvent) evt);
216 }
217
218
219
220
221
222 @Override
223 public TrustVerifier getProxyVerifier() {
224 return new BasicProxyTrustVerifier(lookupListenerProxy);
225 }
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 private static final class RegisterListenerTask extends
252 CacheTask {
253
254 final LookupCacheImpl cache;
255
256 public RegisterListenerTask(ProxyReg reg,
257 long seqN, LookupCacheImpl cache) {
258 super(reg, seqN);
259 this.cache = cache;
260 }
261
262 @Override
263 public boolean hasDeps() {
264 return true;
265 }
266
267 @Override
268 public boolean dependsOn(CacheTask t) {
269 if (t instanceof ProxyRegDropTask) {
270 ProxyReg r = getProxyReg();
271 if (r != null && r.equals(t.getProxyReg())) {
272 if (t.getSeqN() < getSeqN()) {
273 return true;
274 }
275 }
276 }
277 return false;
278 }
279
280 @Override
281 public void run() {
282 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
283 ServiceDiscoveryManager.log(Level.FINER,
284 "ServiceDiscoveryManager - RegisterListenerTask started");
285 }
286 long duration = cache.getLeaseDuration();
287 if (duration < 0) {
288 return;
289 }
290 try {
291 EventReg eventReg
292 = cache.sdm.registerListener(
293 reg.getProxy(),
294 cache.tmpl,
295 cache.lookupListenerProxy,
296 duration
297 );
298
299
300
301
302 if (cache.bCacheTerminated
303 || Thread.currentThread().isInterrupted()) {
304
305 cache.sdm.cancelLease(eventReg.lease);
306 } else {
307 eventReg.suspendEvents();
308 EventReg existed
309 = cache.eventRegMap.putIfAbsent(reg, eventReg);
310 if (existed != null){
311 cache.sdm.cancelLease(eventReg.lease);
312 } else {
313 try {
314
315 cache.lookup(reg);
316 } finally {
317 synchronized (eventReg){
318 eventReg.releaseEvents();
319 eventReg.notify();
320 }
321 }
322 }
323
324 }
325 } catch (Exception e) {
326 cache.sdm.fail(e,
327 reg.getProxy(),
328 this.getClass().getName(),
329 "run",
330 "Exception occurred while attempting to register with the lookup service event mechanism",
331 cache.bCacheTerminated
332 );
333 } finally {
334 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
335 ServiceDiscoveryManager.log(Level.FINER,
336 "ServiceDiscoveryManager - RegisterListenerTask completed");
337 }
338 }
339 }
340 }
341
342
343
344
345
346 private static final class ProxyRegDropTask extends CacheTask {
347
348 final LookupCacheImpl cache;
349 final EventReg eReg;
350
351 public ProxyRegDropTask(ProxyReg reg,
352 EventReg eReg,
353 long seqN,
354 LookupCacheImpl cache) {
355 super(reg, seqN);
356 this.cache = cache;
357 this.eReg = eReg;
358 }
359
360 @Override
361 public void run() {
362 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
363 ServiceDiscoveryManager.log(Level.FINEST,
364 "ServiceDiscoveryManager - ProxyRegDropTask started");
365 }
366
367
368
369 synchronized (eReg){
370 while (eReg.eventsSuspended()) {
371
372
373 try {
374 eReg.wait(200L);
375 } catch (InterruptedException e){
376 Thread.currentThread().interrupt();
377 }
378 }
379
380
381
382
383 if (eReg.discard()) cache.eventRegMap.remove(reg, eReg);
384 }
385
386
387
388
389
390
391 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = cache.serviceIdMap.entrySet().iterator();
392 while (iter.hasNext()) {
393 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
394 ServiceID srvcID = e.getKey();
395 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(cache, reg.getProxy());
396 cache.serviceIdMap.computeIfPresent(srvcID, dlcl);
397 if (dlcl.itemRegProxy != null) {
398 cache.itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
399 } else if (dlcl.notify && dlcl.filteredItem != null) {
400 cache.removeServiceNotify(dlcl.filteredItem);
401 }
402 }
403 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
404 ServiceDiscoveryManager.log(Level.FINEST,
405 "ServiceDiscoveryManager - ProxyRegDropTask completed");
406 }
407 }
408
409 @Override
410 public boolean hasDeps() {
411 return true;
412 }
413
414 @Override
415 public boolean dependsOn(CacheTask t) {
416 if (t instanceof RegisterListenerTask || t instanceof ProxyRegDropTask) {
417 ProxyReg r = getProxyReg();
418 if (r != null && r.equals(t.getProxyReg())) {
419 if (t.getSeqN() < getSeqN()) {
420 return true;
421 }
422 }
423 }
424 return false;
425 }
426 }
427
428
429
430
431
432
433
434 private static final class ServiceDiscardTimerTask implements Runnable {
435
436 private final ServiceID serviceID;
437 private final LookupCacheImpl cache;
438
439 public ServiceDiscardTimerTask(LookupCacheImpl cache, ServiceID serviceID) {
440 this.serviceID = serviceID;
441 this.cache = cache;
442 }
443
444 @Override
445 public void run() {
446 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
447 ServiceDiscoveryManager.log(Level.FINEST,
448 "ServiceDiscoveryManager - ServiceDiscardTimerTask started");
449 }
450 try {
451
452 if (cache.bCacheTerminated) {
453 return;
454 }
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475 ServiceItemReg itemReg = cache.serviceIdMap.get(serviceID);
476 if (itemReg != null) {
477
478
479
480
481
482 ServiceItem itemToSend;
483 if (!itemReg.isDiscarded()) return;
484 ServiceItem item = null;
485 ServiceItem filteredItem = null;
486 boolean addFilteredItemToMap = false;
487 boolean remove = false;
488 boolean notify = true;
489 itemToSend = itemReg.getFilteredItem();
490 if (itemToSend == null) {
491 item = itemReg.getItem();
492 filteredItem = item.clone();
493
494 if (cache.useInsecureLookup){
495 if (ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)) {
496 addFilteredItemToMap = true;
497 } else {
498
499 remove = true;
500 notify = false;
501 }
502 } else {
503
504
505 try {
506 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
507 addFilteredItemToMap = true;
508 } else {
509
510 remove = true;
511 notify = false;
512 }
513 } catch (SecurityException ex){
514 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
515 ServiceDiscoveryManager.log(Level.FINE,
516 "Exception caught, while attempting to filter a bootstrap proxy", ex);
517 }
518 try {
519 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
520 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
521 addFilteredItemToMap = true;
522 } else {
523
524 remove = true;
525 notify = false;
526 }
527 } catch (RemoteException ex1) {
528 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
529 ServiceDiscoveryManager.log(Level.FINE,
530 "Exception caught, while attempting to filter a bootstrap proxy", ex1);
531 }
532
533 remove = true;
534 notify = false;
535 }
536 } catch (ClassCastException ex){
537 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
538 ServiceDiscoveryManager.log(Level.FINE,
539 "Exception caught, while attempting to filter a bootstrap proxy", ex);
540 }
541 try {
542 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
543 if(ServiceDiscoveryManager.filterPassed(filteredItem, cache.filter)){
544 addFilteredItemToMap = true;
545 } else {
546
547 remove = true;
548 notify = false;
549 }
550 } catch (RemoteException ex1) {
551 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
552 ServiceDiscoveryManager.log(Level.FINE,
553 "Exception caught, while attempting to filter a bootstrap proxy", ex1);
554 }
555
556 remove = true;
557 notify = false;
558 }
559 }
560 }
561 }
562
563
564
565
566
567
568
569
570
571 AddOrRemove aor =
572 new AddOrRemove(cache, item, filteredItem,
573 itemToSend, addFilteredItemToMap,
574 remove, notify
575 );
576 cache.serviceIdMap.computeIfPresent(serviceID, aor);
577 if (aor.notify) cache.addServiceNotify(aor.itemToSend);
578 }
579 } finally {
580 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
581 ServiceDiscoveryManager.log(Level.FINEST,
582 "ServiceDiscoveryManager - ServiceDiscardTimerTask completed");
583 }
584 }
585 }
586 }
587
588
589
590
591 private static class AddOrRemove
592 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>
593 {
594 final LookupCacheImpl cache;
595 final ServiceItem item;
596 final ServiceItem filteredItem;
597 final boolean addFilteredItemToMap;
598 final boolean remove;
599 boolean notify;
600 ServiceItem itemToSend;
601
602 AddOrRemove(LookupCacheImpl cache, ServiceItem item,
603 ServiceItem filteredItem, ServiceItem itemToSend,
604 boolean addFilteredItemToMap, boolean remove, boolean notify)
605 {
606 this.cache = cache;
607 this.item = item;
608 this.filteredItem = filteredItem;
609 this.itemToSend = itemToSend;
610 this.addFilteredItemToMap = addFilteredItemToMap;
611 this.remove = remove;
612 this.notify = notify;
613 }
614
615 @Override
616 public ServiceItemReg apply(ServiceID serviceID, ServiceItemReg itemReg) {
617 if (!itemReg.unDiscard()){
618 notify = false;
619 return itemReg;
620 }
621
622
623
624
625
626
627
628
629
630 if (addFilteredItemToMap){
631 itemReg.replaceProxyUsedToTrackChange(null, item);
632 itemReg.setFilteredItem(filteredItem);
633 itemToSend = filteredItem;
634 return itemReg;
635 } else if (remove){
636 return null;
637 }
638 return itemReg;
639 }
640
641 }
642
643
644 @Override
645 public void terminate() {
646 synchronized (this) {
647 if (bCacheTerminated) {
648 return;
649 }
650 bCacheTerminated = true;
651 }
652 sdm.removeLookupCache(this);
653
654 cacheTaskMgr.shutdownNow();
655
656 serviceDiscardTimerTaskMgr.shutdownNow();
657 eventNotificationExecutor.shutdownNow();
658
659 Set set = eventRegMap.entrySet();
660 Iterator iter = set.iterator();
661 while (iter.hasNext()) {
662 Map.Entry e = (Map.Entry) iter.next();
663 EventReg eReg = (EventReg) e.getValue();
664 sdm.cancelLease(eReg.lease);
665 }
666
667 try {
668 lookupListenerExporter.unexport(true);
669 } catch (IllegalStateException e) {
670 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
671 ServiceDiscoveryManager.log(
672 Level.FINEST,
673 "IllegalStateException occurred while unexporting the cache's remote event listener",
674 e
675 );
676 }
677 }
678 incomingEventExecutor.shutdownNow();
679 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)){
680 ServiceDiscoveryManager.log(
681 Level.FINEST,
682 "ServiceDiscoveryManager - LookupCache terminated"
683 );
684 }
685 }
686
687
688 @Override
689 public ServiceItem lookup(ServiceItemFilter myFilter) {
690 checkCacheTerminated();
691 ServiceItem[] ret = getServiceItems(myFilter);
692 if (ret.length == 0) {
693 return null;
694 }
695
696
697 int rand = sdm.random.nextInt(ret.length);
698 return ret[rand];
699 }
700
701
702 @Override
703 public ServiceItem[] lookup(ServiceItemFilter myFilter, int maxMatches) {
704 checkCacheTerminated();
705 if (maxMatches < 1) {
706 throw new IllegalArgumentException("maxMatches must be > 0");
707 }
708 ServiceItem[] sa = getServiceItems(myFilter);
709 int len = sa.length;
710 if (len == 0 || len <= maxMatches) return sa;
711 List<ServiceItem> items = new LinkedList<ServiceItem>();
712 int rand = sdm.random.nextInt(Integer.MAX_VALUE) % len;
713 for (int i = 0; i < len; i++) {
714 items.add(sa[(i + rand) % len]);
715 if (items.size() == maxMatches) {
716 break;
717 }
718 }
719 ServiceItem[] ret = new ServiceItem[items.size()];
720 items.toArray(ret);
721 return ret;
722 }
723
724
725 @Override
726 public void discard(Object serviceReference) {
727 checkCacheTerminated();
728
729
730
731
732
733 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = serviceIdMap.entrySet().iterator();
734 while (iter.hasNext()) {
735 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
736 ServiceItemReg itmReg = e.getValue();
737 ServiceID sid = e.getKey();
738 ServiceItem filteredItem = itmReg.getFilteredItem();
739 if (filteredItem != null && (filteredItem.service).equals(serviceReference)) {
740 Discard dis = new Discard(this, itmReg, filteredItem, sdm.getDiscardWait());
741 serviceIdMap.computeIfPresent(sid, dis);
742 }
743 }
744 }
745
746 private static class Discard
747 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>{
748
749 LookupCacheImpl cache;
750 ServiceItemReg expected;
751 ServiceItem filteredItem;
752 long discardWait;
753
754 Discard(LookupCacheImpl cache, ServiceItemReg itmReg, ServiceItem filteredItem, long discardWait){
755 this.cache = cache;
756 this.expected = itmReg;
757 this.filteredItem = filteredItem;
758 this.discardWait = discardWait;
759 }
760
761 @Override
762 public ServiceItemReg apply(ServiceID sid, ServiceItemReg itmReg) {
763 if (!expected.equals(itmReg)) return itmReg;
764 if (itmReg.discard()) {
765 Future f =
766 cache.serviceDiscardTimerTaskMgr.schedule(
767 new ServiceDiscardTimerTask(cache, sid),
768 discardWait,
769 TimeUnit.MILLISECONDS
770 );
771 cache.serviceDiscardFutures.put(sid, f);
772 cache.removeServiceNotify(filteredItem);
773 }
774 return itmReg;
775 }
776 }
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801 private ServiceItem[] getServiceItems(ServiceItemFilter filter2) {
802 FilteredItems items = new FilteredItems(this, filter2);
803 serviceIdMap.forEach(items);
804 return items.result();
805 }
806
807 private static class FilteredItems implements BiConsumer<ServiceID, ServiceItemReg> {
808
809 private final List<ServiceItem> items;
810 private final ServiceItemFilter filter2;
811 private final LookupCacheImpl cache;
812
813 FilteredItems(LookupCacheImpl cache, ServiceItemFilter filter2){
814 this.items = new LinkedList<ServiceItem>();
815 this.filter2 = filter2;
816 this.cache = cache;
817 }
818
819 @Override
820 public void accept(ServiceID sid, ServiceItemReg itemReg) {
821 ServiceItem itemToFilter = itemReg.getFilteredItem();
822 if ((itemToFilter == null) || (itemReg.isDiscarded())) return;
823
824
825
826
827 itemToFilter = itemToFilter.clone();
828
829 boolean pass = (filter2 == null) || (filter2.check(itemToFilter));
830
831 if (!pass) return;
832
833 if (itemToFilter.service != null) {
834 items.add(itemToFilter);
835 return;
836 }
837
838 Discard dis =
839 new Discard(
840 cache,
841 itemReg,
842 itemToFilter,
843 cache.sdm.getDiscardWait()
844 );
845 cache.serviceIdMap.computeIfPresent(sid, dis);
846 }
847
848 ServiceItem[] result(){
849 ServiceItem[] ret = new ServiceItem[items.size()];
850 items.toArray(ret);
851 return ret;
852 }
853
854 }
855
856 ServiceItem [] processBootStrapProxys(Object [] proxys){
857 int length = proxys.length;
858 Collection<ServiceItem> result = new ArrayList<ServiceItem>(length);
859 for (int i = 0; i < length; i++){
860 Object bootstrap;
861 Entry [] attributes;
862 ServiceID id;
863 try {
864 bootstrap = sdm.bootstrapProxyPreparer.prepareProxy(proxys[i]);
865 attributes = ((ServiceAttributesAccessor) bootstrap).getServiceAttributes();
866 id = ((ServiceIDAccessor) bootstrap).serviceID();
867 result.add(new ServiceItem(id, bootstrap, attributes));
868 } catch (IOException ex) { }
869 }
870 return result.toArray(new ServiceItem[result.size()]);
871 }
872
873
874 @Override
875 public void addListener(ServiceDiscoveryListener listener) {
876 checkCacheTerminated();
877 if (listener == null) {
878 throw new NullPointerException("can't add null listener");
879 }
880
881 ServiceItem[] items = getServiceItems(null);
882 boolean added;
883 sItemListenersWrite.lock();
884 try {
885 added = sItemListeners.add(listener);
886 } finally {
887 sItemListenersWrite.unlock();
888 }
889 if (added){
890 for (int i = 0, l = items.length; i < l; i++) {
891 addServiceNotify(items[i], listener);
892 }
893 }
894 }
895
896
897 @Override
898 public void removeListener(ServiceDiscoveryListener listener) {
899 checkCacheTerminated();
900 if (listener == null) {
901 return;
902 }
903 sItemListenersWrite.lock();
904 try {
905 sItemListeners.remove(listener);
906 } finally {
907 sItemListenersWrite.unlock();
908 }
909 }
910
911
912
913
914
915
916
917 public void addProxyReg(ProxyReg reg) {
918 RegisterListenerTask treg = new RegisterListenerTask(reg, taskSeqN.getAndIncrement(), this);
919 cacheTaskDepMgr.submit(treg);
920 }
921
922
923
924
925
926
927
928 public void removeProxyReg(ProxyReg reg) {
929 ProxyRegDropTask t;
930
931 EventReg eReg = eventRegMap.get(reg);
932 if (eReg != null) {
933 try {
934 sdm.leaseRenewalMgr.remove(eReg.lease);
935 } catch (Exception e) {
936 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
937 ServiceDiscoveryManager.log(
938 Level.FINER,
939 "exception occurred while removing an event registration lease",
940 e
941 );
942 }
943 }
944 t = new ProxyRegDropTask(reg, eReg, taskSeqN.getAndIncrement(), this);
945 cacheTaskDepMgr.removeUselessTask(reg);
946 cacheTaskDepMgr.submit(t);
947 }
948 }
949
950
951
952
953 private void checkCacheTerminated() {
954 sdm.checkTerminated();
955 if (bCacheTerminated) {
956 throw new IllegalStateException("this lookup cache was terminated");
957 }
958 }
959
960
961
962
963
964
965 private void notifyServiceMap(ServiceEvent theEvent){
966 if (theEvent.getSource() == null) {
967 return;
968 }
969 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
970 ServiceDiscoveryManager.log(
971 Level.FINE,
972 "HandleServiceEventTask submitted"
973 );
974 }
975 incomingEventExecutor.submit(new HandleServiceEventTask(this, theEvent));
976 }
977
978
979
980
981
982
983
984
985
986
987 private static class HandleServiceEventTask implements Runnable, Comparable {
988
989 private final LookupCacheImpl cache;
990 private final ServiceEvent theEvent;
991
992
993 private volatile ProxyReg reg;
994 private volatile EventReg eReg;
995 private volatile long timestamp;
996 private volatile ServiceItem item;
997
998 HandleServiceEventTask(LookupCacheImpl cache, ServiceEvent event){
999 this.cache = cache;
1000 this.theEvent = event;
1001 }
1002
1003 @Override
1004 public void run() {
1005 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1006 ServiceDiscoveryManager.log(Level.FINER,"HandleServiceEventTask started");
1007 }
1008 try {
1009 if (item == null){
1010 if (cache.useInsecureLookup){
1011 item = theEvent.getServiceItem();
1012 } else {
1013
1014
1015
1016
1017
1018 Object proxy = theEvent.getBootstrapProxy();
1019 if (proxy != null){
1020 Entry [] attributes;
1021 try {
1022 proxy = cache.sdm.bootstrapProxyPreparer.prepareProxy(proxy);
1023
1024
1025
1026
1027
1028
1029 attributes = ((ServiceAttributesAccessor)proxy).getServiceAttributes();
1030 item = new ServiceItem(theEvent.getServiceID(), proxy, attributes);
1031 } catch (IOException ex) {
1032 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1033 ServiceDiscoveryManager.log(
1034 Level.FINE,
1035 "exception thrown while attempting to establish contact via a bootstrap proxy",
1036 ex
1037 );
1038 }
1039
1040 }
1041 }
1042 }
1043 }
1044
1045 FIND_ProxyReg: while (reg == null || eReg == null) {
1046 Set<Map.Entry<ProxyReg, EventReg>> set = cache.eventRegMap.entrySet();
1047 Iterator<Map.Entry<ProxyReg, EventReg>> iter = set.iterator();
1048 while (iter.hasNext()) {
1049 Map.Entry<ProxyReg, EventReg> e = iter.next();
1050 eReg = e.getValue();
1051 if (theEvent.getID() == eReg.eventID && theEvent.getSource().equals(eReg.source)) {
1052 reg = e.getKey();
1053 break FIND_ProxyReg;
1054 }
1055 }
1056 try {
1057 cache.incomingEventExecutor.submit(this);
1058 Thread.sleep(50L);
1059 return;
1060 } catch (InterruptedException ex) {
1061 Thread.currentThread().interrupt();
1062 return;
1063 }
1064 }
1065 if (timestamp == 0){
1066 timestamp = System.currentTimeMillis();
1067 } else {
1068
1069 if (!cache.eventRegMap.containsKey(reg)) return;
1070 }
1071 long currentTime = System.currentTimeMillis();
1072 long delta = 0;
1073 boolean resubmit = false;
1074 int waiting = 0;
1075 synchronized (eReg){
1076 if (eReg.discarded()) return;
1077 if (eReg.nonContiguousEvent(theEvent.getSequenceNumber())
1078 && (currentTime - timestamp < 500))
1079 {
1080 resubmit = true;
1081 eReg.notifyAll();
1082 } else {
1083 while (eReg.eventsSuspended()){
1084 try {
1085 waiting ++;
1086 eReg.wait(100L);
1087 waiting --;
1088 if (eReg.discarded()) return;
1089
1090 if (waiting > 0 && eReg.nonContiguousEvent(
1091 theEvent.getSequenceNumber()))
1092 {
1093 eReg.notifyAll();
1094 resubmit = true;
1095 break;
1096 }
1097 } catch (InterruptedException ex) {
1098 Thread.currentThread().interrupt();
1099 return;
1100 }
1101 }
1102 if (!resubmit){
1103 eReg.suspendEvents();
1104 delta = eReg.updateSeqNo(theEvent.getSequenceNumber());
1105 }
1106 }
1107 }
1108 if (resubmit){
1109 try {
1110 cache.incomingEventExecutor.submit(this);
1111 Thread.sleep(50L);
1112 } catch (InterruptedException ex) {
1113 Thread.currentThread().interrupt();
1114 return;
1115 }
1116 return;
1117 }
1118 try {
1119 cache.notifyServiceMap(delta,
1120 theEvent.getServiceID(),
1121 item,
1122 theEvent.getTransition(),
1123 reg);
1124 } finally {
1125 synchronized (eReg){
1126 eReg.releaseEvents();
1127 eReg.notifyAll();
1128 }
1129 }
1130 } catch (RuntimeException e){
1131 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1132 ServiceDiscoveryManager.log(Level.FINER, "HandleServiceEventTask threw a RuntimeException", e);
1133 } finally {
1134 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1135 ServiceDiscoveryManager.log(Level.FINER, "HandleServiceEventTask completed");
1136 }
1137 }
1138
1139 @Override
1140 public int compareTo(Object o) {
1141 if (!(o instanceof HandleServiceEventTask)) return 0;
1142 HandleServiceEventTask that = (HandleServiceEventTask) o;
1143 long dif = this.theEvent.getSequenceNumber() - that.theEvent.getSequenceNumber();
1144 if (dif == 0) return 0;
1145 if (dif < 0) return -1;
1146 return 1;
1147 }
1148 }
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186 private void notifyServiceMap(long delta,
1187 ServiceID sid,
1188 ServiceItem item,
1189 int transition,
1190 ProxyReg reg)
1191 {
1192
1193 if (delta == 1) {
1194
1195 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1196 ServiceDiscoveryManager.log(
1197 Level.FINE,
1198 "No gap, handle current ServiceEvent, ServiceID: {0} transition: {1}",
1199 new Object[]{sid, transition}
1200 );
1201 }
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213 if ((item != null) && (item.service == null)) {
1214 return;
1215 }
1216
1217
1218
1219
1220 if (transition == ServiceRegistrar.TRANSITION_MATCH_NOMATCH)
1221 {
1222 handleMatchNoMatch(reg.getProxy(), sid);
1223 } else if (transition == ServiceRegistrar.TRANSITION_NOMATCH_MATCH
1224 || transition == ServiceRegistrar.TRANSITION_MATCH_MATCH)
1225 {
1226 newOldService(reg, sid, item, transition == ServiceRegistrar.TRANSITION_MATCH_MATCH);
1227 }
1228 return;
1229 }
1230 if (delta == 0) {
1231 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1232 ServiceDiscoveryManager.log(
1233 Level.FINE,
1234 "Repeat ServiceEvent, ignore, ServiceID: {0} transition: {1}",
1235 new Object[]{sid, transition}
1236 );
1237 }
1238 return;
1239 }
1240 if (delta < 0) {
1241 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1242 ServiceDiscoveryManager.log(
1243 Level.FINE,
1244 "Old ServiceEvent, ignore, ServiceID: {0} transition: {1}",
1245 new Object[]{sid, transition}
1246 );
1247 }
1248 return;
1249 }
1250
1251 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1252 ServiceDiscoveryManager.log(
1253 Level.FINE,
1254 "Gap in ServiceEvent sequence, performing lookup, ServiceID: {0} transition: {1}",
1255 new Object[]{sid, transition}
1256 );
1257 }
1258 lookup(reg);
1259 }
1260
1261
1262
1263
1264
1265 private void lookup(ProxyReg reg) {
1266 ServiceRegistrar proxy = reg.getProxy();
1267 ServiceItem[] items;
1268
1269 try {
1270 if (useInsecureLookup){
1271 ServiceMatches matches = proxy.lookup(tmpl, Integer.MAX_VALUE);
1272 items = matches.items;
1273 } else {
1274 Object [] matches = ((SafeServiceRegistrar) proxy).lookUp(
1275 tmpl, Integer.MAX_VALUE);
1276 items = processBootStrapProxys(matches);
1277 }
1278 } catch (Exception e) {
1279
1280
1281 sdm.fail(e, proxy, this.getClass().getName(), "run", "Exception occurred during call to lookup", bCacheTerminated);
1282 return;
1283 }
1284 if (items == null) {
1285 throw new AssertionError("spec violation in queried lookup service: ServicesMatches instance returned by call to lookup() method contains null 'items' field");
1286 }
1287
1288
1289 Iterator<Map.Entry<ServiceID, ServiceItemReg>> iter = serviceIdMap.entrySet().iterator();
1290 while (iter.hasNext()) {
1291 Map.Entry<ServiceID, ServiceItemReg> e = iter.next();
1292 ServiceID srvcID = e.getKey();
1293 ServiceItem itemInSnapshot = findItem(srvcID, items);
1294 if (itemInSnapshot != null) continue;
1295 if (Thread.currentThread().isInterrupted()) return;
1296 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(this, reg.getProxy());
1297 serviceIdMap.computeIfPresent(srvcID, dlcl);
1298 if (dlcl.itemRegProxy != null) {
1299 itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
1300 } else if (dlcl.notify && dlcl.filteredItem != null) {
1301 removeServiceNotify(dlcl.filteredItem);
1302 }
1303 }
1304
1305 for (int i = 0, l = items.length; i < l; i++) {
1306
1307 if (items[i].service == null) {
1308 continue;
1309 }
1310 if (items[i].serviceID == null && !useInsecureLookup){
1311 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1312 ServiceDiscoveryManager.log(Level.FINE,
1313 "ServiceItem contained null serviceID field, attempting to retrieve again");
1314 try {
1315 ServiceID id = ((ServiceIDAccessor)items[i].service).serviceID();
1316
1317 if (id == null) continue;
1318 items[i].serviceID = id;
1319 } catch ( IOException e){
1320 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1321 ServiceDiscoveryManager.log(Level.FINE,
1322 "ServiceItem contained null serviceID field, attempt to retrieve again failed, ignoring",
1323 e);
1324 continue;
1325 }
1326 }
1327 newOldService(reg, items[i].serviceID, items[i], false);
1328 }
1329 }
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 private void newOldService(ProxyReg reg, ServiceID id, ServiceItem item, boolean matchMatchEvent) {
1358 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1359 ServiceDiscoveryManager.log(
1360 Level.FINE,
1361 "newOldService called, ServiceItem: {0}",
1362 new Object[]{item}
1363 );
1364 }
1365 try {
1366 boolean previouslyDiscovered = false;
1367 ServiceItemReg itemReg;
1368 itemReg = serviceIdMap.get(id);
1369 if (itemReg == null) {
1370 if (!eventRegMap.containsKey(reg)) {
1371
1372 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1373 ServiceDiscoveryManager.log(
1374 Level.FINER,
1375 "eventRegMap doesn't contain ProxyReg, returning, ServiceItem: {0}",
1376 new Object[]{item}
1377 );
1378 return;
1379 }
1380
1381 itemReg = new ServiceItemReg(reg.getProxy(), item);
1382 ServiceItemReg existed = serviceIdMap.putIfAbsent(id, itemReg);
1383 if (existed != null) {
1384 itemReg = existed;
1385 if (itemReg.isDiscarded()) {
1386 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1387 ServiceDiscoveryManager.log(
1388 Level.FINER,
1389 "newOldService, discarded returning, ServiceItem: {0}",
1390 new Object[]{item}
1391 );
1392 }
1393 return;
1394 }
1395 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1396 ServiceDiscoveryManager.log(
1397 Level.FINER,
1398 "newOldService, previously discovered, ServiceItem: {0}",
1399 new Object[]{item}
1400 );
1401 }
1402 previouslyDiscovered = true;
1403 }
1404 } else if (itemReg.isDiscarded()) {
1405 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1406 ServiceDiscoveryManager.log(
1407 Level.FINER,
1408 "newOldService, discarded returning, ServiceItem: {0}",
1409 new Object[]{item}
1410 );
1411 }
1412 return;
1413 } else {
1414 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER)){
1415 ServiceDiscoveryManager.log(
1416 Level.FINER,
1417 "newOldService, previously discovered, ServiceItem: {0}",
1418 new Object[]{item}
1419 );
1420 }
1421 previouslyDiscovered = true;
1422 }
1423 if (previouslyDiscovered) {
1424
1425 itemMatchMatchChange(id, itemReg, reg.getProxy(), item, matchMatchEvent);
1426 } else {
1427
1428 ServiceItem newFilteredItem
1429 = filterMaybeDiscard(id, itemReg, item, false);
1430
1431 if (newFilteredItem != null) {
1432 addServiceNotify(newFilteredItem);
1433 }
1434 }
1435 } catch (RuntimeException e) {
1436 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
1437 ServiceDiscoveryManager.log(Level.FINE, "Runtime exception thrown in newOldService call", e);
1438 } finally {
1439 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
1440 ServiceDiscoveryManager.log(Level.FINER,
1441 "newOldService call complete, ServiceItem: {0}",
1442 new Object[]{item});
1443 }
1444 }
1445
1446
1447
1448
1449 private ServiceItem findItem(ServiceID sid, ServiceItem[] items) {
1450 if (items != null) {
1451 for (int i = 0, length = items.length; i < length; i++) {
1452 if (sid.equals(items[i].serviceID)) {
1453 return items[i];
1454 }
1455 }
1456 }
1457 return null;
1458 }
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530 private void itemMatchMatchChange(ServiceID srvcID, ServiceItemReg itemReg, ServiceRegistrar proxy, ServiceItem newItem, boolean matchMatchEvent) {
1531
1532
1533
1534 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE)){
1535 ServiceDiscoveryManager.log(
1536 Level.FINE,
1537 "itemMatchMatchChange called, ServiceID: {0} ",
1538 new Object[]{srvcID}
1539 );
1540 }
1541 PreEventState pev = new PreEventState(proxy,itemReg,newItem,matchMatchEvent);
1542 ServiceItem newFilteredItem;
1543 serviceIdMap.computeIfPresent(srvcID, pev);
1544 if (pev.needToFilter){
1545
1546 newFilteredItem = filterMaybeDiscard(srvcID, itemReg, newItem, pev.notDiscarded);
1547 if (newFilteredItem != null) {
1548
1549 if (pev.attrsChanged && pev.oldFilteredItem != null) {
1550 changeServiceNotify(newFilteredItem, pev.oldFilteredItem);
1551 }
1552 if (pev.versionChanged) {
1553 if (pev.notDiscarded && pev.oldFilteredItem != null) {
1554 removeServiceNotify(pev.oldFilteredItem);
1555 }
1556 addServiceNotify(newFilteredItem);
1557 }
1558 }
1559 }
1560 }
1561
1562 private static class PreEventState implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg> {
1563
1564 private final ServiceRegistrar proxy;
1565 private final ServiceItemReg reg;
1566 private final ServiceItem newItem;
1567 private final boolean matchMatchEvent;
1568 ServiceItem oldItem;
1569 ServiceItem oldFilteredItem;
1570 boolean notDiscarded;
1571 boolean attrsChanged = false;
1572 boolean versionChanged = false;
1573 boolean needToFilter = false;
1574 ServiceRegistrar proxyChanged = null;
1575
1576 PreEventState(ServiceRegistrar proxy, ServiceItemReg reg, ServiceItem newItem, boolean matchMatchEvent){
1577 this.proxy = proxy;
1578 this.reg = reg;
1579 this.newItem = newItem;
1580 this.matchMatchEvent = matchMatchEvent;
1581 }
1582
1583 @Override
1584 public ServiceItemReg apply(ServiceID t, ServiceItemReg itemReg) {
1585 boolean loggable = ServiceDiscoveryManager.logger.isLoggable(Level.FINER);
1586 if (! reg.equals(itemReg)) {
1587 if (loggable)
1588 ServiceDiscoveryManager.log(
1589 Level.FINER,
1590 "PreEventState.apply, ServiceItemReg's not equal, returning. ServiceID: {0}",
1591 new Object[]{t}
1592 );
1593 return itemReg;
1594 }
1595 notDiscarded = !itemReg.isDiscarded();
1596 oldItem = itemReg.getItem();
1597 oldFilteredItem = itemReg.getFilteredItem();
1598 if (itemReg.proxyNotUsedToTrackChange(proxy, newItem)) {
1599
1600 if (loggable)
1601 ServiceDiscoveryManager.log(
1602 Level.FINER,
1603 "PreEventState.apply, proxyNotUsedToTrackChange. ServiceID: {0}",
1604 new Object[]{t}
1605 );
1606 if (matchMatchEvent) {
1607 if (loggable)
1608 ServiceDiscoveryManager.log(
1609 Level.FINER,
1610 "PreEventState.apply, matchMatchEvent true returning. ServiceID: {0}",
1611 new Object[]{t}
1612 );
1613 return itemReg;
1614 }
1615 if (notDiscarded) {
1616 if (loggable)
1617 ServiceDiscoveryManager.log(
1618 Level.FINER,
1619 "PreEventState.apply, notifyServiceRemoved true returning. ServiceID: {0}",
1620 new Object[]{t}
1621 );
1622 return itemReg;
1623 }
1624 if (loggable)
1625 ServiceDiscoveryManager.log(
1626 Level.FINER,
1627 "PreEventState.apply, proxyChanged = proxy. ServiceID: {0}",
1628 new Object[]{t}
1629 );
1630 proxyChanged = proxy;
1631 }
1632 if (!notDiscarded) {
1633 if (loggable)
1634 ServiceDiscoveryManager.log(
1635 Level.FINER,
1636 "PreEventState.apply, !notifyServiceRemoved, replacProxyUsedToTrackChange ServiceID: {0}",
1637 new Object[]{t}
1638 );
1639 itemReg.replaceProxyUsedToTrackChange(proxyChanged, newItem);
1640 itemReg.setFilteredItem(null);
1641 itemReg.discard();
1642 if (matchMatchEvent) {
1643 return itemReg;
1644 }
1645 }
1646
1647
1648
1649 if (matchMatchEvent || sameVersion(newItem, oldItem)) {
1650 if (!notDiscarded) {
1651 if (loggable)
1652 ServiceDiscoveryManager.log(
1653 Level.FINER,
1654 "PreEventState.apply, matchMatchEvent || sameVersion && !notifyServiceRemoved return itemReg, no need to filter ServiceID: {0}",
1655 new Object[]{t}
1656 );
1657 return itemReg;
1658 }
1659
1660
1661
1662
1663
1664
1665
1666 attrsChanged = !LookupAttributes.equal(newItem.attributeSets, oldItem.attributeSets);
1667 if (!attrsChanged) {
1668 if (loggable)
1669 ServiceDiscoveryManager.log(
1670 Level.FINER,
1671 "PreEventState.apply, matchMatchEvent || sameVersion && !attrsChanged return itemReg, no need to filter ServiceID: {0}",
1672 new Object[]{t}
1673 );
1674 return itemReg;
1675 }
1676 } else {
1677
1678 if (loggable)
1679 ServiceDiscoveryManager.log(
1680 Level.FINER,
1681 "PreEventState.apply, !matchMatchEvent &&! sameVersion ==> re-registrattion, versionChanged. ServiceID: {0}",
1682 new Object[]{t}
1683 );
1684 versionChanged = true;
1685 }
1686 if (loggable)
1687 ServiceDiscoveryManager.log(
1688 Level.FINER,
1689 "PreEventState.apply, need to filter true. ServiceID: {0}",
1690 new Object[]{t}
1691 );
1692 needToFilter = true;
1693 return itemReg;
1694 }
1695
1696 }
1697
1698
1699
1700
1701
1702
1703
1704 private static boolean sameVersion(ServiceItem item0, ServiceItem item1) {
1705 boolean fullyEqual = false;
1706 try {
1707 MarshalledInstance mi0 = new MarshalledInstance(item0.service);
1708 MarshalledInstance mi1 = new MarshalledInstance(item1.service);
1709 fullyEqual = mi0.fullyEquals(mi1);
1710 } catch (IOException e) {
1711 if (ServiceDiscoveryManager.logger.isLoggable(Level.INFO)){
1712 ServiceDiscoveryManager.log(
1713 Level.INFO,
1714 "failure marshalling old and new services for equality check",
1715 e
1716 );
1717 }
1718 }
1719 return fullyEqual;
1720 }
1721
1722
1723
1724
1725 public long getLeaseDuration() {
1726 if (leaseDuration == Long.MAX_VALUE) {
1727 return Long.MAX_VALUE;
1728 }
1729 return leaseDuration + startTime - System.currentTimeMillis();
1730 }
1731
1732
1733
1734
1735 private void addServiceNotify(ServiceItem item) {
1736 serviceNotifyDo(null, item, ITEM_ADDED);
1737 }
1738
1739
1740
1741
1742
1743 private void addServiceNotify(ServiceItem item, ServiceDiscoveryListener srvcListener) {
1744 eventNotificationExecutor.execute(new ServiceNotifyDo(null, item, ITEM_ADDED, srvcListener, this));
1745 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)) {
1746 try {
1747 throw new Exception("Back Trace");
1748 } catch (Exception ex) {
1749 ex.fillInStackTrace();
1750 ServiceDiscoveryManager.log(
1751 Level.FINEST,
1752 "Log back trace",
1753 ex
1754 );
1755 }
1756 }
1757 }
1758
1759
1760
1761
1762 private void removeServiceNotify(ServiceItem item) {
1763 serviceNotifyDo(item, null, ITEM_REMOVED);
1764 }
1765
1766
1767
1768
1769
1770 private void changeServiceNotify(ServiceItem newItem, ServiceItem oldItem) {
1771 serviceNotifyDo(oldItem, newItem, ITEM_CHANGED);
1772 }
1773
1774
1775
1776
1777 private void serviceNotifyDo(ServiceItem oldItem, ServiceItem item, int action) {
1778 sItemListenersRead.lock();
1779 try {
1780 if (sItemListeners.isEmpty()) {
1781 return;
1782 }
1783 Iterator<ServiceDiscoveryListener> iter = sItemListeners.iterator();
1784 while (iter.hasNext()) {
1785 ServiceDiscoveryListener sl = iter.next();
1786 eventNotificationExecutor.execute(new ServiceNotifyDo(oldItem, item, action, sl, this));
1787 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST)) {
1788 try {
1789 throw new Exception("Back Trace");
1790 } catch (Exception ex) {
1791 ex.fillInStackTrace();
1792 ServiceDiscoveryManager.log(
1793 Level.FINEST,
1794 "Log back trace",
1795 ex
1796 );
1797 }
1798 }
1799 }
1800 } finally {
1801 sItemListenersRead.unlock();
1802 }
1803 }
1804
1805
1806
1807
1808
1809 private static class ServiceNotifyDo implements Runnable {
1810
1811 final ServiceItem oldItem;
1812 final ServiceItem item;
1813 final int action;
1814 final ServiceDiscoveryListener sl;
1815 final Object lookupCache;
1816
1817 ServiceNotifyDo(ServiceItem oldItem, ServiceItem item, int action, ServiceDiscoveryListener sl, LookupCache lookupCache) {
1818 this.oldItem = oldItem;
1819 this.item = item;
1820 this.action = action;
1821 this.sl = sl;
1822 this.lookupCache = lookupCache;
1823 }
1824
1825 @Override
1826 public void run() {
1827 ServiceDiscoveryEvent event;
1828 try {
1829 event = new ServiceDiscoveryEvent(lookupCache, oldItem, item);
1830 } catch (NullPointerException e) {
1831 boolean lookupCacheNull = lookupCache == null;
1832 boolean oldItemNull = oldItem == null;
1833 boolean itemNull = item == null;
1834 if (ServiceDiscoveryManager.logger.isLoggable(Level.INFO))
1835 ServiceDiscoveryManager.log(
1836 Level.INFO,
1837 "ServiceDiscoveryEvent constructor threw NullPointerException, lookupCache null? {0} oldItem null? {1} item null? {2}",
1838 new Object[]{lookupCacheNull, oldItemNull, itemNull}
1839 );
1840 return;
1841 }
1842 switch (action) {
1843 case ITEM_ADDED:
1844 sl.serviceAdded(event);
1845 break;
1846 case ITEM_REMOVED:
1847 sl.serviceRemoved(event);
1848 break;
1849 case ITEM_CHANGED:
1850 sl.serviceChanged(event);
1851 break;
1852 default:
1853 throw new IllegalArgumentException("case must be one of the following: ITEM_ADDED, ITEM_REMOVED or ITEM_CHANGED");
1854 }
1855 }
1856 }
1857
1858 void initCache() throws RemoteException {
1859
1860
1861
1862 try {
1863 Exporter defaultExporter =
1864 new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
1865 new AtomicILFactory(null, null, LookupCacheImpl.class.getClassLoader()),
1866 false,
1867 false
1868 );
1869 lookupListenerExporter =
1870 sdm.thisConfig.getEntry(
1871 ServiceDiscoveryManager.COMPONENT_NAME,
1872 "eventListenerExporter",
1873 Exporter.class,
1874 defaultExporter
1875 );
1876 } catch (ConfigurationException e) {
1877
1878 ExportException e1 = new ExportException("Configuration exception while " + "retrieving exporter for " + "cache's remote event listener", e);
1879 throw e1;
1880 }
1881
1882
1883
1884 try {
1885 eventNotificationExecutor =
1886 sdm.thisConfig.getEntry(
1887 ServiceDiscoveryManager.COMPONENT_NAME,
1888 "eventNotificationExecutor",
1889 ExecutorService.class
1890 );
1891 } catch (ConfigurationException e) {
1892
1893 eventNotificationExecutor =
1894 new ThreadPoolExecutor(1, 1, 15L, TimeUnit.SECONDS,
1895 new LinkedBlockingQueue<Runnable>(),
1896 new NamedThreadFactory("SDM event notifier: " + toString(), false),
1897 new ThreadPoolExecutor.CallerRunsPolicy());
1898 }
1899
1900
1901
1902
1903 try {
1904 cacheTaskMgr = sdm.thisConfig.getEntry(
1905 ServiceDiscoveryManager.COMPONENT_NAME,
1906 "cacheExecutorService",
1907 ExecutorService.class
1908 );
1909 } catch (ConfigurationException e) {
1910
1911 cacheTaskMgr = new ThreadPoolExecutor(3, 3, 15L, TimeUnit.SECONDS,
1912 new LinkedBlockingQueue<Runnable>(),
1913 new NamedThreadFactory("SDM lookup cache: " + toString(), false),
1914 new ThreadPoolExecutor.CallerRunsPolicy()
1915 );
1916 }
1917 cacheTaskMgr = new ExtensibleExecutorService(cacheTaskMgr, new ExtensibleExecutorService.RunnableFutureFactory() {
1918 @Override
1919 public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
1920 if (r instanceof ObservableFutureTask) {
1921 return (RunnableFuture<T>) r;
1922 }
1923 return new CacheTaskWrapper<T>(r, value);
1924 }
1925
1926 @Override
1927 public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
1928 if (c instanceof ObservableFutureTask) {
1929 return (RunnableFuture<T>) c;
1930 }
1931 return new CacheTaskWrapper<T>(c);
1932 }
1933 });
1934 cacheTaskDepMgr = new CacheTaskDependencyManager(cacheTaskMgr);
1935
1936
1937
1938
1939
1940
1941 try {
1942 serviceDiscardTimerTaskMgr =
1943 sdm.thisConfig.getEntry(
1944 ServiceDiscoveryManager.COMPONENT_NAME,
1945 "discardExecutorService",
1946 ScheduledExecutorService.class
1947 );
1948 } catch (ConfigurationException e) {
1949
1950 serviceDiscardTimerTaskMgr =
1951 new ScheduledThreadPoolExecutor(
1952 4,
1953 new NamedThreadFactory("SDM discard timer: " + toString(), false)
1954 );
1955 }
1956
1957
1958
1959 try {
1960 incomingEventExecutor = sdm.thisConfig.getEntry(
1961 ServiceDiscoveryManager.COMPONENT_NAME,
1962 "ServiceEventExecutorService",
1963 ExecutorService.class
1964 );
1965 } catch (ConfigurationException e){
1966 incomingEventExecutor =
1967 new ThreadPoolExecutor(1, 1, 15L, TimeUnit.SECONDS,
1968 new PriorityBlockingQueue(256),
1969 new NamedThreadFactory("SDM ServiceEvent: " + toString(), false),
1970 new ThreadPoolExecutor.DiscardOldestPolicy()
1971 );
1972 }
1973 incomingEventExecutor = new ExtensibleExecutorService(incomingEventExecutor,
1974 new ExtensibleExecutorService.RunnableFutureFactory()
1975 {
1976
1977 @Override
1978 public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
1979 return new ComparableFutureTask<T>(r,value);
1980 }
1981
1982 @Override
1983 public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
1984 return new ComparableFutureTask<T>(c);
1985 }
1986
1987 }
1988 );
1989
1990 lookupListenerProxy = lookupListener.export();
1991 sdm.proxyRegSetRead.lock();
1992 try {
1993 Iterator<ProxyReg> it = sdm.proxyRegSet.iterator();
1994 while (it.hasNext()) {
1995 addProxyReg(it.next());
1996 }
1997 } finally {
1998 sdm.proxyRegSetRead.unlock();
1999 }
2000
2001 }
2002
2003
2004
2005
2006
2007
2008
2009 private static class ComparableFutureTask<V> extends FutureTask<V>
2010 implements Comparable<ComparableFutureTask>
2011 {
2012
2013 private final Object task;
2014
2015 public ComparableFutureTask(Runnable runnable, V result) {
2016 super(runnable, result);
2017 task = runnable;
2018 }
2019
2020 public ComparableFutureTask(Callable<V> callable){
2021 super(callable);
2022 task = callable;
2023 }
2024
2025 @Override
2026 public int compareTo(ComparableFutureTask o) {
2027 if (task instanceof Comparable && o.task instanceof Comparable){
2028 return ((Comparable)task).compareTo(o.task);
2029 }
2030 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2031 ServiceDiscoveryManager.log(
2032 Level.FINEST,
2033 "task not instanceof Comparable {0}",
2034 new Object [] {task.getClass().getCanonicalName()}
2035 );
2036 return 0;
2037 }
2038
2039 @Override
2040 public int hashCode() {
2041 int hash = 3;
2042 hash = 89 * hash + (this.task != null ? this.task.hashCode() : 0);
2043 return hash;
2044 }
2045
2046 @Override
2047 public boolean equals(Object o){
2048 if (!(o instanceof ComparableFutureTask)) return false;
2049 return this.task.equals(((ComparableFutureTask)o).task);
2050 }
2051
2052 }
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080 private ServiceItem filterMaybeDiscard(ServiceID srvcID, ServiceItemReg itemReg, ServiceItem item, boolean sendEvent) {
2081 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2082 ServiceDiscoveryManager.log(
2083 Level.FINE,
2084 "filterMaybeDiscard called, ServiceID: {0}",
2085 new Object [] {srvcID}
2086 );
2087 if ((item == null) || (item.service == null)) {
2088 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2089 ServiceDiscoveryManager.log(
2090 Level.FINER,
2091 "filterMaybeDiscard, item or service was null, returning null, ServiceID: {0}",
2092 new Object []{srvcID}
2093 );
2094 return null;
2095 }
2096 boolean addFilteredItemToMap = false;
2097
2098 ServiceItem filteredItem = item.clone();
2099 boolean discardRetryLater = false;
2100 boolean pass = false;
2101 if (filter == null) {
2102 pass = true;
2103 if (useInsecureLookup){
2104 addFilteredItemToMap = true;
2105 } else {
2106 try {
2107 filteredItem.service =
2108 ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2109 addFilteredItemToMap = true;
2110 } catch (RemoteException ex) {
2111 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2112 ServiceDiscoveryManager.log(Level.FINE,
2113 "Exception thrown while trying to download service proxy",
2114 ex
2115 );
2116 discardRetryLater = true;
2117 }
2118 }
2119 } else {
2120 if (useInsecureLookup){
2121 pass = filter.check(filteredItem);
2122 } else {
2123 try {
2124 pass = filter.check(filteredItem);
2125 } catch (SecurityException ex){
2126 try {
2127
2128 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2129 pass = filter.check(filteredItem);
2130 } catch (RemoteException ex1) {
2131 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2132 ServiceDiscoveryManager.log(Level.FINE,
2133 "Exception thrown while trying to download service proxy",
2134 ex1
2135 );
2136 discardRetryLater = true;
2137 }
2138 } catch (ClassCastException ex){
2139 try {
2140
2141 filteredItem.service = ((ServiceProxyAccessor) filteredItem.service).getServiceProxy();
2142 pass = filter.check(filteredItem);
2143 } catch (RemoteException ex1) {
2144 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINE))
2145 ServiceDiscoveryManager.log(Level.FINE,
2146 "Exception thrown while trying to download service proxy",
2147 ex1
2148 );
2149 discardRetryLater = true;
2150 }
2151 }
2152 }
2153
2154 if (pass && !discardRetryLater && filteredItem.service != null) {
2155 addFilteredItemToMap = true;
2156 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2157 ServiceDiscoveryManager.log(Level.FINER,
2158 "filterMaybeDiscard, filter passed, ServiceID: {0}",
2159 new Object[]{srvcID}
2160 );
2161 }
2162 }
2163 PostEventState pes =
2164 new PostEventState(this, itemReg, item, filteredItem, sendEvent,
2165 pass, discardRetryLater, addFilteredItemToMap, sdm.getDiscardWait());
2166 serviceIdMap.computeIfPresent(srvcID, pes);
2167 if (pes.notifyRemoved && pes.oldFilteredItem != null) {
2168 removeServiceNotify(pes.oldFilteredItem);
2169 }
2170 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2171 ServiceDiscoveryManager.log(Level.FINER,
2172 "filterMaybeDiscard, returning filtered ServiceItem: {0}",
2173 new Object []{pes.filteredItemPass}
2174 );
2175 return pes.filteredItemPass;
2176 }
2177
2178 private static class PostEventState implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg> {
2179
2180 ServiceItem filteredItemPass;
2181 ServiceItem oldFilteredItem = null;
2182 final ServiceItem item;
2183 final ServiceItem filteredItem;
2184 final boolean pass;
2185 final boolean discardRetryLater;
2186 final boolean sendEvent;
2187 final boolean addFilteredItemToMap;
2188 boolean notifyRemoved = false;
2189 final ServiceItemReg expected;
2190 final LookupCacheImpl cache;
2191 final long discardWait;
2192
2193 PostEventState(LookupCacheImpl cache, ServiceItemReg itemReg,
2194 ServiceItem item, ServiceItem filteredItem, boolean sendEvent,
2195 boolean pass, boolean discardRetryLater,
2196 boolean addFilteredItemToMap, long discardWait)
2197 {
2198 this.cache = cache;
2199 this.expected = itemReg;
2200 this.item = item;
2201 this.filteredItem = filteredItem;
2202 this.sendEvent = sendEvent;
2203 this.pass = pass;
2204 this.discardRetryLater = discardRetryLater;
2205 this.addFilteredItemToMap = addFilteredItemToMap;
2206 this.discardWait = discardWait;
2207 }
2208
2209 @Override
2210 public ServiceItemReg apply(ServiceID id, ServiceItemReg itemReg) {
2211 if (!expected.equals(itemReg)) return itemReg;
2212 ServiceItemReg removeIfNull = itemReg;
2213
2214 if (!pass && !discardRetryLater) {
2215 if (itemReg != null) {
2216 if (sendEvent) {
2217 oldFilteredItem = itemReg.getFilteredItem();
2218 notifyRemoved = true;
2219 removeIfNull = null;
2220 } else {
2221 removeIfNull = null;
2222 }
2223 }
2224 filteredItemPass = null;
2225 }
2226 if (addFilteredItemToMap){
2227
2228
2229
2230
2231
2232
2233 cache.cancelDiscardTask(id);
2234 itemReg.replaceProxyUsedToTrackChange(null, item);
2235 itemReg.setFilteredItem(filteredItem);
2236 filteredItemPass = filteredItem;
2237 }
2238
2239 if (discardRetryLater){
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250 if (itemReg.discard()) {
2251 itemReg.replaceProxyUsedToTrackChange(null, item);
2252 itemReg.setFilteredItem(null);
2253 Future f = cache.serviceDiscardTimerTaskMgr.schedule(
2254 new ServiceDiscardTimerTask(cache, item.serviceID),
2255 discardWait,
2256 TimeUnit.MILLISECONDS
2257 );
2258 cache.serviceDiscardFutures.put(item.serviceID, f);
2259 if (sendEvent) {
2260 notifyRemoved = true;
2261 }
2262 }
2263 }
2264 return removeIfNull;
2265 }
2266
2267 }
2268
2269
2270
2271
2272
2273
2274
2275
2276 private void handleMatchNoMatch(ServiceRegistrar proxy, ServiceID srvcID) {
2277 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINER))
2278 ServiceDiscoveryManager.log(Level.FINER,
2279 "handleMatchNoMatch called, ServiceID: {0}",
2280 new Object []{srvcID}
2281 );
2282 DissociateLusCleanUpOrphan dlcl = new DissociateLusCleanUpOrphan(this, proxy);
2283 serviceIdMap.computeIfPresent(srvcID, dlcl);
2284 if (dlcl.itemRegProxy != null) {
2285 itemMatchMatchChange(srvcID, dlcl.itmReg, dlcl.itemRegProxy, dlcl.newItem, false);
2286 } else if (dlcl.notify && dlcl.filteredItem != null) {
2287 removeServiceNotify(dlcl.filteredItem);
2288 }
2289 }
2290
2291
2292
2293
2294 private static class DissociateLusCleanUpOrphan
2295 implements BiFunction<ServiceID, ServiceItemReg, ServiceItemReg>
2296 {
2297
2298 final LookupCacheImpl cache;
2299 boolean notify;
2300 final ServiceRegistrar proxy;
2301 ServiceRegistrar itemRegProxy;
2302 ServiceItemReg itmReg;
2303 ServiceItem newItem;
2304 ServiceItem filteredItem;
2305
2306 DissociateLusCleanUpOrphan(LookupCacheImpl cache, ServiceRegistrar proxy){
2307 this.itmReg = null;
2308 this.notify = false;
2309 this.itemRegProxy = null;
2310 this.cache = cache;
2311 this.proxy = proxy;
2312 this.newItem = null;
2313 this.filteredItem = null;
2314 }
2315
2316 @Override
2317 public ServiceItemReg apply(ServiceID srvcID, ServiceItemReg itemReg) {
2318 itmReg = itemReg;
2319 newItem = itemReg.removeProxy(proxy);
2320 filteredItem = itemReg.getFilteredItem();
2321 if (newItem != null) {
2322 itemRegProxy = itemReg.getProxy();
2323 }
2324
2325
2326
2327 else if (itemReg.hasNoProxys()) {
2328 if (itemReg.isDiscarded()) {
2329
2330 itmReg = null;
2331 cache.cancelDiscardTask(srvcID);
2332 } else {
2333
2334 notify = true;
2335 itmReg = null;
2336 }
2337 }
2338 return itmReg;
2339 }
2340 }
2341
2342
2343
2344
2345 private void cancelDiscardTask(ServiceID sid) {
2346
2347 Future task = serviceDiscardFutures.get(sid);
2348 if (task != null) {
2349 task.cancel(true);
2350 }
2351 }
2352
2353
2354
2355
2356 final static class CacheTaskDependencyManager implements FutureObserver {
2357
2358
2359
2360 private final ConcurrentLinkedQueue<CacheTaskWrapper> pending;
2361 private final ExecutorService executor;
2362
2363 CacheTaskDependencyManager(ExecutorService e) {
2364 this.pending = new ConcurrentLinkedQueue<CacheTaskWrapper>();
2365 executor = e;
2366 }
2367
2368 CacheTaskWrapper submit(Runnable t) {
2369 CacheTaskWrapper future = new CacheTaskWrapper(t, null);
2370 pending.offer(future);
2371 future.addObserver(this);
2372 if (t instanceof CacheTask && ((CacheTask) t).hasDeps()) {
2373 List<FutureObserver.ObservableFuture> deps = new LinkedList<FutureObserver.ObservableFuture>();
2374 Iterator<CacheTaskWrapper> it = pending.iterator();
2375 while (it.hasNext()) {
2376 CacheTaskWrapper w = it.next();
2377 Object c = w.getTask();
2378 if (c instanceof CacheTask && ((CacheTask) t).dependsOn((CacheTask) c)) {
2379 deps.add(w);
2380 }
2381 }
2382 if (deps.isEmpty()) {
2383 executor.submit(future);
2384 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2385 ServiceDiscoveryManager.log(
2386 Level.FINEST,
2387 "ServiceDiscoveryManager {0} submitted to executor task queue",
2388 new Object []{t.toString()}
2389 );
2390 } else {
2391 DependencyLinker linker = new DependencyLinker(executor, deps, future);
2392 linker.register();
2393 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2394 ServiceDiscoveryManager.log(
2395 Level.FINEST,
2396 "ServiceDiscoveryManager {0} registered dependencies",
2397 new Object [] {t.toString()}
2398 );
2399 }
2400 } else {
2401 executor.submit(future);
2402 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2403 ServiceDiscoveryManager.log(Level.FINEST,
2404 "ServiceDiscoveryManager {0} submitted to executor task queue",
2405 new Object []{t.toString()}
2406 );
2407 }
2408 return future;
2409 }
2410
2411 @Override
2412 public void futureCompleted(Future e) {
2413 pending.remove(e);
2414 Object t;
2415 if (e instanceof CacheTaskWrapper) {
2416 t = ((CacheTaskWrapper) e).getTask();
2417 } else {
2418 t = e;
2419 }
2420 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2421 ServiceDiscoveryManager.log(
2422 Level.FINEST,
2423 "ServiceDiscoveryManager {0} completed execution",
2424 new Object[]{t.toString()}
2425 );
2426 }
2427
2428
2429
2430
2431
2432
2433 void removeUselessTask(ProxyReg reg) {
2434 Iterator<CacheTaskWrapper> it = pending.iterator();
2435 while (it.hasNext()) {
2436 CacheTaskWrapper w = it.next();
2437 Object t = w.getTask();
2438 if (t instanceof CacheTask && ((CacheTask) t).isFromProxy(reg)) {
2439 w.cancel(true);
2440 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2441 ServiceDiscoveryManager.log(
2442 Level.FINEST,
2443 "ServiceDiscoveryManager {0} cancelled",
2444 new Object[]{t.toString()}
2445 );
2446 }
2447 }
2448 }
2449
2450 }
2451
2452
2453
2454
2455
2456
2457 final static class CacheTaskWrapper<T> extends ObservableFutureTask<T> {
2458
2459 private final Object task;
2460
2461 CacheTaskWrapper(Runnable r, T result) {
2462 super(r, result);
2463 task = r;
2464 }
2465
2466 CacheTaskWrapper(Callable<T> c) {
2467 super(c);
2468 task = c;
2469 }
2470
2471 Object getTask() {
2472 return task;
2473 }
2474
2475 }
2476
2477
2478
2479
2480 abstract static class CacheTask implements Runnable {
2481
2482 protected final ProxyReg reg;
2483 protected volatile long thisTaskSeqN;
2484
2485 protected CacheTask(ProxyReg reg, long seqN) {
2486 this.reg = reg;
2487 this.thisTaskSeqN = seqN;
2488 if (ServiceDiscoveryManager.logger.isLoggable(Level.FINEST))
2489 ServiceDiscoveryManager.log(
2490 Level.FINEST,
2491 "ServiceDiscoveryManager {0} constructed",
2492 new Object[]{toString()}
2493 );
2494 }
2495
2496
2497 public boolean isFromProxy(ProxyReg reg) {
2498 if (this.reg == null) {
2499 return false;
2500 }
2501 return (this.reg).equals(reg);
2502 }
2503
2504
2505
2506
2507 public ProxyReg getProxyReg() {
2508 return reg;
2509 }
2510
2511
2512
2513
2514 public long getSeqN() {
2515 return thisTaskSeqN;
2516 }
2517
2518 public abstract boolean hasDeps();
2519
2520 public boolean dependsOn(CacheTask task) {
2521 return false;
2522 }
2523 }
2524 }