1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger;
19
20 import org.apache.river.concurrent.RC;
21 import org.apache.river.concurrent.Ref;
22 import org.apache.river.concurrent.Referrer;
23 import org.apache.river.config.Config;
24 import org.apache.river.constants.TimeConstants;
25 import org.apache.river.landlord.Landlord;
26 import org.apache.river.landlord.LandlordUtil;
27 import org.apache.river.landlord.LeasedResource;
28 import org.apache.river.landlord.LeasePeriodPolicy;
29 import org.apache.river.landlord.FixedLeasePeriodPolicy;
30 import org.apache.river.landlord.LocalLandlord;
31 import org.apache.river.landlord.LeaseFactory;
32 import org.apache.river.logging.Levels;
33 import org.apache.river.start.lifecycle.LifeCycle;
34 import org.apache.river.api.util.Startable;
35
36 import net.jini.id.Uuid;
37 import net.jini.id.UuidFactory;
38 import net.jini.activation.ActivationGroup;
39
40 import net.jini.config.Configuration;
41 import net.jini.config.ConfigurationProvider;
42 import net.jini.config.ConfigurationException;
43
44 import net.jini.export.Exporter;
45 import net.jini.jeri.BasicJeriExporter;
46 import net.jini.jeri.BasicILFactory;
47 import net.jini.jeri.tcp.TcpServerEndpoint;
48
49 import net.jini.core.constraint.RemoteMethodControl;
50 import net.jini.security.TrustVerifier;
51 import net.jini.security.ProxyPreparer;
52 import net.jini.security.proxytrust.ServerProxyTrust;
53
54 import net.jini.activation.ActivationExporter;
55
56 import net.jini.core.discovery.LookupLocator;
57 import net.jini.core.lookup.ServiceID;
58 import net.jini.core.entry.Entry;
59 import net.jini.core.event.EventRegistration;
60 import net.jini.core.event.RemoteEventListener;
61 import net.jini.core.lease.Lease;
62 import net.jini.core.lease.LeaseDeniedException;
63 import net.jini.core.lease.UnknownLeaseException;
64 import net.jini.core.transaction.CannotJoinException;
65 import net.jini.core.transaction.CannotNestException;
66 import net.jini.core.transaction.Transaction;
67 import net.jini.core.transaction.TransactionException;
68 import net.jini.core.transaction.UnknownTransactionException;
69 import net.jini.core.transaction.server.ServerTransaction;
70 import net.jini.core.transaction.server.TransactionManager;
71 import net.jini.lookup.entry.ServiceInfo;
72 import net.jini.space.InternalSpaceException;
73 import net.jini.space.JavaSpace;
74
75 import java.io.IOException;
76 import java.rmi.MarshalledObject;
77 import java.rmi.NoSuchObjectException;
78 import java.rmi.RemoteException;
79 import java.rmi.UnmarshalException;
80 import java.rmi.activation.ActivationID;
81 import java.rmi.activation.ActivationSystem;
82 import java.rmi.activation.ActivationException;
83 import java.rmi.server.ExportException;
84 import java.security.AccessControlContext;
85 import java.security.AccessController;
86 import java.security.SecureRandom;
87 import java.security.PrivilegedExceptionAction;
88 import java.security.PrivilegedActionException;
89 import java.util.Collection;
90 import java.util.Collections;
91 import java.util.HashMap;
92 import java.util.Iterator;
93 import java.util.List;
94 import java.util.Map;
95 import java.util.Set;
96 import java.util.Stack;
97 import java.util.concurrent.ConcurrentHashMap;
98 import java.util.concurrent.atomic.AtomicLong;
99 import java.util.logging.Level;
100 import java.util.logging.Logger;
101
102 import javax.security.auth.Subject;
103 import javax.security.auth.login.LoginContext;
104 import javax.security.auth.login.LoginException;
105 import net.jini.core.transaction.server.TransactionConstants;
106 import net.jini.export.CodebaseAccessor;
107 import net.jini.jeri.AtomicILFactory;
108 import net.jini.lookup.ServiceAttributesAccessor;
109 import net.jini.lookup.ServiceIDAccessor;
110 import net.jini.lookup.ServiceProxyAccessor;
111 import org.apache.river.outrigger.proxy.OutriggerServer;
112 import org.apache.river.outrigger.proxy.AdminProxy;
113 import org.apache.river.outrigger.proxy.ConstrainableAdminProxy;
114 import org.apache.river.outrigger.proxy.ParticipantProxy;
115 import org.apache.river.outrigger.proxy.SpaceProxy2;
116 import org.apache.river.outrigger.proxy.EntryRep;
117 import org.apache.river.outrigger.proxy.MatchSetData;
118 import org.apache.river.outrigger.proxy.ConstrainableParticipantProxy;
119 import org.apache.river.outrigger.proxy.ConstrainableSpaceProxy2;
120 import org.apache.river.outrigger.proxy.StorableObject;
121 import org.apache.river.outrigger.proxy.StorableResource;
122 import org.apache.river.outrigger.proxy.OutriggerQueryCookie;
123 import org.apache.river.outrigger.proxy.ProxyVerifier;
124 import org.apache.river.admin.JavaSpaceAdmin;
125 import org.apache.river.proxy.CodebaseProvider;
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 public class OutriggerServerImpl
173 implements OutriggerServer, TimeConstants, LocalLandlord, Recover,
174 ServerProxyTrust, Startable, ServiceProxyAccessor,
175 ServiceAttributesAccessor, ServiceIDAccessor, CodebaseAccessor
176 {
177
178
179
180 public final static String COMPONENT_NAME = "org.apache.river.outrigger";
181
182
183
184
185
186 static final String lifecycleLoggerName = COMPONENT_NAME + ".lifecycle";
187
188
189 static final String opsLoggerName = COMPONENT_NAME + ".operations";
190
191
192 static final String txnLoggerName = COMPONENT_NAME + ".transactions";
193
194
195 static final String leaseLoggerName = COMPONENT_NAME + ".leases";
196
197
198 static final String iteratorLoggerName = COMPONENT_NAME + ".iterator";
199
200
201 static final String joinLoggerName = COMPONENT_NAME + ".join";
202
203
204 static final String matchingLoggerName = COMPONENT_NAME + ".entryMatching";
205
206
207 static final String eventLoggerName = COMPONENT_NAME + ".event";
208
209
210 static public final String storeLoggerName = COMPONENT_NAME + ".store";
211
212
213
214
215
216 static private final Logger lifecycleLogger =
217 Logger.getLogger(lifecycleLoggerName);
218
219
220 static private final Logger opsLogger = Logger.getLogger(opsLoggerName);
221
222
223 static private final Logger txnLogger = Logger.getLogger(txnLoggerName);
224
225
226 static private final Logger leaseLogger =
227 Logger.getLogger(leaseLoggerName);
228
229
230 private static final Logger iteratorLogger =
231 Logger.getLogger(iteratorLoggerName);
232
233
234 private static final Logger joinLogger = Logger.getLogger(joinLoggerName);
235
236
237
238
239
240 public static final String PERSISTENCE_DIR_CONFIG_ENTRY =
241 "persistenceDirectory";
242
243
244
245
246
247 private final EntryHolderSet contents;
248
249
250
251
252 private final TypeTree types = new TypeTree();
253
254
255
256
257 private final HashMap<String,Long> typeHashes = new HashMap<String,Long>();
258
259
260
261
262 private final TransitionWatchers templates;
263
264
265
266
267
268 final private Map<Uuid,LeasedResource> eventRegistrations =
269 new ConcurrentHashMap<Uuid,LeasedResource>();
270
271
272
273
274 final private Map<Uuid,LeasedResource> contentsQueries =
275 new ConcurrentHashMap<Uuid,LeasedResource>();
276
277
278
279
280
281 private final Map<Long,Txn> recoveredTxns = new ConcurrentHashMap<Long,Txn>();
282
283
284
285
286
287 private final TxnTable txnTable;
288
289
290
291
292
293 private final long crashCount = System.currentTimeMillis();
294
295
296
297
298
299
300 private final TemplateReaper templateReaperThread;
301
302
303
304
305
306
307
308
309
310 private final EntryReaper entryReaperThread;
311
312
313
314
315
316
317 private final ContentsQueryReaper contentsQueryReaperThread;
318
319
320
321
322
323 private final OperationJournal operationJournal;
324
325
326
327
328
329
330 private volatile Notifier notifier;
331
332
333
334
335
336
337 private final ExpirationOpQueue expirationOpQueue;
338
339
340
341
342 private final TxnMonitor txnMonitor;
343
344
345
346
347
348
349 private final OutriggerServerWrapper serverGate;
350
351
352 private final LifeCycle lifeCycle;
353
354
355
356
357 private final Exporter exporter;
358
359
360
361
362
363
364 private volatile OutriggerServer ourRemoteRef;
365
366
367
368
369
370
371
372 private volatile Uuid topUuid = null;
373
374
375
376
377
378
379 private volatile SpaceProxy2 spaceProxy;
380
381
382
383
384
385
386 private volatile AdminProxy adminProxy;
387
388
389
390
391
392
393 private volatile ParticipantProxy participantProxy;
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412 private final AtomicLong sessionId = new AtomicLong();
413
414
415
416
417 private final LeasePeriodPolicy entryLeasePolicy;
418
419
420
421
422 private final LeasePeriodPolicy eventLeasePolicy;
423
424
425
426
427 private final LeasePeriodPolicy contentsLeasePolicy;
428
429
430
431
432 private volatile LeaseFactory leaseFactory;
433
434
435
436
437 private final JoinStateManager joinStateManager = new JoinStateManager();
438
439
440
441
442
443 private static final SecureRandom idGen = new SecureRandom();
444
445
446 private final ActivationID activationID;
447
448
449
450
451
452 private final ActivationSystem activationSystem;
453
454
455
456
457
458 private volatile Store store;
459
460
461
462
463
464
465 private volatile LogOps log = null;
466
467
468 private final Map iterations =
469 Collections.synchronizedMap(new java.util.HashMap());
470
471
472 private final LoginContext loginContext;
473
474
475 private final ProxyPreparer transactionManagerPreparer;
476
477
478 private final ProxyPreparer listenerPreparer;
479
480
481
482
483
484
485 private final ProxyPreparer recoveredTransactionManagerPreparer;
486
487
488
489
490
491
492 private final ProxyPreparer recoveredListenerPreparer;
493
494
495 private final int nextLimit;
496
497
498 private final int takeLimit;
499
500
501
502
503
504
505 private final long maxUnexportDelay;
506
507
508
509
510 private final long unexportRetryDelay;
511
512
513 private boolean started = false;
514
515 private Configuration config;
516 private Thread starter;
517 private Throwable thrown;
518 private Exception except;
519 private boolean persistent;
520 private final long maxServerQueryTimeout;
521 private final AccessControlContext context;
522 private String codebase;
523 private String certFactoryType;
524 private String certPathEncoding;
525 private byte[] encodedCerts;
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554 OutriggerServerImpl(final ActivationID activationID, LifeCycle lifeCycle,
555 String[] configArgs, final boolean persistent,
556 final OutriggerServerWrapper wrapper)
557 throws IOException, ConfigurationException, LoginException,
558 ActivationException
559 {
560 this.lifeCycle = lifeCycle;
561 this.serverGate = wrapper;
562 this.persistent = persistent;
563 InitHolder h = null;
564 LoginContext loginContext = null;
565 try {
566 final Configuration config =
567 ConfigurationProvider.getInstance(configArgs,
568 getClass().getClassLoader());
569 this.config = config;
570 loginContext = (LoginContext) config.getEntry(
571 COMPONENT_NAME, "loginContext", LoginContext.class, null);
572 if (loginContext == null) {
573 h = init(config, persistent, activationID);
574 } else {
575 loginContext.login();
576 try {
577 h = Subject.doAsPrivileged(
578 loginContext.getSubject(),
579 new PrivilegedExceptionAction<InitHolder>() {
580 public InitHolder run() throws Exception {
581 return init(config, persistent, activationID);
582 }
583 },
584 null);
585 } catch (PrivilegedActionException e) {
586 throw e.getCause();
587 }
588 }
589 } catch (IOException e) {
590 unwindConstructor(e);
591 except = e;
592 } catch (ConfigurationException e) {
593 unwindConstructor(e);
594 except = e;
595 } catch (LoginException e) {
596 unwindConstructor(e);
597 except = e;
598 } catch (RuntimeException e) {
599 unwindConstructor(e);
600 except = e;
601 } catch (Throwable e) {
602 unwindConstructor(e);
603 thrown = e;
604 }
605 if (thrown == null && except == null) {
606 lifecycleLogger.log(Level.INFO, "Outrigger server started: {0}", this);
607 this.loginContext = loginContext;
608 this.activationID = h.activationID;
609 txnMonitor = h.txnMonitor;
610 activationSystem = h.activationSystem;
611 transactionManagerPreparer = h.transactionManagerPreparer;
612 listenerPreparer = h.listenerPreparer;
613 this.codebase = h.codebase;
614 this.certFactoryType = h.certFactoryType;
615 this.certPathEncoding = h.certPathEncoding;
616 this.encodedCerts = h.encodedCerts.clone();
617 exporter = h.exporter;
618 contents = h.contents;
619 templates = h.templates;
620 operationJournal = h.operationJournal;
621 expirationOpQueue = h.expirationOpQueue;
622 recoveredTransactionManagerPreparer = h.recoveredTransactionManagerPreparer;
623 recoveredListenerPreparer = h.recoveredListenerPreparer;
624 txnTable = h.txnTable;
625 entryLeasePolicy = h.entryLeasePolicy;
626 eventLeasePolicy = h.eventLeasePolicy;
627 contentsLeasePolicy = h.contentsLeasePolicy;
628 nextLimit = h.nextLimit;
629 takeLimit = h.takeLimit;
630 maxUnexportDelay = h.maxUnexportDelay;
631 unexportRetryDelay = h.unexportRetryDelay;
632 templateReaperThread = h.templateReaperThread;
633 entryReaperThread = h.entryReaperThread;
634 contentsQueryReaperThread = h.contentsQueryReaperThread;
635 starter = h.starter;
636 maxServerQueryTimeout = h.maxServerQueryTimeout;
637 context = h.context;
638 } else {
639 lifecycleLogger.log(Level.SEVERE, "Failed to construct Outrigger server", except == null ? thrown : except);
640 this.loginContext = null;
641 this.activationID = null;
642 txnMonitor = null;
643 activationSystem = null;
644 transactionManagerPreparer = null;
645 listenerPreparer = null;
646 exporter = null;
647 ourRemoteRef = null;
648 contents = null;
649 templates = null;
650 operationJournal = null;
651 expirationOpQueue = null;
652 recoveredTransactionManagerPreparer = null;
653 recoveredListenerPreparer = null;
654 txnTable = null;
655 leaseFactory = null;
656 entryLeasePolicy = null;
657 eventLeasePolicy = null;
658 contentsLeasePolicy = null;
659 nextLimit = 0;
660 takeLimit = 0;
661 maxUnexportDelay = 0;
662 unexportRetryDelay = 0;
663 notifier = null;
664 templateReaperThread = null;
665 entryReaperThread = null;
666 contentsQueryReaperThread = null;
667 starter = null;
668 maxServerQueryTimeout = 0;
669 context = null;
670 }
671 }
672
673 public void start() throws IOException, ConfigurationException, LoginException {
674
675
676 synchronized (this){
677 if (started) return;
678 started = true;
679 }
680 if (thrown != null) throw (Error) thrown;
681 if (except != null){
682 if (except instanceof IOException) throw (IOException) except;
683 else if (except instanceof ConfigurationException) throw (ConfigurationException) except;
684 else if (except instanceof LoginException) throw (LoginException) except;
685 else if (except instanceof RuntimeException) throw (RuntimeException) except;
686 }
687
688 try {
689 AccessController.doPrivileged(new PrivilegedExceptionAction(){
690
691 @Override
692 public Object run() throws ExportException, ConfigurationException, IOException {
693 ourRemoteRef = (OutriggerServer) exporter.export(serverGate);
694
695 txnMonitor.start();
696 starter.start();
697
698
699 if (persistent) {
700 store = (Store)Config.getNonNullEntry(config,
701 COMPONENT_NAME,
702 "store", Store.class);
703 expirationOpQueue.start();
704 }
705
706 if (store != null) {
707 log = store.setupStore(OutriggerServerImpl.this);
708
709
710
711 log.bootOp(System.currentTimeMillis(), getSessionId());
712 recoverTxns();
713 } else if (activationID != null || persistent) {
714
715
716
717 throw new ConfigurationException("Must provide for a " +
718 "store for component " + COMPONENT_NAME + ", by providing " +
719 "valid values for the store or " +
720 PERSISTENCE_DIR_CONFIG_ENTRY + " entries if creating " +
721 " a persistent space");
722 }
723
724
725
726
727 if (topUuid == null) {
728 topUuid = UuidFactory.generate();
729 if (log != null)
730 log.uuidOp(topUuid);
731 }
732
733 if (ourRemoteRef instanceof RemoteMethodControl) {
734 spaceProxy = new ConstrainableSpaceProxy2(ourRemoteRef, topUuid,
735 maxServerQueryTimeout, null);
736 adminProxy =
737 new ConstrainableAdminProxy(ourRemoteRef, topUuid, null);
738 participantProxy =
739 new ConstrainableParticipantProxy(ourRemoteRef, topUuid, null);
740 } else {
741 spaceProxy = new SpaceProxy2(ourRemoteRef, topUuid,
742 maxServerQueryTimeout);
743 adminProxy = new AdminProxy(ourRemoteRef, topUuid);
744 participantProxy = new ParticipantProxy(ourRemoteRef, topUuid);
745 }
746
747 leaseFactory = new LeaseFactory(ourRemoteRef, topUuid);
748
749
750
751
752 joinStateManager.startManager(config, log, spaceProxy,
753 new ServiceID(topUuid.getMostSignificantBits(),
754 topUuid.getLeastSignificantBits()),
755 attributesFor());
756
757 notifier = new Notifier(spaceProxy, recoveredListenerPreparer, config);
758 operationJournal.start();
759 templateReaperThread.start();
760 entryReaperThread.start();
761 contentsQueryReaperThread.start();
762 return null;
763 }
764
765 }, context);
766
767 } catch (PrivilegedActionException ex) {
768 Exception e = ex.getException();
769
770 lifecycleLogger.log(Level.SEVERE, "Failed to start Outrigger server", e);
771
772 unwindExporter(ourRemoteRef, exporter);
773
774
775 try {
776 joinStateManager.destroy();
777 } catch (Throwable t) {
778
779 }
780
781 if (expirationOpQueue != null)
782 expirationOpQueue.terminate();
783
784 if (txnMonitor != null) {
785 try {
786 txnMonitor.destroy();
787 } catch (Throwable t) {
788
789 }
790 }
791
792
793 if (notifier != null) {
794 try {
795 notifier.terminate();
796 } catch (Throwable t) {
797
798 }
799 }
800
801 if (operationJournal != null) {
802 try {
803 operationJournal.terminate();
804 } catch (Throwable t) {
805
806 }
807 }
808
809 unwindReaper(templateReaperThread);
810 unwindReaper(entryReaperThread);
811 unwindReaper(contentsQueryReaperThread);
812
813
814 if (store != null) {
815 try {
816 store.close();
817 } catch (Throwable t) {
818
819 }
820 }
821 if (e instanceof IOException) throw (IOException) e;
822 else if (e instanceof ConfigurationException) throw (ConfigurationException) e;
823 else if (e instanceof LoginException) throw (LoginException) e;
824 else if (e instanceof RuntimeException) throw (RuntimeException) e;
825 } finally {
826 config = null;
827 starter = null;
828 except = null;
829 thrown = null;
830 }
831 }
832
833 @Override
834 public Entry[] getServiceAttributes() throws IOException {
835 return getLookupAttributes();
836 }
837
838 @Override
839 public ServiceID serviceID() throws IOException {
840 return new ServiceID(topUuid.getMostSignificantBits(),
841 topUuid.getLeastSignificantBits());
842 }
843
844 @Override
845 public String getClassAnnotation() throws IOException {
846 return "".equals(codebase) ?
847 CodebaseProvider.getClassAnnotation(OutriggerServer.class)
848 : codebase;
849 }
850
851 @Override
852 public String getCertFactoryType() throws IOException {
853 return certFactoryType;
854 }
855
856 @Override
857 public String getCertPathEncoding() throws IOException {
858 return certPathEncoding;
859 }
860
861 @Override
862 public byte[] getEncodedCerts() throws IOException {
863 return encodedCerts.clone();
864 }
865
866 private static class InitHolder {
867 ActivationID activationID;
868 TxnMonitor txnMonitor;
869 ActivationSystem activationSystem;
870 ProxyPreparer transactionManagerPreparer;
871 ProxyPreparer listenerPreparer;
872 Exporter exporter;
873 EntryHolderSet contents;
874 TransitionWatchers templates;
875 OperationJournal operationJournal;
876 ExpirationOpQueue expirationOpQueue;
877 ProxyPreparer recoveredTransactionManagerPreparer;
878 ProxyPreparer recoveredListenerPreparer;
879 TxnTable txnTable;
880 LeasePeriodPolicy entryLeasePolicy;
881 LeasePeriodPolicy eventLeasePolicy;
882 LeasePeriodPolicy contentsLeasePolicy;
883 int nextLimit;
884 int takeLimit;
885 long maxUnexportDelay;
886 long unexportRetryDelay;
887 TemplateReaper templateReaperThread;
888 EntryReaper entryReaperThread;
889 ContentsQueryReaper contentsQueryReaperThread;
890 Thread starter;
891 long maxServerQueryTimeout;
892 AccessControlContext context;
893 private String codebase;
894 private String certFactoryType;
895 private String certPathEncoding;
896 private byte[] encodedCerts;
897 }
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918 private InitHolder init(Configuration config,
919 boolean persistent,
920 ActivationID activationID)
921 throws IOException, ConfigurationException, ActivationException
922 {
923 InitHolder h = new InitHolder();
924 h.context = AccessController.getContext();
925
926 h.txnMonitor = new TxnMonitor(this, config, h.context);
927
928
929
930 final ProxyPreparer defaultPreparer =
931 new net.jini.security.BasicProxyPreparer();
932
933 if (activationID != null) {
934 final ProxyPreparer aidPreparer =
935 (ProxyPreparer)Config.getNonNullEntry(config,
936 COMPONENT_NAME, "activationIdPreparer",
937 ProxyPreparer.class, defaultPreparer);
938
939 final ProxyPreparer aSysPreparer =
940 (ProxyPreparer)Config.getNonNullEntry(config,
941 COMPONENT_NAME, "activationSystemPreparer",
942 ProxyPreparer.class, defaultPreparer);
943
944 h.activationID =
945 (ActivationID)aidPreparer.prepareProxy(activationID);
946 h.activationSystem =
947 (ActivationSystem)aSysPreparer.prepareProxy(
948 net.jini.activation.ActivationGroup.getSystem());
949 }
950
951
952
953 h.transactionManagerPreparer =
954 (ProxyPreparer)Config.getNonNullEntry(config,
955 COMPONENT_NAME, "transactionManagerPreparer",
956 ProxyPreparer.class, defaultPreparer);
957
958 h.listenerPreparer =
959 (ProxyPreparer)Config.getNonNullEntry(config,
960 COMPONENT_NAME, "listenerPreparer",
961 ProxyPreparer.class, defaultPreparer);
962
963
964
965 h.codebase = Config.getNonNullEntry(config, COMPONENT_NAME,
966 "Codebase_Annotation", String.class, "");
967 h.certFactoryType = Config.getNonNullEntry(config, COMPONENT_NAME,
968 "Codebase_CertFactoryType", String.class, "X.509");
969 h.certPathEncoding = Config.getNonNullEntry(config, COMPONENT_NAME,
970 "Codebase_CertPathEncoding", String.class, "PkiPath");
971 h.encodedCerts = Config.getNonNullEntry(config, COMPONENT_NAME,
972 "Codebase_Certs", byte[].class, new byte[0]);
973
974
975
976
977
978
979
980
981
982
983 final Exporter basicExporter =
984 new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
985 new AtomicILFactory(null, null, OutriggerServer.class.getClassLoader()), false, true);
986 if (activationID == null) {
987 h.exporter = (Exporter)Config.getNonNullEntry(config,
988 COMPONENT_NAME, "serverExporter", Exporter.class,
989 basicExporter);
990 } else {
991 h.exporter = (Exporter)Config.getNonNullEntry(config,
992 COMPONENT_NAME, "serverExporter", Exporter.class,
993 new ActivationExporter(activationID, basicExporter),
994 activationID);
995 }
996
997
998 h.maxServerQueryTimeout =
999 Config.getLongEntry(config, COMPONENT_NAME,
1000 "maxServerQueryTimeout", Long.MAX_VALUE, 1, Long.MAX_VALUE);
1001
1002
1003
1004
1005 h.contents = new EntryHolderSet(this);
1006 h.templates = new TransitionWatchers(this);
1007
1008
1009 h.starter = new Thread() {
1010 public void run() {
1011 OutriggerServerImpl.nextID();
1012 }
1013 };
1014
1015
1016
1017 h.operationJournal = new OperationJournal(h.templates);
1018 h.operationJournal.setDaemon(false);
1019
1020
1021 if (persistent){
1022
1023
1024
1025 h.expirationOpQueue = new ExpirationOpQueue(this);
1026 h.expirationOpQueue.setDaemon(false);
1027 h.recoveredTransactionManagerPreparer =
1028 (ProxyPreparer)Config.getNonNullEntry(config,
1029 COMPONENT_NAME, "recoveredTransactionManagerPreparer",
1030 ProxyPreparer.class, defaultPreparer);
1031
1032 h.recoveredListenerPreparer =
1033 (ProxyPreparer)Config.getNonNullEntry(config,
1034 COMPONENT_NAME, "recoveredListenerPreparer",
1035 ProxyPreparer.class, defaultPreparer);
1036 }
1037
1038
1039
1040 h.txnTable = new TxnTable(h.recoveredTransactionManagerPreparer);
1041
1042
1043
1044
1045 h.entryLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
1046 config, COMPONENT_NAME, "entryLeasePeriodPolicy",
1047 LeasePeriodPolicy.class,
1048 new FixedLeasePeriodPolicy(Long.MAX_VALUE, 1 * DAYS));
1049
1050 h.eventLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
1051 config, COMPONENT_NAME, "eventLeasePeriodPolicy",
1052 LeasePeriodPolicy.class,
1053 new FixedLeasePeriodPolicy(1 * HOURS, 1 * HOURS));
1054
1055 h.contentsLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
1056 config, COMPONENT_NAME, "contentsLeasePeriodPolicy",
1057 LeasePeriodPolicy.class,
1058 new FixedLeasePeriodPolicy(1 * HOURS, 1 * HOURS));
1059
1060 h.nextLimit = Config.getIntEntry(config, COMPONENT_NAME,
1061 "iteratorBatchSize", 100, 1, Integer.MAX_VALUE);
1062
1063 h.takeLimit = Config.getIntEntry(config, COMPONENT_NAME,
1064 "takeMultipleLimit", 100, 1, Integer.MAX_VALUE);
1065
1066 h.maxUnexportDelay = Config.getLongEntry(config, COMPONENT_NAME,
1067 "maxUnexportDelay", 2 * MINUTES, 0, Long.MAX_VALUE);
1068
1069 h.unexportRetryDelay = Config.getLongEntry(config, COMPONENT_NAME,
1070 "unexportRetryDelay", SECONDS, 1, Long.MAX_VALUE);
1071
1072
1073
1074 final long reapingInterval =
1075 Config.getLongEntry(config, COMPONENT_NAME, "reapingInterval",
1076 10000, 1, Long.MAX_VALUE);
1077
1078 final int reapingPriority =
1079 Config.getIntEntry(config, COMPONENT_NAME,"reapingPriority",
1080 Thread.NORM_PRIORITY, Thread.MIN_PRIORITY,
1081 Thread.MAX_PRIORITY);
1082
1083 h.templateReaperThread = new TemplateReaper(reapingInterval);
1084 h.templateReaperThread.setPriority(reapingPriority);
1085 h.templateReaperThread.setDaemon(false);
1086 h.entryReaperThread = new EntryReaper(reapingInterval);
1087 h.entryReaperThread.setPriority(reapingPriority);
1088 h.entryReaperThread.setDaemon(false);
1089 h.contentsQueryReaperThread = new ContentsQueryReaper(reapingInterval);
1090 h.contentsQueryReaperThread.setPriority(reapingPriority);
1091 h.contentsQueryReaperThread.setDaemon(false);
1092 return h;
1093
1094 }
1095
1096
1097
1098
1099
1100
1101
1102
1103 private void unwindConstructor(Throwable cause) {
1104 serverGate.rejectCalls(
1105 new RemoteException("Constructor failure", cause));
1106
1107 lifecycleLogger.log(Level.SEVERE,
1108 "exception encountered while (re)starting server", cause);
1109 }
1110
1111 private void unwindExporter(OutriggerServer os, Exporter e){
1112
1113
1114 if (os != null) {
1115 try {
1116 e.unexport(true);
1117 } catch (Throwable t) {
1118
1119 }
1120 }
1121 }
1122
1123
1124 private void unwindReaper(Reaper r) {
1125 if (r == null)
1126 return;
1127
1128 try {
1129 r.kill();
1130 r.join();
1131 } catch (Throwable t) {
1132
1133 }
1134 }
1135
1136
1137
1138
1139
1140
1141 private void recoverTxns() {
1142
1143 if (recoveredTxns.isEmpty())
1144 return;
1145
1146
1147
1148
1149
1150 final Collection values = recoveredTxns.values();
1151 final Iterator t = values.iterator();
1152 while (t.hasNext()) {
1153
1154 txnTable.recover((Txn)t.next());
1155 }
1156
1157
1158
1159 monitor(values);
1160 recoveredTxns.clear();
1161 }
1162
1163 long getSessionId() {
1164 return sessionId.get();
1165 }
1166
1167
1168
1169
1170
1171 void cancelOp(Uuid cookie, boolean expired) {
1172 if (log != null) log.cancelOp(cookie, expired);
1173 }
1174
1175
1176
1177
1178
1179
1180 void scheduleCancelOp(Uuid cookie) {
1181 if (expirationOpQueue != null)
1182 expirationOpQueue.enqueue(cookie);
1183 }
1184
1185
1186
1187
1188
1189
1190 private void typeCheck(EntryRep rep) throws UnmarshalException {
1191 if (rep == null)
1192 return;
1193
1194 synchronized (typeHashes) {
1195
1196
1197
1198
1199 if (checkClass(rep.classFor(), rep.getHash()))
1200 return;
1201
1202
1203 String[] superclasses = rep.superclasses();
1204 long[] hashes = rep.getHashes();
1205 for (int i = 0; i < superclasses.length; i++) {
1206 if (checkClass(superclasses[i], hashes[i]))
1207 return;
1208 }
1209 }
1210 }
1211
1212
1213
1214
1215
1216
1217
1218
1219 private boolean checkClass(String className, long value)
1220 throws UnmarshalException
1221 {
1222 Long hash = (Long)typeHashes.get(className);
1223
1224 if (hash == null) {
1225 typeHashes.put(className, Long.valueOf(value));
1226 return false;
1227 } else {
1228 if (hash.longValue() != value) {
1229 final String msg = "Class mismatch: " + className;
1230 final UnmarshalException ue = new UnmarshalException(msg);
1231 opsLogger.log(Levels.FAILED, msg, ue);
1232 throw ue;
1233 }
1234 }
1235 return true;
1236 }
1237
1238
1239
1240
1241
1242
1243 private LeasePeriodPolicy.Result grant(LeasedResource resource,
1244 long requestedDuration, LeasePeriodPolicy policy, String policyName)
1245 {
1246 final LeasePeriodPolicy.Result r;
1247 try {
1248 r = policy.grant(resource, requestedDuration);
1249 resource.setExpiration(r.expiration);
1250 } catch (LeaseDeniedException e) {
1251
1252
1253
1254 throw logAndThrow(new
1255 InternalSpaceException("OutriggerServerImpl:" + policyName +
1256 ".grant threw LeaseDeniedException", e),
1257 opsLogger);
1258 }
1259
1260 return r;
1261 }
1262
1263
1264
1265
1266
1267 private void checkForEmpty(EntryRep[] entries, String msg) {
1268 if (entries.length == 0)
1269 throw logAndThrowIllegalArg(msg);
1270 }
1271
1272
1273
1274
1275
1276 private void checkForNull(Object value, String msg) {
1277 if (value == null)
1278 throw logAndThrow(new NullPointerException(msg), opsLogger);
1279 }
1280
1281
1282 private void checkTimeout(long timeout) {
1283 if (timeout < 0) {
1284 throw logAndThrowIllegalArg(
1285 "timeout = " + timeout + "must be a non-negative value");
1286 }
1287 }
1288
1289
1290 private void checkLimit(long limit) {
1291 if (limit < 1) {
1292 throw logAndThrowIllegalArg(
1293 "limit = " + limit + "must be a positive value");
1294 }
1295 }
1296
1297
1298 public long[] write(EntryRep rep, Transaction tr, long lease)
1299 throws TransactionException, RemoteException
1300 {
1301 opsLogger.entering("OutriggerServerImpl", "write");
1302
1303 typeCheck(rep);
1304 rep.pickID();
1305
1306 if (opsLogger.isLoggable(Level.FINER) && (tr != null)) {
1307 ServerTransaction str = serverTransaction(tr);
1308 opsLogger.log(Level.FINER, "OutriggerServerImpl: write under " +
1309 "transaction [mgr:{0} id:{1}]",
1310 new Object[]{str.mgr, Long.valueOf(str.id)});
1311 }
1312
1313 Txn txn = enterTxn(tr);
1314
1315
1316 final LeasePeriodPolicy.Result r =
1317 grant(rep, lease, entryLeasePolicy, "entryLeasePolicy");
1318
1319 final EntryHolder holder = contents.holderFor(rep);
1320 final EntryHandle handle = holder.newEntryHandle(rep, txn);
1321
1322
1323
1324
1325 try {
1326 if (txn != null)
1327 txn.ensureActive();
1328
1329
1330 if (log != null)
1331 log.writeOp(rep, (txn == null) ? null : txn.getId());
1332
1333 synchronized (handle) {
1334 addWrittenRep(handle, holder, txn);
1335 recordTransition(new EntryTransition(handle, txn, true, true,
1336 true));
1337 }
1338
1339 } finally {
1340 if (txn != null)
1341 txn.allowStateChange();
1342 }
1343
1344 if (opsLogger.isLoggable(Level.FINEST)) {
1345 opsLogger.log(Level.FINEST, "writing {0} (txn = {1})",
1346 new Object[]{rep, txn});
1347 }
1348
1349 return new long[] {r.duration,
1350 rep.id().getMostSignificantBits(),
1351 rep.id().getLeastSignificantBits()};
1352 }
1353
1354 public long[] write(EntryRep[] entries, Transaction tr, long[] leaseTimes)
1355 throws TransactionException, RemoteException
1356 {
1357 opsLogger.entering("OutriggerServerImpl", "write<multiple>");
1358 checkForEmpty(entries, "Must write at least one entry");
1359
1360 if (entries.length != leaseTimes.length) {
1361 throw logAndThrowIllegalArg(
1362 "Collection of entries and lease times must be same length");
1363 }
1364
1365 for (int i=0;i<entries.length;i++) {
1366 checkForNull(entries[i], "Can't write null entry");
1367
1368 if (leaseTimes[i] < 0 && leaseTimes[i] != Lease.ANY) {
1369 throw logAndThrowIllegalArg("Bad requested lease length:" +
1370 leaseTimes[i]);
1371 }
1372 }
1373
1374 for (int i=0; i<entries.length; i++) {
1375 typeCheck(entries[i]);
1376 entries[i].pickID();
1377 }
1378
1379 if (opsLogger.isLoggable(Level.FINER) && (tr != null)) {
1380 ServerTransaction str = serverTransaction(tr);
1381 opsLogger.log(Level.FINER, "OutriggerServerImpl: write<multiple> " +
1382 "under transaction [mgr:{0} id:{1}]",
1383 new Object[]{str.mgr, Long.valueOf(str.id)});
1384 }
1385
1386 Txn txn = enterTxn(tr);
1387
1388 final LeasePeriodPolicy.Result[] leaseData =
1389 new LeasePeriodPolicy.Result[entries.length];
1390 final EntryHolder[] holders = new EntryHolder[entries.length];
1391 final EntryHandle[] handles = new EntryHandle[entries.length];
1392
1393
1394 for (int i=0; i<entries.length; i++) {
1395 final EntryRep entry = entries[i];
1396 leaseData[i] =
1397 grant(entry, leaseTimes[i], entryLeasePolicy, "entryLeasePolicy");
1398 holders[i] = contents.holderFor(entry);
1399 handles[i] = holders[i].newEntryHandle(entry, txn);
1400 }
1401
1402
1403
1404
1405 try {
1406 if (txn != null)
1407 txn.ensureActive();
1408
1409
1410 if (log != null)
1411 log.writeOp(entries, (txn == null) ? null : txn.getId());
1412
1413 for (int i=0; i<handles.length; i++) {
1414 synchronized (handles[i]) {
1415 addWrittenRep(handles[i], holders[i], txn);
1416 recordTransition(
1417 new EntryTransition(handles[i], txn, true, true, true));
1418 }
1419 }
1420 } finally {
1421 if (txn != null)
1422 txn.allowStateChange();
1423 }
1424
1425 if (opsLogger.isLoggable(Level.FINEST)) {
1426 opsLogger.log(Level.FINEST, "writing multiples (txn = {0})",
1427 new Object[]{txn});
1428 }
1429
1430 final long[] rslt = new long[entries.length * 3];
1431 for (int i=0; i<entries.length; i++) {
1432 rslt[i] = leaseData[i].duration;
1433 rslt[i+1] = entries[i].id().getMostSignificantBits();
1434 rslt[i+2] = entries[i].id().getLeastSignificantBits();
1435 }
1436 return rslt;
1437 }
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447 private void addWrittenRep(EntryHandle handle, EntryHolder holder,
1448 Txn txn)
1449 {
1450 opsLogger.log(Level.FINEST, "OutriggerServerImpl: addWrittenRep");
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463 types.addTypes(handle.rep());
1464
1465
1466
1467
1468
1469 holder.add(handle, txn);
1470 }
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491 void recordTransition(EntryTransition transition) {
1492 operationJournal.recordTransition(transition);
1493 }
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503 void enqueueDelivery(EventSender sender) {
1504 notifier.enqueueDelivery(sender, context);
1505 }
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546 boolean attemptCapture(EntryHandle handle, TransactableMgr txn,
1547 boolean takeIt, Set lockedEntrySet,
1548 Set<EntryHandle> provisionallyRemovedEntrySet, long now,
1549 QueryWatcher watcher)
1550 {
1551 final EntryHolder holder = contents.holderFor(handle.rep());
1552 final Set conflictSet = new java.util.HashSet();
1553 if (holder.attemptCapture(handle, txn, takeIt,
1554 conflictSet, lockedEntrySet,
1555 provisionallyRemovedEntrySet, now))
1556 return true;
1557
1558 monitor(watcher, conflictSet);
1559 return false;
1560 }
1561
1562
1563 public EventRegistration
1564 notify(EntryRep tmpl, Transaction tr, RemoteEventListener listener,
1565 long leaseTime, MarshalledObject handback)
1566 throws TransactionException, RemoteException
1567 {
1568 opsLogger.entering("OutriggerServerImpl", "notify");
1569
1570 typeCheck(tmpl);
1571
1572 checkForNull(listener, "Passed null listener for event registration");
1573
1574 listener =
1575 (RemoteEventListener)listenerPreparer.prepareProxy(listener);
1576
1577 final long currentOrdinal = operationJournal.currentOrdinal();
1578
1579
1580 tmpl = setupTmpl(tmpl);
1581
1582 ServerTransaction str = serverTransaction(tr);
1583 Txn txn = enterTxn(tr);
1584
1585 final Uuid cookie = UuidFactory.generate();
1586 final long eventID = nextID();
1587 final long now = System.currentTimeMillis();
1588
1589 final EventRegistrationWatcher reg;
1590 if (txn == null) {
1591 reg = new StorableEventWatcher(now, currentOrdinal, cookie,
1592 handback, eventID, listener);
1593 } else {
1594 reg = new TransactableEventWatcher(now, currentOrdinal, cookie,
1595 handback, eventID, listener, txn);
1596 }
1597
1598
1599 grant(reg, leaseTime, eventLeasePolicy, "eventLeasePolicy");
1600
1601
1602
1603
1604 eventRegistrations.put(cookie, reg);
1605
1606
1607
1608
1609 if (txn != null) {
1610 try {
1611 txn.ensureActive();
1612 templates.add(reg, tmpl);
1613 txn.add((Transactable)reg);
1614 } finally {
1615 txn.allowStateChange();
1616 }
1617 } else {
1618
1619 if (log != null)
1620 log.registerOp((StorableResource)reg,
1621 "StorableEventWatcher",
1622 new StorableObject[]{tmpl});
1623
1624 templates.add(reg, tmpl);
1625 }
1626
1627 return new EventRegistration(eventID, spaceProxy,
1628 leaseFactory.newLease(cookie, reg.getExpiration()),
1629 0);
1630 }
1631
1632
1633 public EventRegistration registerForAvailabilityEvent(EntryRep[] tmpls,
1634 Transaction tr, boolean visibilityOnly, RemoteEventListener listener,
1635 long leaseTime, MarshalledObject handback)
1636 throws TransactionException, RemoteException
1637 {
1638 opsLogger.entering("OutriggerServerImpl", "registeForAvailabilityEvent");
1639
1640 checkForNull(listener, "Passed null listener for event registration");
1641 checkForEmpty(tmpls, "Must provide at least one template");
1642
1643
1644 if (leaseTime == 0) {
1645 throw logAndThrowIllegalArg("leaseTime must be non-zero");
1646 }
1647
1648 listener =
1649 (RemoteEventListener)listenerPreparer.prepareProxy(listener);
1650 final long currentOrdinal = operationJournal.currentOrdinal();
1651
1652 ServerTransaction str = serverTransaction(tr);
1653 Txn txn = enterTxn(tr);
1654
1655 for (int i=0; i<tmpls.length; i++) {
1656 typeCheck(tmpls[i]);
1657 }
1658
1659 for (int i=0; i<tmpls.length; i++) {
1660 tmpls[i] = setupTmpl(tmpls[i]);
1661 }
1662
1663 final Uuid cookie = UuidFactory.generate();
1664 final long eventID = nextID();
1665 final long now = System.currentTimeMillis();
1666
1667 final AvailabilityRegistrationWatcher reg;
1668 if (txn == null) {
1669 reg = new StorableAvailabilityWatcher(now, currentOrdinal, cookie,
1670 visibilityOnly, handback, eventID, listener);
1671 } else {
1672 reg = new TransactableAvailabilityWatcher(now, currentOrdinal,
1673 cookie, visibilityOnly, handback, eventID, listener, txn);
1674 }
1675
1676
1677 grant(reg, leaseTime, eventLeasePolicy, "eventLeasePolicy");
1678
1679
1680
1681
1682 eventRegistrations.put(cookie, reg);
1683
1684 if (txn != null) {
1685 try {
1686 txn.ensureActive();
1687 for (int i=0; i<tmpls.length; i++) {
1688 templates.add(reg, tmpls[i]);
1689 }
1690 txn.add((Transactable)reg);
1691 } finally {
1692 txn.allowStateChange();
1693 }
1694 } else {
1695
1696 if (log != null)
1697 log.registerOp((StorableResource)reg,
1698 "StorableAvailabilityWatcher",
1699 tmpls);
1700
1701 for (int i=0; i<tmpls.length; i++) {
1702 templates.add(reg, tmpls[i]);
1703 }
1704 }
1705
1706 return new EventRegistration(eventID, spaceProxy,
1707 leaseFactory.newLease(cookie, reg.getExpiration()),
1708 0);
1709 }
1710
1711
1712
1713
1714
1715
1716
1717
1718 void removeEventRegistration(EventRegistrationRecord reg) {
1719 eventRegistrations.remove(reg.getCookie());
1720 }
1721
1722 private EntryRep setupTmpl(EntryRep tmpl) {
1723
1724
1725
1726
1727
1728
1729
1730 if (tmpl == null)
1731 tmpl = EntryRep.matchAnyEntryRep();
1732
1733
1734
1735
1736
1737 types.addTypes(tmpl);
1738
1739 return tmpl;
1740 }
1741
1742
1743 public void cancel(Uuid cookie) throws UnknownLeaseException {
1744 leaseLogger.entering("OutriggerServerImpl","cancel");
1745
1746
1747 final EntryHandle handle = contents.handleFor(cookie);
1748 if (handle != null) {
1749 synchronized (handle) {
1750 if (handle.removed())
1751 throw throwNewUnknownLeaseException(cookie);
1752
1753 if (handle.isProvisionallyRemoved()) {
1754 try {
1755 handle.waitOnCompleteRemoval();
1756 } catch (InterruptedException e) {
1757
1758 throw new AssertionError(e);
1759 }
1760 throw throwNewUnknownLeaseException(cookie);
1761 }
1762 handle.provisionallyRemove();
1763 }
1764
1765 cancelOp(cookie, false);
1766
1767 synchronized (handle) {
1768 contents.remove(handle);
1769 }
1770
1771 return;
1772 }
1773
1774
1775 final EventRegistrationRecord reg =
1776 (EventRegistrationRecord)eventRegistrations.get(cookie);
1777 if (reg != null && reg.cancel()) {
1778 return;
1779
1780
1781
1782
1783 }
1784
1785 final ContentsQuery contentsQuery =
1786 (ContentsQuery)contentsQueries.get(cookie);
1787 if (contentsQuery != null && contentsQuery.cancel()) {
1788 return;
1789 }
1790
1791 throw throwNewUnknownLeaseException(cookie);
1792 }
1793
1794
1795 public long renew(Uuid cookie, long extension)
1796 throws UnknownLeaseException, LeaseDeniedException
1797 {
1798 leaseLogger.entering("OutriggerServerImpl","renew");
1799
1800 LeasedResource resource;
1801 LeasePeriodPolicy policy;
1802
1803 if (null != (resource = contents.getLeasedResource(cookie)))
1804 policy = entryLeasePolicy;
1805 else if (null !=
1806 (resource = (LeasedResource)eventRegistrations.get(cookie)))
1807 policy = eventLeasePolicy;
1808 else if (null !=
1809 (resource = (LeasedResource)contentsQueries.get(cookie)))
1810 policy = contentsLeasePolicy;
1811 else
1812 throw throwNewUnknownLeaseException(cookie);
1813
1814 synchronized (resource) {
1815 if (resource.getExpiration() <= System.currentTimeMillis()) {
1816
1817 throw throwNewUnknownLeaseException(cookie);
1818 }
1819
1820
1821 final LeasePeriodPolicy.Result r =
1822 policy.renew(resource, extension);
1823
1824 if (log != null)
1825 log.renewOp((Uuid)cookie, r.expiration);
1826
1827 resource.setExpiration(r.expiration);
1828
1829 if (leaseLogger.isLoggable(Level.FINER)) {
1830 leaseLogger.log(Level.FINER, "renew({0},{1}) returns {2}",
1831 new Object[]{cookie, Long.valueOf(extension),
1832 Long.valueOf(r.duration)});
1833 }
1834
1835 return r.duration;
1836 }
1837 }
1838
1839
1840 public Landlord.RenewResults renewAll(Uuid[] cookies,
1841 long[] extensions)
1842 {
1843 leaseLogger.entering("OutriggerServerImpl","renewAll");
1844
1845 if (leaseLogger.isLoggable(Level.FINER)) {
1846 leaseLogger.log(Level.FINER, "renewAll:{0} leases",
1847 Long.valueOf(cookies.length));
1848 }
1849
1850 return LandlordUtil.renewAll(this, cookies, extensions);
1851 }
1852
1853
1854 public Map cancelAll(Uuid[] cookies) {
1855 leaseLogger.entering("OutriggerServerImpl", "cancelAll");
1856 return LandlordUtil.cancelAll(this, cookies);
1857 }
1858
1859
1860 public Object read(EntryRep tmpl, Transaction txn, long timeout,
1861 QueryCookie cookie)
1862 throws TransactionException, RemoteException, InterruptedException
1863 {
1864 if (opsLogger.isLoggable(Level.FINER)) {
1865 opsLogger.log(Level.FINER,
1866 "read:tmpl = {0}, timeout = {1}, cookie = {2}",
1867 new Object[]{tmpl, Long.valueOf(timeout), cookie});
1868 }
1869 return getMatch(tmpl, txn, timeout, false, false, cookie);
1870 }
1871
1872 public Object take(EntryRep tmpl, Transaction txn, long timeout,
1873 QueryCookie cookie)
1874 throws TransactionException, RemoteException, InterruptedException
1875 {
1876 if (opsLogger.isLoggable(Level.FINER)) {
1877 opsLogger.log(Level.FINER,
1878 "take:tmpl = {0}, timeout = {1}, cookie = {2}",
1879 new Object[]{tmpl, Long.valueOf(timeout), cookie});
1880 }
1881 return getMatch(tmpl, txn, timeout, true, false, cookie);
1882 }
1883
1884 public Object readIfExists(EntryRep tmpl, Transaction txn, long timeout,
1885 QueryCookie cookie)
1886 throws TransactionException, RemoteException, InterruptedException
1887 {
1888 if (opsLogger.isLoggable(Level.FINER)) {
1889 opsLogger.log(Level.FINER,
1890 "readIfExists:tmpl = {0}, timeout = {1}, cookie = {2}",
1891 new Object[]{tmpl, Long.valueOf(timeout), cookie});
1892 }
1893 return getMatch(tmpl, txn, timeout, false, true, cookie);
1894 }
1895
1896 public Object takeIfExists(EntryRep tmpl, Transaction txn, long timeout,
1897 QueryCookie cookie)
1898 throws TransactionException, RemoteException, InterruptedException
1899 {
1900 if (opsLogger.isLoggable(Level.FINER)) {
1901 opsLogger.log(Level.FINER,
1902 "takeIfExists:tmpl = {0}, timeout = {1}, cookie = {2}",
1903 new Object[]{tmpl, Long.valueOf(timeout), cookie});
1904 }
1905 return getMatch(tmpl, txn, timeout, true, true, cookie);
1906 }
1907
1908 public Object take(EntryRep[] tmpls, Transaction tr, long timeout,
1909 int limit, QueryCookie queryCookieFromClient)
1910 throws TransactionException, RemoteException
1911 {
1912 if (opsLogger.isLoggable(Level.FINER)) {
1913 opsLogger.log(Level.FINER,
1914 "take<multiple>:timeout = {1}, limit{2} = cookie = {3}",
1915 new Object[]{Long.valueOf(timeout), Integer.valueOf(limit),
1916 queryCookieFromClient});
1917 }
1918
1919 checkForEmpty(tmpls, "Must provide at least one template");
1920
1921 for (int i=0; i<tmpls.length; i++) {
1922 typeCheck(tmpls[i]);
1923 if (tmpls[i] == null)
1924 tmpls[i] = EntryRep.matchAnyEntryRep();
1925 }
1926
1927 checkLimit(limit);
1928 checkTimeout(timeout);
1929
1930 ServerTransaction str = serverTransaction(tr);
1931 Txn txn = enterTxn(tr);
1932
1933
1934
1935
1936
1937 if (txn != null) {
1938 synchronized(txn) {
1939 if (txn.getState() != ACTIVE)
1940 throw throwNewCannotJoinException();
1941 }
1942 }
1943
1944 final long start = System.currentTimeMillis();
1945 final long endTime;
1946 if (Long.MAX_VALUE - timeout <= start)
1947 endTime = Long.MAX_VALUE;
1948 else
1949 endTime = start + timeout;
1950
1951 final OutriggerQueryCookie queryCookie;
1952 if (queryCookieFromClient == null ||
1953 !(queryCookieFromClient instanceof OutriggerQueryCookie))
1954 {
1955 queryCookie = new OutriggerQueryCookie(start);
1956 } else {
1957 queryCookie = (OutriggerQueryCookie)queryCookieFromClient;
1958 }
1959
1960
1961
1962
1963
1964
1965
1966
1967 final OperationJournal.TransitionIterator transitionIterator =
1968 operationJournal.newTransitionIterator();
1969
1970
1971
1972
1973 final Set<String> classes = new java.util.HashSet<String>();
1974 for (int i=0; i<tmpls.length; i++) {
1975 final String whichClass = tmpls[i].classFor();
1976 final Iterator<String> subtypes = types.subTypes(whichClass);
1977 while (subtypes.hasNext()) {
1978 classes.add(subtypes.next());
1979 }
1980 }
1981
1982 limit = Math.min(limit, takeLimit);
1983 EntryHandle[] handles = new EntryHandle[limit];
1984 int found = 0;
1985 final Set conflictSet = new java.util.HashSet();
1986 final Set<EntryHandle> provisionallyRemovedEntrySet =
1987 Collections.newSetFromMap(
1988 RC.map(
1989 new ConcurrentHashMap<Referrer<EntryHandle>,Referrer<Boolean>>(),
1990 Ref.WEAK_IDENTITY,
1991 Ref.STRONG,
1992 10000L,
1993 10000L
1994 ));
1995
1996 for (Iterator<String> i=classes.iterator();
1997 i.hasNext() && found < handles.length;)
1998 {
1999 final String clazz = i.next();
2000 final EntryHolder.ContinuingQuery query =
2001 createQuery(tmpls, clazz, txn, true, start);
2002
2003 if (query == null)
2004 continue;
2005
2006 while (found < handles.length) {
2007 final EntryHandle handle =
2008 query.next(conflictSet, null, provisionallyRemovedEntrySet);
2009 if (handle == null)
2010 break;
2011 handles[found++] = handle;
2012 }
2013 }
2014
2015 if (found > 0) {
2016
2017 return completeTake(handles, found, txn);
2018 }
2019
2020 final long time = System.currentTimeMillis();
2021
2022 if (time >= endTime) {
2023
2024
2025
2026 monitor(conflictSet);
2027
2028
2029
2030 try {
2031 waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
2032 } catch (InterruptedException e) {
2033
2034 throw new AssertionError(e);
2035 }
2036
2037 return queryCookie;
2038 }
2039
2040
2041
2042
2043
2044
2045 final long startOrdinal =
2046 transitionIterator.currentOrdinalAtCreation();
2047 final TakeMultipleWatcher watcher = new TakeMultipleWatcher(limit, endTime,
2048 queryCookie.startTime, startOrdinal, provisionallyRemovedEntrySet,
2049 txn);
2050
2051
2052
2053
2054
2055 if (txn != null) {
2056 synchronized(txn) {
2057 if (txn.getState() != ACTIVE)
2058 throw throwNewCannotJoinException();
2059
2060 txn.add((Transactable)watcher);
2061 }
2062 }
2063
2064 monitor(watcher, conflictSet);
2065 for (int i=0; i<tmpls.length; i++) {
2066 templates.add(watcher, tmpls[i]);
2067 }
2068
2069
2070
2071
2072
2073
2074 transitionIterator.watcherRegistered();
2075
2076 transitions:
2077 for (EntryTransition i = transitionIterator.next();
2078 i != null;
2079 i = transitionIterator.next())
2080 {
2081 final EntryRep rep = i.getHandle().rep();
2082
2083 for (int j=0; j<tmpls.length; j++) {
2084 final EntryRep tmpl = tmpls[j];
2085 if (rep.isAtLeastA(tmpl.classFor()) && tmpl.matches(rep)) {
2086
2087 if (watcher.catchUp(i, time)) {
2088 break transitions;
2089 }
2090 }
2091 }
2092 }
2093
2094
2095 try {
2096 watcher.waitOnResolution();
2097 } catch (InterruptedException e) {
2098
2099 throw new AssertionError(e);
2100 }
2101
2102 handles = watcher.resolvedWithEntries();
2103 if (handles != null) {
2104 return completeTake(handles, handles.length, txn);
2105 }
2106
2107 final Throwable t = watcher.resolvedWithThrowable();
2108 if (t != null) {
2109 if (opsLogger.isLoggable(Levels.FAILED))
2110 opsLogger.log(Levels.FAILED, t.getMessage(), t);
2111
2112 if (t instanceof RemoteException)
2113 throw (RemoteException)t;
2114
2115 if (t instanceof TransactionException)
2116 throw (TransactionException)t;
2117
2118 if (t instanceof RuntimeException)
2119 throw (RuntimeException)t;
2120
2121 if (t instanceof Error)
2122 throw (Error)t;
2123
2124 throw new InternalSpaceException(
2125 "Query threw unexpected exception", t);
2126 }
2127
2128
2129
2130
2131 try {
2132 waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
2133 } catch (InterruptedException e) {
2134
2135 throw new AssertionError(e);
2136 }
2137
2138 return queryCookie;
2139 }
2140
2141 private EntryRep[] completeTake(EntryHandle[] handles, int found, Txn txn)
2142 throws TransactionException
2143 {
2144 final EntryRep[] reps = new EntryRep[found];
2145
2146 if (log == null) {
2147 for (int i=0; i<found; i++) {
2148 reps[i] = handles[i].rep();
2149 }
2150 } else {
2151 final Uuid[] uuids = new Uuid[found];
2152 for (int i=0; i<found; i++) {
2153 final EntryRep rep = handles[i].rep();
2154 reps[i] = rep;
2155 uuids[i] = rep.id();
2156 }
2157
2158 if (txn == null) {
2159 log.takeOp(uuids, null);
2160 } else {
2161
2162
2163
2164
2165 try {
2166 txn.ensureActive();
2167 log.takeOp(uuids, txn.getId());
2168 } finally {
2169 txn.allowStateChange();
2170 }
2171 }
2172 }
2173
2174
2175 if (txn == null) {
2176 for (int i=0; i<found; i++) {
2177 synchronized (handles[i]) {
2178 contents.remove(handles[i]);
2179 }
2180 }
2181 }
2182
2183 return reps;
2184 }
2185
2186 private EntryRep completeTake(EntryHandle handle, Txn txn)
2187 throws TransactionException
2188 {
2189 final EntryRep rep = handle.rep();
2190
2191 if (log != null) {
2192 if (txn == null) {
2193 log.takeOp(rep.id(), null);
2194 } else {
2195
2196
2197
2198
2199 try {
2200 txn.ensureActive();
2201 log.takeOp(rep.id(), txn.getId());
2202 } finally {
2203 txn.allowStateChange();
2204 }
2205 }
2206 }
2207
2208
2209 if (txn == null) {
2210 synchronized (handle) {
2211 contents.remove(handle);
2212 }
2213 }
2214
2215 return rep;
2216 }
2217
2218
2219
2220
2221 private EntryHolder.ContinuingQuery createQuery(EntryRep[] tmpls,
2222 String clazz, Txn txn, boolean takeIt, long now)
2223 {
2224 final EntryHolder holder = contents.holderFor(clazz);
2225 final String[] supertypes = holder.supertypes();
2226
2227 if (supertypes == null)
2228 return null;
2229
2230 final List<EntryRep> tmplsToCheck = new java.util.LinkedList<EntryRep>();
2231 for (int i=0; i<tmpls.length; i++) {
2232 final EntryRep tmpl = tmpls[i];
2233 final String tmplClass = tmpl.classFor();
2234 if (tmplClass.equals(clazz) || tmpl == EntryRep.matchAnyEntryRep()) {
2235 tmplsToCheck.add(tmpl);
2236 } else {
2237 for (int j=0; j<supertypes.length; j++) {
2238 if (tmplClass.equals(supertypes[j])) {
2239 tmplsToCheck.add(tmpl);
2240 break;
2241 }
2242 }
2243 }
2244 }
2245
2246 return holder.continuingQuery(
2247 tmplsToCheck.toArray(new EntryRep[tmplsToCheck.size()]),
2248 txn, takeIt, now);
2249 }
2250
2251
2252
2253
2254
2255
2256 private static void waitOnProvisionallyRemovedEntries(
2257 Set<EntryHandle> provisionallyRemovedEntrySet)
2258 throws InterruptedException
2259 {
2260 if (provisionallyRemovedEntrySet.isEmpty())
2261 return;
2262
2263 final Set<EntryHandle> keys = provisionallyRemovedEntrySet;
2264
2265 for (Iterator i=keys.iterator(); i.hasNext();) {
2266 final EntryHandle handle = (EntryHandle)i.next();
2267 if (handle == null)
2268 continue;
2269 synchronized (handle) {
2270 handle.waitOnCompleteRemoval();
2271 }
2272 }
2273 }
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306 private Object
2307 getMatch(EntryRep tmpl, Transaction tr, long timeout, boolean takeIt,
2308 boolean ifExists, QueryCookie queryCookieFromClient)
2309 throws RemoteException, InterruptedException, TransactionException
2310 {
2311 typeCheck(tmpl);
2312 checkTimeout(timeout);
2313
2314
2315 final long startTime = System.currentTimeMillis();
2316 final long endTime;
2317 if (Long.MAX_VALUE - timeout <= startTime)
2318 endTime = Long.MAX_VALUE;
2319 else
2320 endTime = startTime + timeout;
2321
2322
2323
2324
2325 Txn txn = enterTxn(tr);
2326
2327
2328
2329
2330
2331 if (txn != null) {
2332 synchronized(txn) {
2333 if (txn.getState() != ACTIVE)
2334 throw throwNewCannotJoinException();
2335 }
2336 }
2337
2338
2339
2340
2341
2342
2343
2344
2345 final OperationJournal.TransitionIterator transitionIterator =
2346 operationJournal.newTransitionIterator();
2347
2348
2349 if (tmpl == null)
2350 tmpl = EntryRep.matchAnyEntryRep();
2351
2352 EntryHandle handle = null;
2353 final Set conflictSet = new java.util.HashSet();
2354
2355 final Set<Uuid> lockedEntrySet =
2356 (ifExists? Collections.newSetFromMap( new ConcurrentHashMap<Uuid,Boolean>()):null);
2357
2358
2359 final Set<EntryHandle> provisionallyRemovedEntrySet =
2360 Collections.newSetFromMap(
2361 RC.map(
2362 new ConcurrentHashMap<Referrer<EntryHandle>,Referrer<Boolean>>(),
2363 Ref.WEAK_IDENTITY,
2364 Ref.STRONG,
2365 10000L,
2366 10000L
2367 ));
2368
2369
2370
2371
2372 handle = find(tmpl, txn, takeIt, conflictSet, lockedEntrySet,
2373 provisionallyRemovedEntrySet);
2374 opsLogger.log(Level.FINEST, "getMatch, initial search found {0}", handle);
2375
2376 if (handle != null) {
2377 if (takeIt)
2378 return completeTake(handle, txn);
2379 else
2380 return handle.rep();
2381 }
2382
2383 if (opsLogger.isLoggable(Level.FINEST)) {
2384 opsLogger.log(Level.FINEST, "{0} conflicts, endTime = {1}",
2385 new Object[] {Integer.valueOf(conflictSet.size()),
2386 Long.valueOf(endTime)});
2387 }
2388
2389 final OutriggerQueryCookie queryCookie;
2390 if (queryCookieFromClient == null ||
2391 !(queryCookieFromClient instanceof OutriggerQueryCookie))
2392 {
2393 queryCookie = new OutriggerQueryCookie(startTime);
2394 } else {
2395 queryCookie = (OutriggerQueryCookie)queryCookieFromClient;
2396 }
2397
2398
2399 final long time = System.currentTimeMillis();
2400 if (time >= endTime) {
2401
2402
2403
2404 monitor(conflictSet);
2405
2406
2407 waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
2408 return queryCookie;
2409 }
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436 final SingletonQueryWatcher watcher;
2437 final long startOrdinal =
2438 transitionIterator.currentOrdinalAtCreation();
2439 if (!ifExists && !takeIt && txn == null) {
2440 watcher =
2441 new ReadWatcher(endTime, queryCookie.startTime,
2442 startOrdinal);
2443 } else if (ifExists && !takeIt) {
2444 if (txn == null) {
2445 watcher = new ReadIfExistsWatcher(endTime, queryCookie.startTime,
2446 startOrdinal, lockedEntrySet);
2447 } else {
2448 watcher = new TransactableReadIfExistsWatcher(endTime,
2449 queryCookie.startTime, startOrdinal, lockedEntrySet,
2450 provisionallyRemovedEntrySet, txn);
2451 }
2452 } else if (!ifExists && (takeIt || txn != null)) {
2453 watcher = new ConsumingWatcher(endTime, queryCookie.startTime,
2454 startOrdinal, provisionallyRemovedEntrySet, txn, takeIt);
2455 } else if (ifExists && takeIt) {
2456 watcher = new TakeIfExistsWatcher(endTime,
2457 queryCookie.startTime, startOrdinal, lockedEntrySet,
2458 provisionallyRemovedEntrySet, txn);
2459 } else {
2460 throw new AssertionError("Can't create watcher for query");
2461 }
2462
2463
2464
2465
2466
2467 if (txn != null) {
2468 synchronized(txn) {
2469 if (txn.getState() != ACTIVE)
2470 throw throwNewCannotJoinException();
2471
2472 txn.add((Transactable)watcher);
2473 }
2474 }
2475
2476 monitor(watcher, conflictSet);
2477
2478 templates.add(watcher, tmpl);
2479
2480
2481
2482
2483
2484
2485 transitionIterator.watcherRegistered();
2486 final String tmplClass = tmpl.classFor();
2487
2488 for (EntryTransition i = transitionIterator.next();
2489 i != null;
2490 i = transitionIterator.next())
2491 {
2492 final EntryRep rep = i.getHandle().rep();
2493 if (rep.isAtLeastA(tmplClass) && tmpl.matches(rep)) {
2494
2495 if (watcher.catchUp(i, time)) {
2496 break;
2497 }
2498 }
2499 }
2500
2501
2502
2503
2504
2505 if (ifExists)
2506 operationJournal.markCaughtUp((IfExistsWatcher)watcher);
2507
2508 watcher.waitOnResolution();
2509 handle = watcher.resolvedWithEntry();
2510 if (handle != null) {
2511 if (takeIt)
2512 return completeTake(handle, txn);
2513 else
2514 return handle.rep();
2515 }
2516
2517 final Throwable t = watcher.resolvedWithThrowable();
2518 if (t != null) {
2519 if (opsLogger.isLoggable(Levels.FAILED))
2520 opsLogger.log(Levels.FAILED, t.getMessage(), t);
2521
2522 if (t instanceof RemoteException)
2523 throw (RemoteException)t;
2524
2525 if (t instanceof InterruptedException)
2526 throw (InterruptedException)t;
2527
2528 if (t instanceof TransactionException)
2529 throw (TransactionException)t;
2530
2531 if (t instanceof RuntimeException)
2532 throw (RuntimeException)t;
2533
2534 if (t instanceof Error)
2535 throw (Error)t;
2536
2537 throw new InternalSpaceException(
2538 "Query threw unexpected exception", t);
2539 }
2540
2541
2542
2543 waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
2544 if (ifExists && ((IfExistsWatcher)watcher).isLockedEntrySetEmpty())
2545 return null;
2546
2547 return queryCookie;
2548 }
2549
2550
2551
2552
2553
2554 private void monitor(QueryWatcher watcher, Collection<Txn> toMonitor) {
2555 if (!toMonitor.isEmpty())
2556 txnMonitor.add(watcher, toMonitor);
2557 }
2558
2559
2560
2561
2562
2563
2564 void monitor(Collection<Txn> toMonitor) {
2565 if (!toMonitor.isEmpty())
2566 txnMonitor.add(toMonitor);
2567 }
2568
2569
2570
2571
2572
2573 void dump(String name, EntryRep rep) {
2574 dump(contents.holderFor(rep), name, rep);
2575 }
2576
2577
2578
2579
2580
2581 static void dump(EntryHolder holder, String name, EntryRep rep) {
2582 try {
2583 holder.dump(name + " " + rep.entry());
2584 } catch (Exception e) {
2585 e.printStackTrace();
2586 }
2587 }
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597 private EntryHandle
2598 find(EntryRep tmplRep, Txn txn, boolean takeIt, Set conflictSet,
2599 Set lockedEntrySet, Set<EntryHandle> provisionallyRemovedEntrySet)
2600 throws TransactionException
2601 {
2602 final String whichClass = tmplRep.classFor();
2603
2604
2605
2606
2607
2608 Iterator subtypes = types.subTypes(whichClass);
2609 String className = null;
2610 EntryHandle result = null;
2611 boolean foundConflicts = false;
2612 EntryHolder holder = null;
2613
2614 while (subtypes.hasNext()) {
2615 className = (String) subtypes.next();
2616 opsLogger.log(Level.FINEST,
2617 "OutriggerServerImpl: find: className = {0}", className);
2618
2619 holder = contents.holderFor(className);
2620 result = holder.hasMatch(tmplRep, txn, takeIt, conflictSet,
2621 lockedEntrySet, provisionallyRemovedEntrySet);
2622 if (result != null) {
2623 return result;
2624 }
2625 }
2626
2627
2628 return null;
2629 }
2630
2631
2632
2633
2634
2635
2636
2637 static long nextID() {
2638 return idGen.nextLong();
2639 }
2640
2641
2642
2643
2644 public MatchSetData contents(EntryRep[] tmpls, Transaction tr,
2645 long leaseTime, long limit)
2646 throws TransactionException, RemoteException
2647 {
2648 if (opsLogger.isLoggable(Level.FINER)) {
2649 opsLogger.log(Level.FINER,
2650 "contents:tmpls = {0}, tr = {1}, leaseTime = {2}, " +
2651 "limit = {3}",
2652 new Object[]{tmpls, tr, Long.valueOf(leaseTime), Long.valueOf(limit)});
2653 }
2654
2655 checkForEmpty(tmpls, "Must provide at least one template");
2656 checkLimit(limit);
2657
2658 for (int i=0; i<tmpls.length; i++) {
2659 typeCheck(tmpls[i]);
2660 if (tmpls[i] == null)
2661 tmpls[i] = EntryRep.matchAnyEntryRep();
2662 }
2663
2664
2665
2666
2667
2668 if (leaseTime < 1 && leaseTime != Lease.ANY) {
2669 throw logAndThrowIllegalArg(
2670 "leaseTime = " + leaseTime + ", must be postive or Lease.ANY");
2671 }
2672
2673 ServerTransaction str = serverTransaction(tr);
2674 Txn txn = enterTxn(tr);
2675
2676
2677
2678
2679
2680 if (txn != null) {
2681 synchronized(txn) {
2682 if (txn.getState() != ACTIVE)
2683 throw throwNewCannotJoinException();
2684 }
2685 }
2686
2687 final Uuid uuid = UuidFactory.generate();
2688 final ContentsQuery contentsQuery = new ContentsQuery(uuid, tmpls,
2689 txn, limit);
2690 final EntryRep[] reps = contentsQuery.nextBatch(null,
2691 System.currentTimeMillis());
2692
2693 if (reps[reps.length-1] == null) {
2694
2695 return new MatchSetData(null, reps, -1);
2696 }
2697
2698
2699 final LeasePeriodPolicy.Result r =
2700 grant(contentsQuery, leaseTime, contentsLeasePolicy,
2701 "contentsLeasePolicy");
2702
2703 contentsQueries.put(uuid, contentsQuery);
2704 return new MatchSetData(uuid, reps, r.duration);
2705 }
2706
2707 public EntryRep[] nextBatch(Uuid contentsQueryUuid, Uuid entryUuid)
2708 throws NoSuchObjectException
2709 {
2710 opsLogger.entering("OutriggerServerImpl", "nextBatch");
2711 final ContentsQuery contentsQuery =
2712 (ContentsQuery)contentsQueries.get(contentsQueryUuid);
2713 if (contentsQuery == null)
2714 throw throwNewNoSuchObjectException("Unkown MatchSet", opsLogger);
2715
2716 final long now = System.currentTimeMillis();
2717 synchronized (contentsQuery) {
2718 if (contentsQuery.getExpiration() <= now) {
2719 contentsQuery.cancel();
2720 throwNewNoSuchObjectException("Contents query expired", opsLogger);
2721 }
2722 }
2723
2724 try {
2725 return contentsQuery.nextBatch(entryUuid, now);
2726 } catch (TransactionException e) {
2727 synchronized (contentsQuery) {
2728 contentsQuery.cancel();
2729 }
2730 throw throwNewNoSuchObjectException("Transaction no longer active",
2731 e, opsLogger);
2732 }
2733 }
2734
2735
2736
2737
2738
2739 private class ContentsQuery implements LeasedResource {
2740
2741 final private Uuid uuid;
2742
2743
2744
2745
2746
2747 final private Iterator classesIterator;
2748
2749
2750 final private EntryRep[] tmpls;
2751
2752
2753 final private Txn txn;
2754
2755
2756 final private Object lock = new Object();
2757
2758
2759 private volatile long expiration;
2760
2761
2762 private EntryHolder.ContinuingQuery currentQuery;
2763
2764
2765
2766
2767
2768 private long remaining;
2769
2770
2771 private Uuid lastEntry;
2772
2773
2774 private EntryRep[] lastBatch;
2775
2776
2777
2778
2779
2780 final private Set<EntryHandle> provisionallyRemovedEntrySet
2781 = Collections.newSetFromMap(
2782 RC.map(
2783 new ConcurrentHashMap<Referrer<EntryHandle>,Referrer<Boolean>>(),
2784 Ref.WEAK_IDENTITY,
2785 Ref.STRONG,
2786 10000L,
2787 10000L
2788 )) ;
2789
2790 private ContentsQuery(Uuid uuid, EntryRep[] tmpls, Txn txn, long limit) {
2791 this.uuid = uuid;
2792 this.tmpls = tmpls;
2793 this.txn = txn;
2794 remaining = limit;
2795 Set classes = new java.util.HashSet(128);
2796 for (int i=0; i<tmpls.length; i++) {
2797 final String whichClass = tmpls[i].classFor();
2798 final Iterator subtypes = types.subTypes(whichClass);
2799 while (subtypes.hasNext()) {
2800 classes.add(subtypes.next());
2801 }
2802 }
2803
2804 classesIterator = classes.iterator();
2805 }
2806
2807 private boolean advanceCurrentQuery(long now) {
2808 while (classesIterator.hasNext()) {
2809 currentQuery = createQuery(tmpls, (String)classesIterator.next(),
2810 txn, false, now);
2811 if (currentQuery == null)
2812 continue;
2813
2814 return true;
2815 }
2816
2817 return false;
2818 }
2819
2820 private EntryRep[] nextBatch(Uuid lastReceived, long now)
2821 throws TransactionException
2822 {
2823 synchronized (lock) {
2824 if (lastReceived != null && !lastReceived.equals(lastEntry)) {
2825
2826 return lastBatch;
2827 }
2828
2829
2830 if (currentQuery == null) {
2831 if (!advanceCurrentQuery(now)) {
2832
2833
2834
2835 try {
2836 waitOnProvisionallyRemovedEntries(
2837 provisionallyRemovedEntrySet);
2838 } catch (InterruptedException e) {
2839
2840 throw new AssertionError(e);
2841 }
2842 return new EntryRep[1];
2843 }
2844 } else {
2845 currentQuery.restart(now);
2846 }
2847
2848 final Set conflictSet = new java.util.HashSet();
2849
2850 lastBatch = new EntryRep[nextLimit];
2851 int i = 0;
2852 while (remaining > 0 && i < lastBatch.length) {
2853 final EntryHandle handle =
2854 currentQuery.next(conflictSet, null,
2855 provisionallyRemovedEntrySet);
2856
2857 if (handle == null) {
2858 if (advanceCurrentQuery(now)) {
2859
2860 continue;
2861 } else {
2862
2863
2864 currentQuery = null;
2865 break;
2866 }
2867 }
2868
2869 lastBatch[i] = handle.rep();
2870 i++;
2871 remaining--;
2872 }
2873
2874 monitor(conflictSet);
2875 if (i == 0) {
2876
2877
2878 lastEntry = null;
2879 } else {
2880 lastEntry = lastBatch[i-1].id();
2881 }
2882 return lastBatch;
2883 }
2884 }
2885
2886
2887
2888
2889
2890
2891 private boolean cancel() {
2892 if (contentsQueries.remove(uuid) == null)
2893 return false;
2894
2895 expiration = Long.MIN_VALUE;
2896 return true;
2897 }
2898
2899 public void setExpiration(long newExpiration) {
2900 expiration = newExpiration;
2901 }
2902
2903 public long getExpiration() {
2904 return expiration;
2905 }
2906
2907 public Uuid getCookie() {
2908 return uuid;
2909 }
2910 }
2911
2912
2913
2914
2915 public Object getServiceProxy() {
2916 opsLogger.entering("OutriggerServerImpl", "getServiceProxy");
2917 return spaceProxy;
2918 }
2919
2920 synchronized Object getProxy() {
2921 return ourRemoteRef;
2922 }
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934 @Override
2935 public Object getAdmin() {
2936 opsLogger.entering("OutriggerServerImpl", "getAdmin");
2937 return adminProxy;
2938 }
2939
2940
2941
2942 @Override
2943 public JavaSpace space() {
2944 opsLogger.entering("OutriggerServerImpl", "space");
2945 return spaceProxy;
2946 }
2947
2948
2949 @Override
2950 public Uuid contents(EntryRep tmpl, Transaction tr)
2951 throws TransactionException, RemoteException
2952 {
2953 iteratorLogger.entering("OutriggerServerImpl", "contents");
2954
2955 typeCheck(tmpl);
2956 ServerTransaction str = serverTransaction(tr);
2957
2958 Txn txn = enterTxn(tr);
2959
2960
2961
2962
2963
2964 if (txn != null) {
2965 synchronized(txn) {
2966 if (txn.getState() != ACTIVE)
2967 throw throwNewCannotJoinException();
2968 }
2969 }
2970
2971 final Uuid uuid = UuidFactory.generate();
2972 iterations.put(uuid, new IteratorImpl(tmpl, txn));
2973 return uuid;
2974 }
2975
2976 @Override
2977 public EntryRep[] nextReps(Uuid iterationUuid, int max,
2978 Uuid entryUuid)
2979 throws NoSuchObjectException
2980 {
2981 iteratorLogger.entering("OutriggerServerImpl", "nextReps");
2982
2983 final IteratorImpl iterImpl =
2984 (IteratorImpl)iterations.get(iterationUuid);
2985 if (iterImpl == null)
2986 throw throwNewNoSuchObjectException("Unknown iteration",
2987 iteratorLogger);
2988
2989 return iterImpl.nextReps(max, entryUuid);
2990 }
2991
2992 @Override
2993 public void delete(Uuid iterationUuid, Uuid entryUuid)
2994 throws NoSuchObjectException
2995 {
2996 iteratorLogger.entering("OutriggerServerImpl", "delete");
2997
2998 final IteratorImpl iterImpl =
2999 (IteratorImpl)iterations.get(iterationUuid);
3000 if (iterImpl == null)
3001 throw throwNewNoSuchObjectException("Unknown iteration",
3002 iteratorLogger);
3003
3004 iterImpl.delete(entryUuid);
3005 }
3006
3007 @Override
3008 public void close(Uuid iterationUuid) throws NoSuchObjectException {
3009 iteratorLogger.entering("OutriggerServerImpl", "close");
3010
3011 final IteratorImpl iterImpl =
3012 (IteratorImpl)iterations.remove(iterationUuid);
3013 if (iterImpl == null)
3014 throw throwNewNoSuchObjectException("Unknown iteration",
3015 iteratorLogger);
3016
3017 iterImpl.close();
3018 }
3019
3020
3021
3022
3023
3024
3025 @Override
3026 public void destroy() {
3027 iteratorLogger.entering("OutriggerServerImpl", "destroy");
3028
3029 serverGate.rejectCalls(
3030 new NoSuchObjectException("Service is destroyed"));
3031 (new DestroyThread()).start();
3032 lifecycleLogger.log(Level.INFO,
3033 "Outrigger server destroy thread started: {0}", this);
3034 }
3035
3036
3037
3038
3039
3040 private class DestroyThread extends Thread {
3041
3042
3043 public DestroyThread() {
3044 super("DestroyThread");
3045
3046 setDaemon(false);
3047 }
3048
3049 public void run() {
3050 lifecycleLogger.log(Level.FINE,
3051 "Outrigger server destroy thread running: {0}", this);
3052
3053
3054
3055 try {
3056 logDestroyPhase("destroying JoinManager");
3057 joinStateManager.destroy();
3058 } catch (Exception t) {
3059 logDestroyProblem("destroying JoinManager", t);
3060 }
3061
3062
3063
3064 if (activationID != null) {
3065 try {
3066
3067 logDestroyPhase("unregistering object");
3068 activationSystem.unregisterObject(activationID);
3069 } catch (Exception t) {
3070 logDestroyProblem("unregistering server", t);
3071 }
3072 }
3073
3074
3075 logDestroyPhase("unexporting force = false");
3076 long now = System.currentTimeMillis();
3077 long end_time = now + maxUnexportDelay;
3078 if (end_time < 0) {
3079
3080 end_time = Long.MAX_VALUE;
3081 }
3082
3083 boolean unexported = false;
3084 try {
3085 while ((!unexported) && (now < end_time)) {
3086
3087 unexported = exporter.unexport(false);
3088
3089 if (!unexported) {
3090 try {
3091
3092
3093
3094
3095
3096
3097
3098 final long sleepTime =
3099 Math.min(unexportRetryDelay, end_time - now);
3100
3101
3102
3103
3104
3105 sleep(sleepTime);
3106 now = System.currentTimeMillis();
3107 } catch (InterruptedException e) {
3108
3109
3110 logDestroyProblem("unexport retry delay sleep", e);
3111 break;
3112 }
3113 }
3114 }
3115 } catch (Throwable t) {
3116 logDestroyProblem(
3117 "trying \"nice\" unexport, will try forceful unexport", t);
3118 }
3119
3120
3121
3122
3123
3124 if (!unexported) {
3125
3126 logDestroyPhase("unexporting force = true");
3127 try {
3128 unexported = exporter.unexport(true);
3129 } catch (Throwable t) {
3130 logDestroyProblem("trying forceful unexport", t);
3131 }
3132 }
3133
3134
3135
3136
3137
3138 try {
3139 logDestroyPhase("destroying txnMonitor");
3140 txnMonitor.destroy();
3141 } catch (Exception t) {
3142 logDestroyProblem("destroying txnMonitor", t);
3143 }
3144
3145 try {
3146 logDestroyPhase("terminating notifier");
3147 notifier.terminate();
3148 } catch (Exception t) {
3149 logDestroyProblem("terminating notifier ", t);
3150 }
3151
3152 logDestroyPhase("terminating operation journal");
3153 operationJournal.terminate();
3154
3155 destroyReaper(templateReaperThread);
3156 destroyReaper(entryReaperThread);
3157 destroyReaper(contentsQueryReaperThread);
3158
3159 if (expirationOpQueue != null) {
3160 logDestroyPhase("terminating expiration op queue");
3161 expirationOpQueue.terminate();
3162 }
3163
3164 try {
3165 logDestroyPhase("joining operation journal");
3166 operationJournal.join();
3167 } catch (InterruptedException ie) {
3168 logDestroyProblem("joining operation journal", ie);
3169 }
3170
3171 joinThread(operationJournal);
3172 joinThread(templateReaperThread);
3173 joinThread(entryReaperThread);
3174 joinThread(contentsQueryReaperThread);
3175
3176 if (expirationOpQueue != null) {
3177 joinThread(expirationOpQueue);
3178 }
3179
3180 if (store != null) {
3181 try {
3182 logDestroyPhase("destroying store");
3183 store.destroy();
3184 } catch (Exception t) {
3185 logDestroyProblem("destroying store", t);
3186 }
3187 }
3188
3189 if (activationID != null) {
3190 logDestroyPhase("calling ActivationGroup.inactive");
3191 try {
3192 ActivationGroup.inactive(activationID, exporter);
3193 } catch (RemoteException e) {
3194 logDestroyProblem("calling ActivationGroup.inactive", e);
3195 } catch (ActivationException e) {
3196 logDestroyProblem("calling ActivationGroup.inactive", e);
3197 }
3198 }
3199
3200 if (lifeCycle != null) {
3201
3202 logDestroyPhase("calling lifeCycle.unregister");
3203 lifeCycle.unregister(serverGate);
3204 }
3205
3206 if (loginContext != null) {
3207 try {
3208 logDestroyPhase("logging out");
3209 loginContext.logout();
3210 } catch (Exception e) {
3211 logDestroyProblem("logging out", e);
3212 }
3213 }
3214
3215 lifecycleLogger.log(Level.INFO,
3216 "Outrigger server destroy thread finished: {0}", this);
3217 }
3218 }
3219
3220
3221 private void destroyReaper(Reaper r) {
3222 logDestroyPhase("stopping " + r.getName());
3223 r.kill();
3224 }
3225
3226
3227 private void joinThread(Thread t) {
3228 try {
3229 logDestroyPhase("joining " + t.getName());
3230 t.join();
3231 } catch (InterruptedException ie) {
3232 logDestroyProblem("joining " + t.getName(), ie);
3233 }
3234 }
3235
3236
3237 private void logDestroyProblem(String part, Throwable t) {
3238 lifecycleLogger.log(Level.INFO, "exception encountered " + part +
3239 ", continuing", t);
3240 }
3241
3242
3243 private void logDestroyPhase(String part) {
3244 if (lifecycleLogger.isLoggable(Level.FINER))
3245 lifecycleLogger.log(Level.FINER,
3246 "outrigger server:" + part + ":" + this);
3247 }
3248
3249
3250
3251 public Entry[] getLookupAttributes() {
3252 joinLogger.entering("OutriggerServerImpl", "getLookupAttributes");
3253 return joinStateManager.getLookupAttributes();
3254 }
3255
3256
3257 public void addLookupAttributes(Entry[] attrSets) {
3258 joinLogger.entering("OutriggerServerImpl", "addLookupAttributes");
3259 joinStateManager.addLookupAttributes(attrSets);
3260 }
3261
3262
3263 public void modifyLookupAttributes(Entry[] attrSetTemplates,
3264 Entry[] attrSets)
3265 {
3266 joinLogger.entering("OutriggerServerImpl", "modifyLookupAttributes");
3267 joinStateManager.modifyLookupAttributes(attrSetTemplates, attrSets);
3268 }
3269
3270
3271 public String[] getLookupGroups() {
3272 joinLogger.entering("OutriggerServerImpl", "getLookupGroups");
3273 return joinStateManager.getLookupGroups();
3274 }
3275
3276
3277 public void addLookupGroups(String[] groups) {
3278 joinLogger.entering("OutriggerServerImpl", "addLookupGroups");
3279 joinStateManager.addLookupGroups(groups);
3280 }
3281
3282
3283 public void removeLookupGroups(String[] groups) {
3284 joinLogger.entering("OutriggerServerImpl", "removeLookupGroups");
3285 joinStateManager.removeLookupGroups(groups);
3286 }
3287
3288
3289 public void setLookupGroups(String[] groups) {
3290 joinLogger.entering("OutriggerServerImpl", "setLookupGroups");
3291 joinStateManager.setLookupGroups(groups);
3292 }
3293
3294
3295 public LookupLocator[] getLookupLocators() {
3296 joinLogger.entering("OutriggerServerImpl", "getLookupLocators");
3297 return joinStateManager.getLookupLocators();
3298 }
3299
3300
3301 public void addLookupLocators(LookupLocator[] locators)
3302 throws RemoteException
3303 {
3304 joinLogger.entering("OutriggerServerImpl", "addLookupLocators");
3305 joinStateManager.addLookupLocators(locators);
3306 }
3307
3308
3309 public void removeLookupLocators(LookupLocator[] locators)
3310 throws RemoteException
3311 {
3312 joinLogger.entering("OutriggerServerImpl", "removeLookupLocators");
3313 joinStateManager.removeLookupLocators(locators);
3314 }
3315
3316
3317 public void setLookupLocators(LookupLocator[] locators)
3318 throws RemoteException
3319 {
3320 joinLogger.entering("OutriggerServerImpl", "setLookupLocators");
3321 joinStateManager.setLookupLocators(locators);
3322 }
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332 private class AllReps implements RepEnum {
3333 RepEnum curEnum;
3334 final Stack toDo;
3335 final Txn txn;
3336
3337
3338 AllReps(String classFor, Txn txn) {
3339 toDo = new Stack();
3340 this.txn = txn;
3341 setup(classFor);
3342 }
3343
3344
3345
3346
3347
3348
3349
3350 private synchronized void setup(String classFor) {
3351 if (classFor == null)
3352 return;
3353
3354
3355
3356
3357
3358
3359 final Iterator matchingTypes = types.subTypes(classFor);
3360 while (matchingTypes.hasNext()) {
3361 toDo.push((String)matchingTypes.next());
3362 }
3363
3364 if (!toDo.isEmpty())
3365 curEnum = enumFor((String)toDo.pop());
3366 }
3367
3368
3369
3370
3371 private RepEnum enumFor(String classFor) {
3372 EntryHolder holder = contents.holderFor(classFor);
3373 return holder.contents(txn);
3374 }
3375
3376
3377 public synchronized EntryRep nextRep() {
3378
3379
3380
3381
3382
3383 for (;;) {
3384 if (curEnum == null) {
3385 return null;
3386 }
3387
3388 EntryRep rep = curEnum.nextRep();
3389 if (rep != null) {
3390 return rep;
3391 }
3392
3393
3394 if (toDo.isEmpty()) {
3395 curEnum = null;
3396 return null;
3397 }
3398
3399 curEnum = enumFor((String)toDo.pop());
3400 }
3401 }
3402 }
3403
3404
3405
3406
3407 private class IteratorImpl {
3408
3409 private final EntryRep tmpl;
3410
3411
3412 private RepEnum repEnum;
3413
3414
3415 boolean closed;
3416
3417
3418
3419
3420
3421 private EntryRep lastBatch[] = null;
3422
3423
3424 private Uuid lastId = null;
3425
3426
3427
3428
3429
3430 IteratorImpl(EntryRep tmpl, Txn txn) {
3431 if (tmpl == null)
3432 tmpl = EntryRep.matchAnyEntryRep();
3433 this.tmpl = tmpl;
3434 repEnum = new AllReps(tmpl.classFor(), txn);
3435 }
3436
3437
3438
3439
3440 private void rememberLast(EntryRep[] newLast) {
3441 lastBatch = newLast;
3442 if (newLast == null)
3443 lastId = null;
3444 else
3445 lastId = lastBatch[lastBatch.length-1].id();
3446 }
3447
3448
3449 public synchronized EntryRep[] nextReps(int max, Uuid id) {
3450 if (closed && id != null && lastId == null)
3451
3452 return null;
3453
3454 assertOpen();
3455 if (id != null && lastId == null) {
3456 throw logAndThrow(new InternalSpaceException("First call to " +
3457 "RemoteIter.next() should have id == null"),
3458 iteratorLogger);
3459 }
3460
3461
3462 if (id != null && lastId != null && !id.equals(lastId))
3463
3464
3465 return lastBatch;
3466
3467
3468
3469
3470
3471 if (repEnum == null) {
3472 close();
3473 return null;
3474 }
3475
3476 if (max <= 0 && max != JavaSpaceAdmin.USE_DEFAULT)
3477 throw new AssertionError("Invalid iterator proxy");
3478
3479 if (max == JavaSpaceAdmin.USE_DEFAULT)
3480 max = 128;
3481
3482 final int limit = Math.min(max, 512);
3483 EntryRep[] reps = new EntryRep[limit];
3484
3485
3486 int i = 0;
3487
3488 while (i < reps.length) {
3489 reps[i] = repEnum.nextRep();
3490
3491 if (reps[i] == null) {
3492 repEnum = null;
3493
3494 if (i == 0) {
3495 close();
3496 return null;
3497 }
3498 EntryRep[] r = new EntryRep[i];
3499 System.arraycopy(reps, 0, r, 0, r.length);
3500 rememberLast(r);
3501 return r;
3502 }
3503
3504
3505 if (tmpl.matches(reps[i])) {
3506 i++;
3507 }
3508 }
3509
3510 rememberLast(reps);
3511 return reps;
3512 }
3513
3514
3515
3516
3517
3518 public synchronized void delete(Uuid id) {
3519 assertOpen();
3520 try {
3521 boolean found = false;
3522 for (int i=0; i<lastBatch.length; i++) {
3523 if (lastBatch[i].id().equals(id)) {
3524 found = true;
3525 break;
3526 }
3527 }
3528
3529 if (!found) {
3530 throw logAndThrow(
3531 new InternalSpaceException("Asked to delete entry " +
3532 "not returned by last nextReps() call"),
3533 iteratorLogger);
3534 }
3535
3536 cancel(id);
3537 } catch (UnknownLeaseException e) {
3538
3539
3540 }
3541 }
3542
3543
3544
3545
3546 public synchronized void close() {
3547 closed = true;
3548 repEnum = null;
3549 rememberLast(null);
3550 }
3551
3552
3553
3554
3555
3556 private void assertOpen() throws IllegalStateException {
3557 if (closed) {
3558 throw logAndThrow(new IllegalStateException(
3559 "closed AdminIterator"), iteratorLogger);
3560 }
3561 }
3562 }
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586 public void recoverSessionId(long sessionId) {
3587 long bumpValue = Integer.MAX_VALUE;
3588
3589 this.sessionId.addAndGet(bumpValue);
3590 }
3591
3592 public void recoverJoinState(StoredObject state) throws Exception {
3593 state.restore(joinStateManager);
3594 }
3595
3596 public void recoverWrite(StoredResource entry, Long txnId)
3597 throws Exception
3598 {
3599 EntryRep rep = new EntryRep();
3600 Txn txn = getRecoveredTxn(txnId);
3601 entry.restore(rep);
3602
3603
3604
3605
3606 typeCheck(rep);
3607
3608 final EntryHolder holder = contents.holderFor(rep);
3609 final EntryHandle handle = holder.newEntryHandle(rep, txn);
3610 addWrittenRep(handle, holder, txn);
3611 }
3612
3613 public void recoverTake(Uuid cookie, Long txnId) throws Exception {
3614 final EntryHandle handle = contents.handleFor(cookie);
3615 EntryHolder holder = contents.holderFor(handle.rep());
3616 Txn txn = getRecoveredTxn(txnId);
3617 holder.recoverTake(handle, txn);
3618 }
3619
3620 public void recoverTransaction(Long txnId, StoredObject transaction)
3621 throws Exception
3622 {
3623 Txn txn = new Txn(txnId.longValue());
3624 transaction.restore(txn);
3625 recoveredTxns.put(txnId, txn);
3626 }
3627
3628 private Txn getRecoveredTxn(Long txnId) {
3629 if (txnId == null)
3630 return null;
3631
3632 Txn txn = recoveredTxns.get(txnId);
3633 if (txn == null)
3634 throw new InternalSpaceException("recover of write/take with " +
3635 "unknown txnId" );
3636 return txn;
3637 }
3638
3639 public void recoverRegister(StoredResource registration, String type,
3640 StoredObject[] storedTemplates)
3641 throws Exception
3642 {
3643 final StorableResource reg;
3644 if (type.equals("StorableEventWatcher")) {
3645 assert storedTemplates.length == 1;
3646 reg = new StorableEventWatcher(0, operationJournal.currentOrdinal(),
3647 getSessionId());
3648 } else if (type.equals("StorableAvailabilityWatcher")) {
3649 reg = new StorableAvailabilityWatcher(0,
3650 operationJournal.currentOrdinal(), getSessionId());
3651 } else {
3652 throw new AssertionError("Unknown registration type (" + type +
3653 ") while recovering event registration");
3654 }
3655
3656 registration.restore(reg);
3657
3658 for (int i=0; i<storedTemplates.length; i++) {
3659 final EntryRep templ = new EntryRep();
3660 storedTemplates[i].restore(templ);
3661 templates.add((TransitionWatcher)reg, setupTmpl(templ));
3662 }
3663
3664 eventRegistrations.put(reg.getCookie(), reg);
3665 }
3666
3667 public void recoverUuid(Uuid uuid) {
3668 topUuid = uuid;
3669 }
3670
3671
3672
3673
3674 public TrustVerifier getProxyVerifier() {
3675 opsLogger.entering("OutriggerServerImpl", "getProxyVerifier");
3676 return new ProxyVerifier(ourRemoteRef, topUuid);
3677 }
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692 private Txn enterTxn(Transaction baseTr)
3693 throws TransactionException, RemoteException
3694 {
3695 txnLogger.entering("OutriggerServerImpl", "enterTxn");
3696 if (baseTr == null)
3697 return null;
3698
3699 ServerTransaction tr = serverTransaction(baseTr);
3700 if (tr.isNested()) {
3701 final String msg = "subtransactions not supported";
3702 final CannotNestException cne = new CannotNestException(msg);
3703 txnLogger.log(Levels.FAILED, msg, cne);
3704 throw cne;
3705 }
3706
3707
3708 Txn txn = null;
3709 try {
3710 txn = txnTable.get(tr.mgr, tr.id);
3711 } catch (IOException e) {
3712 } catch (ClassNotFoundException e) {
3713 } catch (SecurityException e) {
3714
3715
3716
3717
3718
3719
3720
3721
3722 }
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736 if (txn == null) {
3737 final TransactionManager mgr =
3738 (TransactionManager)
3739 transactionManagerPreparer.prepareProxy(tr.mgr);
3740 tr = new ServerTransaction(mgr, tr.id);
3741 tr.join(participantProxy, crashCount);
3742 txn = txnTable.put(tr);
3743 }
3744
3745 return txn;
3746 }
3747
3748
3749
3750
3751
3752
3753 private Txn getTxn(TransactionManager mgr, long id)
3754 throws UnknownTransactionException, UnmarshalException
3755 {
3756 Txn txn;
3757 try {
3758 txn = (Txn) txnTable.get(mgr, id);
3759 } catch (IOException e) {
3760 throw brokenTxn(mgr, id, e);
3761 } catch (ClassNotFoundException e) {
3762 throw brokenTxn(mgr, id, e);
3763 }
3764
3765 if (txnLogger.isLoggable(Level.FINEST)) {
3766 txnLogger.log(Level.FINEST,
3767 "OutriggerServerImpl: getTxn got Txn={0}", txn);
3768 }
3769
3770 if (txn == null) {
3771 final String msg = "unknown transaction [mgr:" + mgr +
3772 ", id:" + id + "], passed to abort/prepare/commit";
3773 final UnknownTransactionException ute =
3774 new UnknownTransactionException(msg);
3775 if (txnLogger.isLoggable(Levels.FAILED))
3776 txnLogger.log(Levels.FAILED, msg, ute);
3777 throw ute;
3778 }
3779 return txn;
3780 }
3781
3782
3783
3784
3785
3786 private UnmarshalException brokenTxn(TransactionManager mgr, long id,
3787 Exception nested)
3788 throws UnmarshalException
3789 {
3790 final UnmarshalException ue = new UnmarshalException(
3791 "Outrigger has a transaction with this id(" + id + "), but can't" +
3792 "unmarshal its copy of manager to confirm it is the same " +
3793 "transaction",
3794 nested);
3795
3796 final String msg = "the unmarshalling/preparation failure of one " +
3797 "or more transaction managers has prevented outrigger from " +
3798 "processing an abort/prepare/commit for transaction [mgr:" +
3799 mgr + ", id:" + id + "]";
3800
3801 txnLogger.log(Level.INFO, msg, ue);
3802 throw ue;
3803 }
3804
3805
3806
3807
3808
3809
3810
3811
3812 private ServerTransaction serverTransaction(Transaction baseTr)
3813 throws UnknownTransactionException
3814 {
3815 try {
3816 return (ServerTransaction) baseTr;
3817 } catch (ClassCastException e) {
3818 final String msg = "unexpected transaction type:" +
3819 baseTr.getClass();
3820 final UnknownTransactionException ute =
3821 new UnknownTransactionException(msg);
3822 if (txnLogger.isLoggable(Levels.FAILED))
3823 txnLogger.log(Levels.FAILED, msg, ute);
3824 throw ute;
3825 }
3826 }
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873 public int prepare(TransactionManager mgr, long id)
3874 throws UnknownTransactionException, UnmarshalException
3875 {
3876
3877
3878
3879
3880 txnLogger.entering("OutriggerServerImpl", "prepare");
3881
3882 Txn txn = getTxn(mgr, id);
3883
3884
3885
3886
3887
3888 synchronized (txn){
3889 if (txn.getState() == PREPARED)
3890 return PREPARED;
3891 }
3892
3893
3894
3895 txn.makeInactive();
3896
3897 if (log != null)
3898 log.prepareOp(txn.getId(), txn);
3899
3900 int result = txn.prepare(this);
3901 if (result == NOTCHANGED || result == ABORTED) {
3902
3903
3904 if (log != null)
3905 log.abortOp(txn.getId());
3906
3907 txnTable.remove(mgr, id);
3908 }
3909 return result;
3910 }
3911
3912
3913 public void commit(TransactionManager mgr, long id)
3914 throws UnknownTransactionException, UnmarshalException
3915 {
3916
3917
3918 txnLogger.entering("OutriggerServerImpl", "commit");
3919
3920 Txn txn = getTxn(mgr, id);
3921
3922
3923
3924
3925 txn.makeInactive();
3926 try {
3927
3928 if (log != null)
3929 log.commitOp(txn.getId());
3930
3931 txn.commit(this);
3932 } finally {
3933 txnTable.remove(mgr, id);
3934 }
3935 }
3936
3937
3938 public void abort(TransactionManager mgr, long id)
3939 throws UnknownTransactionException, UnmarshalException
3940 {
3941 txnLogger.entering("OutriggerServerImpl", "abort");
3942
3943 Txn txn = getTxn(mgr, id);
3944
3945
3946
3947 txn.makeInactive();
3948 try {
3949 if (log != null)
3950 log.abortOp(txn.getId());
3951
3952 txn.abort(this);
3953 } finally {
3954 txnTable.remove(mgr, id);
3955 }
3956 }
3957
3958
3959 public int prepareAndCommit(TransactionManager mgr, long id)
3960 throws UnknownTransactionException, UnmarshalException
3961 {
3962 txnLogger.entering("OutriggerServerImpl", "prepareAndCommit");
3963 Txn txn = getTxn(mgr, id);
3964
3965
3966
3967
3968 txn.makeInactive();
3969
3970 int result = txn.prepare(this);
3971 if (result == PREPARED) {
3972
3973 if (log != null)
3974 log.commitOp(txn.getId());
3975
3976 txn.commit(this);
3977 result = COMMITTED;
3978 }
3979 txnTable.remove(mgr, id);
3980 return result;
3981 }
3982
3983
3984
3985
3986
3987
3988 ProxyPreparer getRecoveredTransactionManagerPreparer() {
3989 return recoveredTransactionManagerPreparer;
3990 }
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000 private final void debug(Object obj, String str) {
4001 String name = obj.getClass().getName();
4002 int dollar = name.indexOf('$');
4003 if (dollar > 0)
4004 name = name.substring(dollar + 1);
4005 System.out.print(name);
4006 System.out.print(':');
4007 System.out.println(str);
4008 }
4009
4010
4011
4012
4013 private static Entry[] attributesFor() {
4014 final Entry info = new ServiceInfo("JavaSpace",
4015 "Sun Microsystems, Inc.", "Sun Microsystems, Inc.",
4016 org.apache.river.constants.VersionConstants.SERVER_VERSION, "", "");
4017
4018 final Entry type =
4019 new org.apache.river.lookup.entry.BasicServiceType("JavaSpace");
4020
4021 return new Entry[]{info, type};
4022 }
4023
4024
4025
4026
4027
4028
4029
4030 private static abstract class Reaper extends Thread {
4031 final private long interval;
4032 private volatile boolean dead = false;
4033
4034 private Reaper(String name, long interval) {
4035 super(name);
4036 this.interval = interval;
4037 }
4038
4039 public void run() {
4040 boolean goOn;
4041 synchronized(this) {
4042 goOn = !dead;
4043 }
4044
4045 while (goOn) {
4046 reap();
4047 synchronized(this) {
4048 try {
4049 wait(interval);
4050 } catch (InterruptedException e) {
4051 return;
4052 }
4053 goOn = !dead;
4054 }
4055 }
4056 }
4057
4058 abstract void reap();
4059
4060 private synchronized void kill() {
4061 dead = true;
4062 notifyAll();
4063 }
4064 }
4065
4066
4067 private class EntryReaper extends Reaper {
4068
4069
4070
4071
4072 private EntryReaper(long reapingInterval) {
4073 super("Entry Reaping Thread", reapingInterval);
4074 }
4075
4076 protected void reap() {
4077 contents.reap();
4078 }
4079 }
4080
4081
4082 private class TemplateReaper extends Reaper {
4083
4084
4085
4086
4087 private TemplateReaper(long reapingInterval) {
4088 super("Template Reaping Thread", reapingInterval);
4089 }
4090
4091 protected void reap() {
4092 templates.reap();
4093 }
4094 }
4095
4096
4097 private class ContentsQueryReaper extends Reaper {
4098
4099
4100
4101
4102 private ContentsQueryReaper(long reapingInterval) {
4103 super("Contents Query Reaping Thread", reapingInterval);
4104 }
4105
4106 protected void reap() {
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129 final long now = System.currentTimeMillis();
4130 Iterator<LeasedResource> it = contentsQueries.values().iterator();
4131 while (it.hasNext()){
4132 final LeasedResource query = it.next();
4133 synchronized(query){
4134 if (query.getExpiration() <= now) {
4135 if (query instanceof ContentsQuery ) {
4136 ((ContentsQuery)query).cancel();
4137 }
4138 }
4139 }
4140 }
4141 }
4142 }
4143
4144
4145 private RuntimeException logAndThrow(RuntimeException e, Logger logger) {
4146 if (logger.isLoggable(Levels.FAILED))
4147 logger.log(Levels.FAILED, e.getMessage(), e);
4148 throw e;
4149 }
4150
4151
4152 private IllegalArgumentException logAndThrowIllegalArg(String msg) {
4153 final IllegalArgumentException e = new IllegalArgumentException(msg);
4154 throw (IllegalArgumentException)logAndThrow(e, opsLogger);
4155 }
4156
4157
4158 private UnknownLeaseException throwNewUnknownLeaseException(
4159 Object cookie)
4160 throws UnknownLeaseException
4161 {
4162 final UnknownLeaseException ule = new UnknownLeaseException();
4163 if (leaseLogger.isLoggable(Levels.FAILED)) {
4164 leaseLogger.log(Levels.FAILED, "unable to find lease for " +
4165 cookie, ule);
4166 }
4167
4168 throw ule;
4169 }
4170
4171
4172 private CannotJoinException throwNewCannotJoinException()
4173 throws CannotJoinException
4174 {
4175 final String msg = "transaction is not active";
4176 final CannotJoinException cje = new CannotJoinException(msg);
4177 txnLogger.log(Levels.FAILED, msg, cje);
4178 throw cje;
4179 }
4180
4181
4182 private NoSuchObjectException throwNewNoSuchObjectException(
4183 String msg, Logger logger)
4184 throws NoSuchObjectException
4185 {
4186 throw throwNewNoSuchObjectException(msg, null, logger);
4187 }
4188
4189
4190 private NoSuchObjectException throwNewNoSuchObjectException(
4191 String msg, Throwable t, Logger logger)
4192 throws NoSuchObjectException
4193 {
4194 final NoSuchObjectException nsoe = new net.jini.export.NonExistantObjectException(msg, t);
4195 logger.log(Levels.FAILED, msg, nsoe);
4196 throw nsoe;
4197 }
4198 }
4199