1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.archive.hcc.client;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.security.InvalidParameterException;
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
38
39 import javax.management.InstanceNotFoundException;
40 import javax.management.MBeanException;
41 import javax.management.MBeanServer;
42 import javax.management.MBeanServerConnection;
43 import javax.management.MBeanServerFactory;
44 import javax.management.MalformedObjectNameException;
45 import javax.management.Notification;
46 import javax.management.ObjectName;
47 import javax.management.ReflectionException;
48 import javax.management.openmbean.CompositeData;
49 import javax.management.openmbean.SimpleType;
50 import javax.naming.InsufficientResourcesException;
51 import javax.swing.event.EventListenerList;
52
53 import org.archive.hcc.util.ClusterControllerNotification;
54 import org.archive.hcc.util.NotificationDelegatableBase;
55 import org.archive.hcc.util.NotificationDelegator;
56 import org.archive.hcc.util.jmx.MBeanServerConnectionFactory;
57 import org.archive.util.JmxUtils;
58
/**
 * As the workhorse of the cluster controller client, this class is responsible
 * for connecting to the local or remote <code>ClusterControllerBean</code> via
 * its <code>DynamicMBean</code> interface. It hides all the details of
 * connecting to the remote MBean, ie invocations and notifications.
 * @author Daniel Bernstein (dbernstein@archive.org)
 */
class ClusterControllerClientImpl implements ClusterControllerClient{

    private static final Logger log = Logger.getLogger(
            ClusterControllerClientImpl.class.getName());

    // Connection to the MBean server (local or remote) hosting the
    // ClusterControllerBean; set once in init().
    private MBeanServerConnection connection;

    // ObjectName of the ClusterControllerBean located by init()'s query.
    private ObjectName name;

    // Receives JMX notifications from the bean and routes them to the
    // handleXxx/fireXxx methods below.
    private NotificationDelegator notificationDelegator;

