1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.river.outrigger.proxy;
19
20 import java.io.IOException;
21 import java.io.InvalidObjectException;
22 import java.io.ObjectInputStream;
23 import java.io.Serializable;
24 import java.rmi.MarshalException;
25 import java.rmi.MarshalledObject;
26 import java.rmi.RemoteException;
27 import java.security.PrivilegedAction;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 import net.jini.admin.Administrable;
37 import net.jini.core.entry.Entry;
38 import net.jini.core.entry.UnusableEntryException;
39 import net.jini.core.event.EventRegistration;
40 import net.jini.core.event.RemoteEventListener;
41 import net.jini.core.lease.Lease;
42 import net.jini.core.transaction.Transaction;
43 import net.jini.core.transaction.TransactionException;
44 import net.jini.entry.UnusableEntriesException;
45 import net.jini.export.ProxyAccessor;
46 import net.jini.id.ReferentUuid;
47 import net.jini.id.ReferentUuids;
48 import net.jini.id.Uuid;
49 import net.jini.id.UuidFactory;
50 import net.jini.security.Security;
51 import net.jini.space.JavaSpace05;
52 import net.jini.space.MatchSet;
53 import org.apache.river.api.io.AtomicSerial;
54 import org.apache.river.api.io.AtomicSerial.GetArg;
55 import org.apache.river.landlord.LandlordLease;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @AtomicSerial
74 public class SpaceProxy2 implements JavaSpace05, Administrable, ReferentUuid,
75 Serializable, ProxyAccessor
76 {
77 static final long serialVersionUID = 1L;
78
79
80
81
82
83
84 final OutriggerServer space;
85
86
87
88
89
90
91 final Uuid spaceUuid;
92
93
94
95
96
97
98
99 final long serverMaxServerQueryTimeout;
100
101
102
103
104 private transient volatile long maxServerQueryTimeout;
105
106
107
108
109
110
111
112 private static final long maxServerQueryTimeoutPropertyValue =
113 getMaxServerQueryTimeoutPropertyValue();
114
115
116
117
118
119
120 private static final Logger logger =
121 Logger.getLogger("org.apache.river.outrigger.proxy");
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 public SpaceProxy2(OutriggerServer space, Uuid spaceUuid,
143 long serverMaxServerQueryTimeout)
144 {
145 this (notNull(space),
146 notNull(spaceUuid),
147 serverMaxServerQueryTimeout,
148 setMaxServerQueryTimeout(serverMaxServerQueryTimeout));
149 }
150
151 private static <T> T notNull(T value){
152 if (value == null) throw new NullPointerException();
153 return value;
154 }
155
156 private SpaceProxy2( OutriggerServer space, Uuid spaceUuid,
157 long serverMaxServerQueryTimeout, long maxServerQueryTimeout ){
158 this.space = space;
159 this.spaceUuid = spaceUuid;
160 this.serverMaxServerQueryTimeout = serverMaxServerQueryTimeout;
161 this.maxServerQueryTimeout = maxServerQueryTimeout;
162 }
163
164 SpaceProxy2(GetArg arg) throws IOException {
165 this((OutriggerServer) arg.get("space", null),
166 (Uuid) arg.get("spaceUuid", null),
167 arg.get("serverMaxServerQueryTimeout", -1L));
168 }
169
170 @Override
171 public String toString() {
172 return getClass().getName() + " for " + spaceUuid +
173 " (through " + space + ")";
174 }
175
176
177 @Override
178 public boolean equals(Object other) {
179 return ReferentUuids.compare(this, other);
180 }
181
182
183 @Override
184 public int hashCode() {
185 return spaceUuid.hashCode();
186 }
187
188 public Uuid getReferentUuid() {
189 return spaceUuid;
190 }
191
192
193
194
195
196
197 private static long getMaxServerQueryTimeoutPropertyValue() {
198 try {
199 final String propValue =
200 (String)Security.doPrivileged(
201 new ReadProperityPrivilegedAction(
202 "org.apache.river.outrigger.maxServerQueryTimeout"));
203 if (propValue == null)
204 return -1;
205
206
207 return Long.parseLong(propValue);
208 } catch (Throwable t) {
209 return -1;
210 }
211 }
212
213 @Override
214 public Object getProxy() {
215 return space;
216 }
217
218
219
220
221 private static class ReadProperityPrivilegedAction
222 implements PrivilegedAction
223 {
224
225 final private String propName;
226
227
228
229
230
231 ReadProperityPrivilegedAction(String propName) {
232 this.propName = propName;
233 }
234
235 public Object run() {
236 try {
237 return System.getProperty(propName);
238 } catch (SecurityException e) {
239 return null;
240 }
241 }
242 }
243
244
245
246
247
248
249 private static long setMaxServerQueryTimeout(long serverMaxServerQueryTimeout) {
250 if (serverMaxServerQueryTimeout <= 0)
251 throw new
252 IllegalArgumentException("serverMaxServerQueryTimeout " +
253 "must be positive");
254
255
256
257
258 long maxServerQueryTimeout;
259 if (maxServerQueryTimeoutPropertyValue > 0)
260 maxServerQueryTimeout = maxServerQueryTimeoutPropertyValue;
261 else if (serverMaxServerQueryTimeout > 0)
262 maxServerQueryTimeout = serverMaxServerQueryTimeout;
263 else
264
265
266
267
268 throw new
269 AssertionError("serverMaxServerQueryTimeout invalid:" +
270 serverMaxServerQueryTimeout);
271
272 if (logger.isLoggable(Level.CONFIG)) {
273 logger.log(Level.CONFIG,
274 "Outrigger proxy using {0} ms for maxServerQueryTimeout",
275 Long.valueOf(maxServerQueryTimeout));
276 }
277 return maxServerQueryTimeout;
278 }
279
280
281
282
283
284 private void readObject(ObjectInputStream in)
285 throws IOException, ClassNotFoundException
286 {
287 in.defaultReadObject();
288
289 if (space == null)
290 throw new InvalidObjectException("null server reference");
291
292 if (spaceUuid == null)
293 throw new InvalidObjectException("null Uuid");
294
295 if (serverMaxServerQueryTimeout <= 0)
296 throw new
297 InvalidObjectException("Bad serverMaxServerQueryTimeout " +
298 "value:" + serverMaxServerQueryTimeout);
299
300 maxServerQueryTimeout = setMaxServerQueryTimeout(serverMaxServerQueryTimeout);
301 }
302
303
304
305
306
307 private void readObjectNoData() throws InvalidObjectException {
308 throw new
309 InvalidObjectException("SpaceProxy2 should always have data");
310 }
311
312
313
314
315
316
317 public Lease write(Entry entry, Transaction txn, long lease)
318 throws TransactionException, RemoteException
319 {
320 if (entry == null)
321 throw new NullPointerException("Cannot write null Entry");
322 long[] leaseData = space.write(repFor(entry), txn, lease);
323 if (leaseData == null || leaseData.length != 3){
324 StringBuilder sb = new StringBuilder(180);
325 sb.append("space.write returned malformed data \n");
326 int l = leaseData == null? 0 : leaseData.length;
327 for (int i =0; i < l; i++){
328 sb.append(leaseData[i]).append("\n");
329 }
330 throw new AssertionError(sb);
331 }
332 return newLease(UuidFactory.create(leaseData[1], leaseData[2]),
333 leaseData[0]);
334 }
335
336 public Entry read(Entry tmpl, Transaction txn, long timeout)
337 throws UnusableEntryException, TransactionException,
338 InterruptedException, RemoteException
339 {
340
341 final long endTime = calcEndTime(timeout);
342
343 long remaining = timeout;
344 OutriggerServer.QueryCookie queryCookie = null;
345
346
347 do {
348 final long serverTimeout =
349 Math.min(remaining, maxServerQueryTimeout);
350 logQuery("read", serverTimeout, queryCookie, remaining);
351
352 final Object rslt =
353 space.read(repFor(tmpl), txn, serverTimeout, queryCookie);
354 if (rslt == null) {
355
356 throw new AssertionError("space.read() returned null");
357 } else if (rslt instanceof EntryRep) {
358
359 return entryFrom((EntryRep)rslt);
360 } else if (rslt instanceof OutriggerServer.QueryCookie) {
361
362
363
364 queryCookie = (OutriggerServer.QueryCookie)rslt;
365 } else {
366 throw new AssertionError(
367 "Unexpected return type from space.read()");
368 }
369
370
371
372
373 remaining = endTime - System.currentTimeMillis();
374 } while (remaining > 0);
375
376
377
378
379 return null;
380 }
381
382
383 public Entry readIfExists(Entry tmpl, Transaction txn, long timeout)
384 throws UnusableEntryException, TransactionException,
385 InterruptedException, RemoteException
386 {
387
388 final long endTime = calcEndTime(timeout);
389
390 long remaining = timeout;
391 OutriggerServer.QueryCookie queryCookie = null;
392
393
394 do {
395 final long serverTimeout =
396 Math.min(remaining, maxServerQueryTimeout);
397 logQuery("readIfExists", serverTimeout, queryCookie, remaining);
398
399 final Object rslt =
400 space.readIfExists(repFor(tmpl), txn, serverTimeout,
401 queryCookie);
402 if (rslt == null) {
403
404 return null;
405 } else if (rslt instanceof EntryRep) {
406
407 return entryFrom((EntryRep)rslt);
408 } else if (rslt instanceof OutriggerServer.QueryCookie) {
409
410
411
412 queryCookie = (OutriggerServer.QueryCookie)rslt;
413 } else {
414 throw new AssertionError(
415 "Unexpected return type from space.readIfExists()");
416 }
417
418
419
420
421 remaining = endTime - System.currentTimeMillis();
422 } while (remaining > 0);
423
424
425
426
427 return null;
428 }
429
430
431 public Entry take(Entry tmpl, Transaction txn, long timeout)
432 throws UnusableEntryException, TransactionException,
433 InterruptedException, RemoteException
434 {
435
436 final long endTime = calcEndTime(timeout);
437
438 long remaining = timeout;
439 OutriggerServer.QueryCookie queryCookie = null;
440
441
442 do {
443 final long serverTimeout =
444 Math.min(remaining, maxServerQueryTimeout);
445 logQuery("take", serverTimeout, queryCookie, remaining);
446
447 final Object rslt =
448 space.take(repFor(tmpl), txn, serverTimeout, queryCookie);
449 if (rslt == null) {
450
451 throw new AssertionError("space.take() returned null");
452 } else if (rslt instanceof EntryRep) {
453
454 return entryFrom((EntryRep)rslt);
455 } else if (rslt instanceof OutriggerServer.QueryCookie) {
456
457
458
459 queryCookie = (OutriggerServer.QueryCookie)rslt;
460 } else {
461 throw new AssertionError(
462 "Unexpected return type from space.take()");
463 }
464
465
466
467
468 remaining = endTime - System.currentTimeMillis();
469 } while (remaining > 0);
470
471
472
473
474 return null;
475 }
476
477
478 public Entry takeIfExists(Entry tmpl, Transaction txn, long timeout)
479 throws UnusableEntryException, TransactionException,
480 InterruptedException, RemoteException
481 {
482
483 final long endTime = calcEndTime(timeout);
484
485 long remaining = timeout;
486 OutriggerServer.QueryCookie queryCookie = null;
487
488
489 do {
490 final long serverTimeout =
491 Math.min(remaining, maxServerQueryTimeout);
492 logQuery("takeIfExists", serverTimeout, queryCookie, remaining);
493
494 final Object rslt =
495 space.takeIfExists(repFor(tmpl), txn, serverTimeout,
496 queryCookie);
497 if (rslt == null) {
498
499 return null;
500 } else if (rslt instanceof EntryRep) {
501
502 return entryFrom((EntryRep)rslt);
503 } else if (rslt instanceof OutriggerServer.QueryCookie) {
504
505
506
507 queryCookie = (OutriggerServer.QueryCookie)rslt;
508 } else {
509 throw new AssertionError(
510 "Unexpected return type from space.takeIfExists()");
511 }
512
513
514
515
516 remaining = endTime - System.currentTimeMillis();
517 } while (remaining > 0);
518
519
520
521
522 return null;
523 }
524
525
526 public Entry snapshot(Entry entry) throws MarshalException {
527 if (entry == null)
528 return null;
529 else
530 return new SnapshotRep(entry);
531 }
532
533
534 public EventRegistration
535 notify(Entry tmpl, Transaction txn, RemoteEventListener listener,
536 long lease, MarshalledObject handback)
537 throws TransactionException, RemoteException
538 {
539 return space.notify(repFor(tmpl), txn, listener, lease, handback);
540 }
541
542 public List write(List entries, Transaction txn, List leaseDurations)
543 throws RemoteException, TransactionException
544 {
545 final long[] leases = new long[leaseDurations.size()];
546 int j = 0;
547 for (Iterator i=leaseDurations.iterator(); i.hasNext(); ) {
548 final Object l = i.next();
549
550 if (l == null)
551 throw new NullPointerException(
552 "leaseDurations contatins a null element");
553
554 if (!(l instanceof Long))
555 throw new IllegalArgumentException(
556 "leaseDurations contatins an element which is not a Long");
557
558 leases[j++] = ((Long)l).longValue();
559 }
560
561 long[] leaseData = space.write(repFor(entries, "entries"), txn, leases);
562 if (leaseData == null)
563 throw new AssertionError("space.write<multiple> returned null");
564
565 final List rslt = new ArrayList(leaseData.length/3);
566 try {
567 int m=0;
568 while (m<leaseData.length) {
569 final long duration = leaseData[m++];
570 final long high = leaseData[m++];
571 final long low = leaseData[m++];
572 final Uuid uuid = UuidFactory.create(high, low);
573 rslt.add(newLease(uuid, duration));
574 }
575 } catch (ArrayIndexOutOfBoundsException e) {
576 throw new
577 AssertionError("space.write<multiple> returned malformed data");
578 }
579
580 return rslt;
581 }
582
583 public Collection take(Collection tmpls, Transaction txn,
584 long timeout, long maxEntries)
585 throws UnusableEntriesException, TransactionException, RemoteException
586 {
587
588 final long endTime = calcEndTime(timeout);
589
590 long remaining = timeout;
591 OutriggerServer.QueryCookie queryCookie = null;
592 final EntryRep[] treps = repFor(tmpls, "tmpls");
593
594 final int limit;
595 if (maxEntries < 1) {
596 throw new IllegalArgumentException("maxEntries must be positive");
597 } else if (maxEntries <= Integer.MAX_VALUE) {
598 limit = (int)maxEntries;
599 } else {
600 limit = Integer.MAX_VALUE;
601 }
602
603
604 do {
605 final long serverTimeout =
606 Math.min(remaining, maxServerQueryTimeout);
607 logQuery("take(multiple)", serverTimeout, queryCookie, remaining);
608
609
610 final Object rslt =
611 space.take(treps, txn, serverTimeout, limit, queryCookie);
612 if (rslt == null) {
613
614 throw new AssertionError("space.take<multiple>() returned null");
615 } else if (rslt instanceof EntryRep[]) {
616 EntryRep[] reps = (EntryRep[])rslt;
617
618 final Collection entries = new LinkedList();
619 Collection exceptions = null;
620
621 for (int i=0,l=reps.length; i<l; i++) {
622 try {
623 entries.add(entryFrom(reps[i]));
624 } catch (UnusableEntryException e) {
625 if (exceptions == null)
626 exceptions = new LinkedList();
627
628 exceptions.add(e);
629 }
630 }
631
632 if (exceptions == null) {
633 return entries;
634 } else {
635 throw new UnusableEntriesException(
636 "some of the removed entries could not be unmarshalled",
637 entries, exceptions);
638 }
639 } else if (rslt instanceof OutriggerServer.QueryCookie) {
640
641
642
643 queryCookie = (OutriggerServer.QueryCookie)rslt;
644 } else {
645 throw new AssertionError(
646 "Unexpected return type from space.take<multiple>()");
647 }
648
649
650
651
652 remaining = endTime - System.currentTimeMillis();
653 } while (remaining > 0);
654
655
656
657
658 return Collections.EMPTY_LIST;
659 }
660
661 public EventRegistration
662 registerForAvailabilityEvent(Collection tmpls,
663 Transaction txn,
664 boolean visibilityOnly,
665 RemoteEventListener listener,
666 long leaseDuration,
667 MarshalledObject handback)
668 throws TransactionException, RemoteException
669 {
670 return space.registerForAvailabilityEvent(
671 repFor(tmpls, "tmpls"), txn, visibilityOnly, listener,
672 leaseDuration, handback);
673 }
674
675
676
677 public MatchSet contents(Collection tmpls,
678 Transaction txn,
679 long leaseDuration,
680 long maxEntries)
681 throws RemoteException, TransactionException
682 {
683 final MatchSetData msd =
684 space.contents(repFor(tmpls, "tmpls"), txn, leaseDuration, maxEntries);
685 return new MatchSetProxy(msd, this, space);
686 }
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701 final protected Lease newLease(Uuid uuid, long duration) {
702 long expiration = duration + System.currentTimeMillis();
703
704
705
706 if (expiration < 0)
707 expiration = Long.MAX_VALUE;
708 return constructLease(uuid, expiration);
709 }
710
711
712
713
714
715
716 protected Lease constructLease(Uuid uuid, long expiration) {
717 return new LandlordLease(uuid, space, spaceUuid, expiration);
718 }
719
720
721
722
723
724 public Object getAdmin() throws RemoteException {
725 return space.getAdmin();
726 }
727
728
729
730
731
732
733
734
735
736
737
738
739 private long calcEndTime(long timeout) {
740 if (timeout < 0)
741 throw new IllegalArgumentException("timeout must be non-negative");
742 final long now = System.currentTimeMillis();
743 if (Long.MAX_VALUE - timeout <= now)
744 return Long.MAX_VALUE;
745 else
746 return now + timeout;
747 }
748
749 static EntryRep[] repFor(Collection entries, String argName)
750 throws MarshalException
751 {
752 final EntryRep[] reps = new EntryRep[entries.size()];
753 int j = 0;
754 for (Iterator i=entries.iterator(); i.hasNext(); ) {
755 final Object e = i.next();
756 if (!(e == null || e instanceof Entry))
757 throw new IllegalArgumentException(
758 argName + " contatins an element which is not an Entry");
759
760 reps[j++] = repFor((Entry)e);
761 }
762
763 return reps;
764 }
765
766
767
768
769
770 static EntryRep repFor(Entry entry) throws MarshalException {
771 if (entry == null)
772 return null;
773 if (entry instanceof SnapshotRep)
774 return ((SnapshotRep) entry).rep();
775 return new EntryRep(entry);
776 }
777
778
779
780
781 static Entry entryFrom(EntryRep rep) throws UnusableEntryException {
782 if (rep == null)
783 return null;
784 return rep.entry();
785 }
786
787
788 private void logQuery(String op, long serverTimeout,
789 OutriggerServer.QueryCookie cookie, long remaining)
790 {
791 if (logger.isLoggable(Level.FINER)) {
792 logger.log(Level.FINER, "Outrigger calling {0} on server with " +
793 "timeout of {1} ms for serverTimeout, using QueryCookie " +
794 "{2}, {3} ms remaining on query",
795 new Object[] {op, Long.valueOf(serverTimeout), cookie,
796 Long.valueOf(remaining)});
797 }
798 }
799 }