1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.hcc.util.jmx;
24
25 import java.io.Serializable;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import java.util.logging.Level;
34 import java.util.logging.Logger;
35
36 import javax.management.Notification;
37 import javax.management.NotificationFilter;
38 import javax.management.NotificationListener;
39
40 public class MBeanFutureTask implements
41 NotificationListener,
42 NotificationFilter,
43 Future,
44 Serializable {
45
46 private static Logger log = Logger.getLogger(MBeanFutureTask.class.getName());
47
48 private CountDownLatch latch;
49
50 private Notification notification = null;
51
52 private String name;
53
54 private Timer timer;
55
56 private boolean done = false;
57
58 public MBeanFutureTask(String name) {
59 this.latch = new CountDownLatch(1);
60 this.name = name;
61 }
62
63 public boolean isNotificationEnabled(Notification notification) {
64
65 return false;
66 }
67
68 public boolean isCancelled() {
69 return false;
70 }
71
72 public boolean isDone() {
73 return done;
74 }
75
76 public void handleNotification(Notification notification, Object handback) {
77 this.notification = notification;
78 this.latch.countDown();
79 if (log.isLoggable(Level.FINE)) {
80 log.fine("this.name=" + name + "; thread.name="
81 + Thread.currentThread().getName() + "; latch.count="
82 + latch.getCount() + "; notification.type="
83 + notification.getType());
84 }
85 }
86
87 public boolean cancel(boolean mayInterruptIfRunning) {
88 return false;
89 }
90
91 public Object get() throws InterruptedException, ExecutionException {
92 try {
93 await(-1, TimeUnit.MILLISECONDS);
94 return getData();
95 } catch (TimeoutException ex) {
96 throw new RuntimeException("This should never happen!");
97 }
98 }
99
100 private Object getData() throws ExecutionException {
101 return (this.notification.getUserData());
102 }
103
104 public Object get(long timeout, TimeUnit unit)
105 throws InterruptedException,
106 ExecutionException,
107 TimeoutException {
108 await(timeout, unit);
109 return getData();
110 }
111
112 private boolean await(long timeout, TimeUnit unit)
113 throws InterruptedException,
114 TimeoutException {
115 try {
116 if (log.isLoggable(Level.FINE)) {
117 log.fine("this.name=" + name + "; thread.name="
118 + Thread.currentThread().getName());
119 }
120
121 if (timer == null) {
122 timer = new Timer();
123 timer.schedule(new TimerTask() {
124 @Override
125 public void run() {
126 if (log.isLoggable(Level.FINE)) {
127 log.fine(name + ": waiting for notification");
128 }
129 }
130
131 }, 60 * 1000, 60 * 1000);
132 }
133
134 boolean executed = false;
135 if (timeout == -1) {
136 this.latch.await();
137 executed = true;
138 } else {
139 executed = this.latch.await(timeout, TimeUnit.MILLISECONDS);
140 }
141
142 done = true;
143 if (!executed) {
144 throw new TimeoutException(
145 "Notification was not received within the timeout period ("
146 + timeout + ")");
147 }
148
149 return executed;
150 } finally {
151 timer.cancel();
152 }
153 }
154 }