    // Holds registered CrawlerLifecycleListener and CurrentCrawlJobListener
    // instances.
    private EventListenerList listenerList;
78
    /**
     * Constructs a client attached to a <code>ClusterControllerBean</code>
     * reachable at the given JMX address (possibly on a remote machine).
     *
     * @param address host and port of the MBean server to connect to
     * @throws InstanceNotFoundException if no ClusterControllerBean is
     *         registered on that server
     * @throws IOException if the connection cannot be established
     */
    ClusterControllerClientImpl(InetSocketAddress address)
            throws InstanceNotFoundException,
            IOException {
        init(MBeanServerConnectionFactory.createConnection(address));
    }
91
92 private void init(MBeanServerConnection connection)
93 throws InstanceNotFoundException {
94 try {
95 this.connection = connection;
96 ObjectName query = new ObjectName(
97 "org.archive.hcc:type=ClusterControllerBean,*");
98 Set<ObjectName> names = connection.queryNames(query, null);
99 if (names.size() < 1) {
100 throw new InstanceNotFoundException(
101 "no mbean found matching query:" + query);
102 }
103 this.name = names.iterator().next();
104 this.notificationDelegator = createNotificationDelegator();
105 this.connection.addNotificationListener(
106 this.name,
107 this.notificationDelegator,
108 null,
109 new Object());
110 this.listenerList = new EventListenerList();
111 } catch (Exception e) {
112 e.printStackTrace();
113 throw new RuntimeException(e);
114 }
115 }
116
    /**
     * Creates a local instance of the ClusterControllerBean and attaches to it.
     */
    ClusterControllerClientImpl() {
        try {
            init(createMBeanServer());
        } catch (InstanceNotFoundException e) {
            // no bean registered in this JVM's MBean server; surface as
            // unchecked since this constructor declares no checked exceptions
            throw new RuntimeException(e);
        }
    }
127
    /**
     * Registers a listener for crawler creation/destruction events.
     * @param l listener to add
     */
    public void addCrawlerLifecycleListener(CrawlerLifecycleListener l) {
        this.listenerList.add(CrawlerLifecycleListener.class, l);
    }
131
    /**
     * Unregisters a previously added crawler lifecycle listener.
     * @param l listener to remove
     */
    public void removeCrawlerLifecycleListener(CrawlerLifecycleListener l) {
        this.listenerList.remove(CrawlerLifecycleListener.class, l);
    }
135
    /**
     * Registers a listener for current-crawl-job events (start, pause,
     * resume, statistics, completion).
     * @param l listener to add
     */
    public void addCrawlJobListener(CurrentCrawlJobListener l) {
        this.listenerList.add(CurrentCrawlJobListener.class, l);
    }
139
    /**
     * Unregisters a previously added crawl job listener.
     * @param l listener to remove
     */
    public void removeCrawlJobListener(CurrentCrawlJobListener l) {
        this.listenerList.remove(CurrentCrawlJobListener.class, l);
    }
143
    /**
     * Builds the delegator that fans incoming JMX notifications out to the
     * appropriate handleXxx/fireXxx methods. Each delegatable inspects the
     * notification type and consumes the notification (returns true) when it
     * matches; otherwise the next delegatable is tried.
     */
    private NotificationDelegator createNotificationDelegator() {
        NotificationDelegator d = new NotificationDelegator();
        // crawler service created: user data carries the new crawler's ObjectName
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n
                        .getType()
                        .equals(
                                ClusterControllerNotification.
                                CRAWL_SERVICE_CREATED_NOTIFICATION.getKey())) {

                    if (log.isLoggable(Level.FINE)) {
                        log.fine("crawler service created: " + n.getUserData());
                    }
                    handleCrawlServiceCreated((ObjectName) n.getUserData());
                    return true;
                }
                return false;
            }
        });

        // crawler service destroyed
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n
                        .getType()
                        .equals(
                                ClusterControllerNotification.
                                CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey())) {

                    if (log.isLoggable(Level.INFO)) {
                        log.info("crawler service destroyed: "
                                + n.getUserData());
                    }

                    handleCrawlServiceDestroyed((ObjectName) n.getUserData());
                    return true;
                }
                return false;
            }
        });

        // job started on a crawler
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals(
                        ClusterControllerNotification.
                        CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey())){
                    handleCrawlServiceJobStarted((ObjectName) n.getUserData());
                    return true;
                }
                return false;
            }
        });

        // job completed on a crawler
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals(
                        ClusterControllerNotification.
                        CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey())){
                    handleCrawlServiceJobCompleted((ObjectName)n.getUserData());
                    return true;
                }
                return false;
            }
        });

        // periodic statistics update; only handled when the source is a job
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals("progressStatistics")) {
                    ObjectName source = (ObjectName) n.getSource();
                    if (source.getKeyProperty(JmxUtils.TYPE).equals(
                            JmxUtils.JOB)) {
                        // NOTE(review): the same map is passed as both old and
                        // new attribute values; no previous snapshot is kept
                        Map statistics = toMap(n.getUserData());
                        handleCrawlServiceJobAttributesChanged(
                                source,
                                statistics,
                                statistics);
                        return true;
                    }
                }
                return false;
            }
        });

        // crawl resumed
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals("crawlResuming")) {
                    ObjectName source = (ObjectName) n.getSource();
                    try {
                        fireCrawlJobResumed(new CurrentCrawlJobImpl(
                                source,
                                findCrawlJobParentInternal(
                                        JmxUtils.getUid(source),
                                        JmxUtils.extractAddress(source)),
                                connection));
                    } catch (ClusterException e) {
                        e.printStackTrace();
                    }
                    // NOTE(review): returns true even when firing failed,
                    // unlike the crawlPaused delegatable below — confirm
                    // whether the inconsistency is intentional
                    return true;
                }
                return false;
            }
        });

        // crawl paused
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals("crawlPaused")) {
                    ObjectName source = (ObjectName) n.getSource();
                    try {
                        fireCrawlJobPaused(new CurrentCrawlJobImpl(
                                source,
                                findCrawlJobParentInternal(
                                        JmxUtils.getUid(source),
                                        JmxUtils.extractAddress(source)),
                                connection));
                        // returns false (falls through) if firing failed
                        return true;
                    } catch (ClusterException e) {
                        e.printStackTrace();
                    }
                }
                return false;
            }
        });


        // crawl ending (job is stopping but not yet complete)
        d.addDelegatable(new NotificationDelegatableBase() {
            protected boolean delegate(Notification n, Object handbac) {
                if (n.getType().equals("crawlEnding")) {
                    ObjectName source = (ObjectName) n.getSource();
                    try {
                        fireCrawlJobStopping(createCurrentCrawlJob(source, connection));
                        return true;
                    } catch (ClusterException e) {
                        e.printStackTrace();
                    }
                }
                return false;
            }
        });
        return d;
    }
283
    /**
     * Builds a CurrentCrawlJobImpl for the given job ObjectName, resolving its
     * parent crawler via the remote controller.
     *
     * NOTE(review): this uses JmxUtils.extractRemoteAddress while the
     * notification delegatables use JmxUtils.extractAddress — confirm the two
     * are interchangeable for these sources.
     *
     * @param job ObjectName of the running job
     * @param connection connection used by the returned job handle
     * @throws ClusterException if the parent lookup fails
     */
    private CurrentCrawlJobImpl createCurrentCrawlJob(
            ObjectName job,
            MBeanServerConnection connection)
            throws ClusterException {
        return new CurrentCrawlJobImpl(
                job,
                findCrawlJobParentInternal(
                        JmxUtils.getUid(job),
                        org.archive.hcc.util.JmxUtils.extractRemoteAddress(job)),
                connection);
    }
295
296 private static Map toMap(Object object) {
297 if (object instanceof Map) {
298 return (Map) object;
299 } else if (object instanceof CompositeData) {
300 CompositeData cd = (CompositeData) object;
301 String[] keys = new String[] { "deepestUri", "downloadedUriCount",
302 "queuedUriCount", "docsPerSecond", "freeMemory",
303 "downloadFailures", "totalKBPerSec", "totalMemory",
304 "currentKBPerSec", "currentDocsPerSecond", "busyThreads",
305 "averageDepth", "discoveredUriCount", "congestionRatio","totalProcessedBytes" };
306 Map map = new HashMap();
307
308 Object[] values = cd.getAll(keys);
309 for (int i = 0; i < keys.length; i++) {
310 map.put(keys[i], values[i]);
311 }
312
313 return map;
314 } else {
315 throw new InvalidParameterException("unable to convert to map: "
316 + object.getClass());
317 }
318 }
319
320 private void handleCrawlServiceJobAttributesChanged(
321 ObjectName source,
322 Map oldValue,
323 Map newValue) {
324 try {
325 CurrentCrawlJobImpl cj = createCurrentCrawlJob(
326 source,this.connection);
327 CurrentCrawlJobListener[] listener = this.listenerList
328 .getListeners(CurrentCrawlJobListener.class);
329 for (int i = 0; i < listener.length; i++) {
330 listener[i].statisticsChanged(cj, newValue);
331 }
332 } catch (ClusterException e) {
333
334 e.printStackTrace();
335 }
336 }
337
338 private void handleCrawlServiceCreated(ObjectName crawlerName) {
339 Crawler c = new CrawlerImpl(crawlerName, this.connection);
340 fireCrawlerCreated(c);
341 }
342
343 private void handleCrawlServiceDestroyed(ObjectName crawlerName) {
344 Crawler c = new CrawlerImpl(crawlerName, this.connection);
345 fireCrawlerDestroyed(c);
346 }
347
348 private void handleCrawlServiceJobStarted(ObjectName crawlJob) {
349 try {
350 CurrentCrawlJob job = createCurrentCrawlJob(
351 crawlJob,this.connection);
352 fireCrawlJobStarted(job);
353 } catch (ClusterException e) {
354
355 e.printStackTrace();
356 }
357 }
358
359
360
361 private void handleCrawlServiceJobCompleted(ObjectName crawlJob) {
362 try {
363 CurrentCrawlJob job = createCurrentCrawlJob(
364 crawlJob,this.connection);
365 fireCrawlJobCompleted(job);
366
367 } catch (ClusterException e) {
368 e.printStackTrace();
369 }
370 }
371
372 private void fireCrawlerCreated(Crawler crawler) {
373 CrawlerLifecycleListener[] listener = this.listenerList
374 .getListeners(CrawlerLifecycleListener.class);
375 for (int i = 0; i < listener.length; i++) {
376 listener[i].crawlerCreated(crawler);
377 }
378 }
379
380 private void fireCrawlerDestroyed(Crawler crawler) {
381 CrawlerLifecycleListener[] listener = this.listenerList
382 .getListeners(CrawlerLifecycleListener.class);
383 for (int i = 0; i < listener.length; i++) {
384 listener[i].crawlerDestroyed(crawler);
385 }
386 }
387
388 private void fireCrawlJobStarted(CurrentCrawlJob job) {
389 CurrentCrawlJobListener[] listener = this.listenerList
390 .getListeners(CurrentCrawlJobListener.class);
391 for (int i = 0; i < listener.length; i++) {
392 listener[i].crawlJobStarted(job);
393 }
394 }
395
396 private void fireCrawlJobPaused(CurrentCrawlJob job) {
397 CurrentCrawlJobListener[] listener = this.listenerList
398 .getListeners(CurrentCrawlJobListener.class);
399 for (int i = 0; i < listener.length; i++) {
400 listener[i].crawlJobPaused(job);
401 }
402 }
403
404 private void fireCrawlJobStopping(CurrentCrawlJob job) {
405 CurrentCrawlJobListener[] listener = this.listenerList
406 .getListeners(CurrentCrawlJobListener.class);
407 for (int i = 0; i < listener.length; i++) {
408 listener[i].crawlJobStopping(job);
409 }
410 }
411
412 private void fireCrawlJobResumed(CurrentCrawlJob job) {
413 CurrentCrawlJobListener[] listener = this.listenerList
414 .getListeners(CurrentCrawlJobListener.class);
415 for (int i = 0; i < listener.length; i++) {
416 listener[i].crawlJobResumed(job);
417 }
418 }
419
420 private void fireCrawlJobCompleted(CurrentCrawlJob job) {
421 CurrentCrawlJobListener[] listener = this.listenerList
422 .getListeners(CurrentCrawlJobListener.class);
423 CompletedCrawlJobImpl cj =
424 new CompletedCrawlJobImpl(
425 job.getUid(),
426 job.getJobName(),
427 (CrawlerImpl)job.getMother(),
428 this.connection);
429
430 for (int i = 0; i < listener.length; i++) {
431 listener[i].crawlJobCompleted(cj);
432 }
433 }
434
435
436 public void destroyAllCrawlers() throws ClusterException{
437 try {
438 ObjectName parent = (ObjectName) this.connection.invoke(
439 this.name,
440 "destroyAllCrawlers",
441 new Object[] {},
442 new String[] {});
443
444
445 } catch (Exception e) {
446 e.printStackTrace();
447 throw new ClusterException(e);
448 }
449 }
450
451 public boolean pauseAllJobs() throws ClusterException {
452 try {
453 return (Boolean) this.connection.invoke(
454 this.name,
455 "pauseAllJobs",
456 new Object[] {},
457 new String[] {});
458
459
460 } catch (Exception e) {
461 e.printStackTrace();
462 throw new ClusterException(e);
463 }
464 }
465
466 public boolean resumeAllPausedJobs() throws ClusterException {
467 try {
468 return (Boolean) this.connection.invoke(
469 this.name,
470 "resumeAllPausedJobs",
471 new Object[] {},
472 new String[] {});
473
474
475 } catch (Exception e) {
476 e.printStackTrace();
477 throw new ClusterException(e);
478 }
479 }
480
    /**
     * Finds the crawler that owns the job with the given uid.
     *
     * @param uid job uid
     * @param address host/port of the crawler's JMX container
     * @return the parent crawler, or null if none matches
     * @throws ClusterException if the remote invocation fails
     */
    public Crawler findCrawlJobParent(String uid, InetSocketAddress address)
            throws ClusterException {
        return findCrawlJobParentInternal(uid, address);
    }
485
486
487 public CrawlerImpl findCrawlJobParentInternal(String uid, InetSocketAddress address)
488 throws ClusterException {
489 try {
490 ObjectName parent = (ObjectName) this.connection.invoke(
491 this.name,
492 "findCrawlServiceJobParent",
493 new Object[] { uid, address.getHostName(),
494 new Integer(address.getPort()) },
495 new String[] { "java.lang.String", "java.lang.String",
496 "java.lang.Integer" });
497
498 if (parent != null) {
499 return new CrawlerImpl(parent, this.connection);
500 }
501
502 return null;
503 } catch (Exception e) {
504 e.printStackTrace();
505 throw new ClusterException(e);
506 }
507
508 }
509
510 private MBeanServer createMBeanServer() {
511 MBeanServer result = null;
512 List servers = MBeanServerFactory.findMBeanServer(null);
513 if (servers == null) {
514 return result;
515 }
516 for (Iterator i = servers.iterator(); i.hasNext();) {
517 MBeanServer server = (MBeanServer) i.next();
518 if (server == null) {
519 continue;
520 }
521 result = server;
522 break;
523 }
524 return result;
525 }
526
527 public Crawler createCrawler() throws
528 InsufficientCrawlingResourcesException, ClusterException {
529 try {
530 ObjectName crawler = (ObjectName) this.connection.invoke(
531 this.name,
532 "createCrawler",
533 new Object[0],
534 new String[0]);
535 return new CrawlerImpl(crawler, this.connection);
536 }catch (MBeanException e) {
537 if("insufficent crawler resources".equals(e.getMessage())){
538 throw new InsufficientCrawlingResourcesException(e);
539 }
540 e.printStackTrace();
541 throw new ClusterException(e);
542 }
543
544 catch (Exception e) {
545 e.printStackTrace();
546 throw new ClusterException(e);
547 }
548 }
549
550 public Collection<Crawler> listCrawlers() throws ClusterException {
551 try {
552 ObjectName[] crawlers = (ObjectName[]) this.connection.invoke(
553 this.name,
554 "listCrawlers",
555 new Object[0],
556 new String[0]);
557 Collection<Crawler> crawlerList = new LinkedList<Crawler>();
558 for (int i = 0; i < crawlers.length; i++) {
559 crawlerList.add(new CrawlerImpl(crawlers[i], this.connection));
560 }
561
562 return crawlerList;
563 } catch (Exception e) {
564 e.printStackTrace();
565 throw new ClusterException(e);
566 }
567
568 }
569
570 public void destroy() {
571 try {
572 this.connection.invoke(
573 this.name,
574 "destroy",
575 new Object[0],
576 new String[0]);
577 } catch (InstanceNotFoundException e) {
578
579 e.printStackTrace();
580 } catch (MBeanException e) {
581
582 e.printStackTrace();
583 } catch (ReflectionException e) {
584
585 e.printStackTrace();
586 } catch (IOException e) {
587
588 e.printStackTrace();
589 }
590 }
591
592
593
594
595 public CurrentCrawlJob getCurrentCrawlJob(Crawler crawler) throws ClusterException {
596 try {
597 ObjectName currentCrawlJob = (ObjectName) this.connection.invoke(
598 this.name,
599 "getCurrentCrawlJob",
600 new Object[]{crawler.getName()},
601 new String[]{SimpleType.OBJECTNAME.getClassName()});
602
603 if(currentCrawlJob == null){
604 return null;
605 }
606
607 return new CurrentCrawlJobImpl(currentCrawlJob, (CrawlerImpl)crawler, this.connection);
608 }catch (Exception e) {
609 e.printStackTrace();
610 throw new ClusterException(e);
611 }
612 }
613
614 /***
615 * Returns the maximum number of instances allowed for this container.
616 * If the container does not exist, -1 is returned.
617 * @param hostname
618 * @param port
619 * @return
620 */
621 public int getMaxInstances(String hostname, int port) throws ClusterException{
622 try {
623 Integer maxInstances = (Integer) this.connection.invoke(
624 this.name,
625 "getMaxInstances",
626 new Object[]{hostname, new Integer(port)},
627 new String[]{SimpleType.STRING.getClassName(),
628 SimpleType.INTEGER.getClassName()});
629 return maxInstances;
630
631 }catch (Exception e) {
632 e.printStackTrace();
633 throw new ClusterException(e);
634 }
635 }
636
637 /***
638 * Sets the maximum number of instances that may run on a
639 * specified container defined by a host and port.
640 * @param hostname
641 * @param port
642 * @param maxInstances
643 */
644 public void setMaxInstances(String hostname, int port, int maxInstances)
645 throws ClusterException{
646 try {
647 this.connection.invoke(
648 this.name,
649 "setMaxInstances",
650 new Object[]{
651 hostname,
652 new Integer(port),
653 new Integer(maxInstances)},
654 new String[]{
655 SimpleType.STRING.getClassName(),
656 SimpleType.INTEGER.getClassName(),
657 SimpleType.INTEGER.getClassName()});
658
659
660 }catch (Exception e) {
661 e.printStackTrace();
662 throw new ClusterException(e);
663 }
664 }
665 }