1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.hcc;
24
25 import java.io.IOException;
26 import java.lang.reflect.Proxy;
27 import java.net.InetSocketAddress;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Comparator;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.Hashtable;
34 import java.util.Iterator;
35 import java.util.LinkedList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Properties;
39 import java.util.Set;
40 import java.util.Timer;
41 import java.util.TimerTask;
42 import java.util.concurrent.TimeUnit;
43 import java.util.logging.Level;
44 import java.util.logging.Logger;
45
46 import javax.management.Attribute;
47 import javax.management.AttributeList;
48 import javax.management.AttributeNotFoundException;
49 import javax.management.DynamicMBean;
50 import javax.management.InstanceAlreadyExistsException;
51 import javax.management.InstanceNotFoundException;
52 import javax.management.InvalidAttributeValueException;
53 import javax.management.ListenerNotFoundException;
54 import javax.management.MBeanException;
55 import javax.management.MBeanInfo;
56 import javax.management.MBeanNotificationInfo;
57 import javax.management.MBeanRegistration;
58 import javax.management.MBeanRegistrationException;
59 import javax.management.MBeanServer;
60 import javax.management.MBeanServerConnection;
61 import javax.management.MBeanServerFactory;
62 import javax.management.MalformedObjectNameException;
63 import javax.management.NotCompliantMBeanException;
64 import javax.management.Notification;
65 import javax.management.NotificationBroadcasterSupport;
66 import javax.management.NotificationEmitter;
67 import javax.management.NotificationFilter;
68 import javax.management.NotificationListener;
69 import javax.management.ObjectName;
70 import javax.management.ReflectionException;
71 import javax.management.openmbean.ArrayType;
72 import javax.management.openmbean.CompositeData;
73 import javax.management.openmbean.OpenDataException;
74 import javax.management.openmbean.OpenMBeanAttributeInfo;
75 import javax.management.openmbean.OpenMBeanAttributeInfoSupport;
76 import javax.management.openmbean.OpenMBeanConstructorInfo;
77 import javax.management.openmbean.OpenMBeanInfoSupport;
78 import javax.management.openmbean.OpenMBeanOperationInfo;
79 import javax.management.openmbean.OpenMBeanOperationInfoSupport;
80 import javax.management.openmbean.OpenMBeanParameterInfo;
81 import javax.management.openmbean.OpenMBeanParameterInfoSupport;
82 import javax.management.openmbean.SimpleType;
83 import javax.management.openmbean.TabularData;
84 import javax.naming.Context;
85 import javax.naming.NameClassPair;
86 import javax.naming.NamingEnumeration;
87 import javax.naming.NamingException;
88
89 import org.archive.hcc.util.ClusterControllerNotification;
90 import org.archive.hcc.util.NotificationDelegator;
91 import org.archive.hcc.util.SmartPropertiesResolver;
92 import org.archive.hcc.util.Delegator.DelegatorPolicy;
93 import org.archive.hcc.util.jmx.MBeanFutureTask;
94 import org.archive.hcc.util.jmx.MBeanOperation;
95 import org.archive.hcc.util.jmx.MBeanServerConnectionFactory;
96 import org.archive.hcc.util.jmx.OpenMBeanInvocationManager;
97 import org.archive.hcc.util.jmx.RegistrationNotificationHandler;
98 import org.archive.hcc.util.jmx.SimpleReflectingMBeanOperation;
99 import org.archive.util.JmxUtils;
100 import org.archive.util.JndiUtils;
101
102 /***
103 * As the main workhorse of the package, the <code>ClusterControllerBean</code>
104 * provides a unified view of any number of Heritrix instances and all related
105 * objects within a JNDI scope.
106 *
107 * @author Daniel Bernstein (dbernstein@archive.org)
108 */
109 public class ClusterControllerBean implements
110 DynamicMBean,
111 NotificationEmitter,
112 MBeanRegistration {
113
114 /***
115 * The Jndi context
116 */
117 private Context context;
118
119 /***
120 * logger
121 */
122 private static Logger log = Logger.getLogger(ClusterControllerBean.class
123 .getName());
124
125 /***
126 * A timer thread that polls the jndi for new containers.
127 */
128 private Timer jndiPoller;
129
130 /***
131 * A single instance of notification listener that simply forwards messages
132 * from source beans to listeners.
133 */
134 private NotificationDelegator remoteNotificationDelegator;
135
136 /***
137 * poll period in seconds.
138 */
139 private static final int JNDI_POLL_PERIOD_IN_SECONDS = 60;
140
141 /***
 * A map of mbean server connections mapped by address. TODO make
 * configurable
144 */
145 private Map<InetSocketAddress, MBeanServerConnection> connections =
146 new HashMap<InetSocketAddress, MBeanServerConnection>();
147
148 /***
149 * A list of remote container references
150 */
151 private Map<ObjectName, Container> containers;
152
153 private NotificationBroadcasterSupport broadCaster;
154
155 /***
156 * Manages and delegates OpenMBean invocations
157 */
158 private OpenMBeanInvocationManager invocationManager;
159
160 /***
161 * Defines the open mbean interface for the bean.
162 */
163 private MBeanInfo info;
164
165 /***
166 * A local mbean server
167 */
168 private MBeanServer mbeanServer;
169
170 /***
171 * The MBean name of the controller.
172 */
173 private ObjectName name;
174
175 /***
176 * A map of the remote crawler handles to the local Crawler dynamic proxy
177 * instances.
178 */
179 private Map<RemoteMapKey, Crawler> remoteNameToCrawlerMap =
180 new HashMap<RemoteMapKey, Crawler>();
181
182 /***
183 * Watches all traffic (invocations and notifications) moving between the
184 * remote Heritrix instances and the heritrix cluster controller.
185 */
186 private NotificationListener spyListener;
187
188 /***
189 * Upperbound on crawlers per container.
190 */
191 private int defaultMaxPerContainer = 1;
192
/**
 * Creates a cluster controller bean. Construction is only the first half of
 * a two step initialization: init() must be called before any operation of
 * the bean is invoked.
 */
public ClusterControllerBean() {
    this.remoteNotificationDelegator = buildRemoteNotificationDelegator();
    this.jndiPoller = new Timer();
    this.broadCaster = new NotificationBroadcasterSupport();
    // The invocation manager has to exist before buildOpenMBeanInfo() runs,
    // because buildOperations() registers every operation with it.
    this.invocationManager = new OpenMBeanInvocationManager();
    this.info = buildOpenMBeanInfo();
    this.mbeanServer = createMBeanServer();
    this.containers = new HashMap<ObjectName, Container>();
    // Forwards every notification it sees to this bean's own listeners.
    this.spyListener = new NotificationListener() {
        public void handleNotification(Notification event, Object handback) {
            if (log.isLoggable(Level.FINER)) {
                log.finer(">>>>>>>>>spyListener: notification=" + event);
            }
            fireNotificationEvent(event);
        }
    };
}
220
221 private class RemoteMapKey {
222 private ObjectName name;
223 private String id;
224 public ObjectName getObjectName(){
225 return name;
226
227 }
228
229 public RemoteMapKey(ObjectName on){
230 this.name = on;
231 this.id = extractId(on);
232 }
233
234 @Override
235 public boolean equals(Object obj) {
236 if(!(obj instanceof RemoteMapKey)){
237 return false;
238 }
239
240 RemoteMapKey o = (RemoteMapKey)obj;
241
242 return (o.id.equals(id));
243 }
244
245 private String extractId(ObjectName name){
246 StringBuffer b = new StringBuffer();
247 b.append(name.getKeyProperty(JmxUtils.NAME));
248 b.append(name.getKeyProperty(JmxUtils.HOST));
249 b.append(name.getKeyProperty(JmxUtils.JMX_PORT));
250 b.append(name.getKeyProperty(JmxUtils.TYPE));
251 return b.toString();
252 }
253 @Override
254 public int hashCode() {
255
256 return id.hashCode();
257 }
258 }
259
260 private static MBeanServer createMBeanServer() {
261 MBeanServer result = null;
262 List servers = MBeanServerFactory.findMBeanServer(null);
263 if (servers == null) {
264 return result;
265 }
266 for (Iterator i = servers.iterator(); i.hasNext();) {
267 MBeanServer server = (MBeanServer) i.next();
268 if (server == null) {
269 continue;
270 }
271 result = server;
272 break;
273 }
274 return result;
275 }
276
277 /***
278 * @return Returns the total count of all crawlers within the cluster.
279 */
280 public Integer getTotalCrawlerCount() {
281 return this.remoteNameToCrawlerMap.size();
282 }
283
284 private OpenMBeanInfoSupport buildOpenMBeanInfo() {
285 return new OpenMBeanInfoSupport(
286 ClusterControllerBean.class.getName(),
287 "A controller for a pool of Heritrix "
288 + "containers and their contents.",
289 buildAttributes(),
290 buildConstructors(),
291 buildOperations(),
292 buildNotifications());
293 }
294
295 private OpenMBeanAttributeInfo[] buildAttributes() {
296 try {
297 return new OpenMBeanAttributeInfo[] {
298 new OpenMBeanAttributeInfoSupport(
299 "TotalCrawlerCount",
300 "Total number of crawlers that are "
301 + "currently initialized in the system.",
302 SimpleType.INTEGER,
303 true,
304 false,
305 false,
306 new Integer(0)) };
307 } catch (OpenDataException e) {
308 throw new RuntimeException(e);
309 }
310 }
311
312 private OpenMBeanConstructorInfo[] buildConstructors() {
313 return new OpenMBeanConstructorInfo[0];
314 }
315
316 private boolean uidMatches(TabularData td, String jobUid) {
317 if (jobUid == null) {
318 throw new NullPointerException("jobUid=" + jobUid);
319 }
320
321 if (td == null) {
322 return false;
323 }
324
325 for (Object o : td.values()) {
326 CompositeData cd = (CompositeData) o;
327 if (jobUid.equals(cd.get("uid"))) {
328 return true;
329 }
330 }
331
332 return false;
333 }
334
335
336 public ObjectName findCrawlServiceJobParent(
337 String jobUid,
338 String host,
339 Integer port) {
340
341 InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
342 Container container = getContainerOn(remoteAddress);
343
344 if (container == null) {
345 return null;
346 }
347
348 List<Crawler> crawlers =
349 new LinkedList<Crawler>(container.getCrawlers());
350 for (Crawler crawler : crawlers) {
351 DynamicMBean p = crawler.getCrawlServiceProxy();
352 try {
353
354
355 ObjectName cjp = crawler.getCrawlJobProxyObjectName();
356 ObjectName csp = crawler.getCrawlServiceProxyObjectName();
357
358 if (cjp != null) {
359 if (JmxUtils.getUid(cjp).equals(jobUid)) {
360 return csp;
361 }
362 }
363
364
365 TabularData td = (TabularData) p.invoke(
366 "completedJobs",
367 new Object[0],
368 new String[0]);
369
370 if (uidMatches(td, jobUid)) {
371 return csp;
372 }
373
374
375 td = (TabularData) p.invoke(
376 "pendingJobs",
377 new Object[0],
378 new String[0]);
379
380 if (uidMatches(td, jobUid)) {
381 return csp;
382 }
383 } catch (MBeanException e) {
384 e.printStackTrace();
385 } catch (ReflectionException e) {
386 e.printStackTrace();
387 }
388 }
389
390 return null;
391 }
392
393
394 /***
395 * Returns the current job object name associated with the specified crawler.
396 * Returns null if no job was running.
397 * @param mother
398 * @return
399 */
400 public ObjectName getCurrentCrawlJob(ObjectName mother) {
401 InetSocketAddress remoteAddress = org.archive.hcc.util.JmxUtils.extractRemoteAddress(mother);
402 Container container = getContainerOn(remoteAddress);
403
404 if (container == null) {
405 return null;
406 }
407
408 for (Crawler crawler : new LinkedList<Crawler>(container.getCrawlers())) {
409 if(crawler.getCrawlServiceProxyObjectName().equals(mother)){
410 return crawler.getCrawlJobProxyObjectName();
411 }
412 }
413 return null;
414 }
415
416 /***
417 * Returns the maximum number of instances allowed for this container.
418 * If the container does not exist, -1 is returned.
419 * @param hostname
420 * @param port
421 * @return
422 */
423 public int getMaxInstances(String hostname, Integer port){
424 Collection<Container> list =
425 new LinkedList<Container>(this.containers.values());
426
427 for(Container c : list){
428 InetSocketAddress a = JmxUtils.extractAddress(c.getName());
429 if(a.getHostName().equals(hostname) && port == a.getPort()){
430 return c.getMaxInstances();
431 }
432 }
433
434 return -1;
435 }
436
437 /***
438 * Sets the maximum number of instances that may run on a
439 * specified container defined by a host and port.
440 * @param hostname
441 * @param port
442 * @param maxInstances
443 */
444 public void setMaxInstances(String hostname, Integer port, Integer maxInstances){
445 Collection<Container> list =
446 new LinkedList<Container>(this.containers.values());
447
448 if(maxInstances < -1){
449 maxInstances = -1;
450 }
451
452 for(Container c : list){
453 InetSocketAddress a = JmxUtils.extractAddress(c.getName());
454 if(a.getHostName().equals(hostname) && port == a.getPort()){
455 c.setMaxInstances(maxInstances);
456 break;
457 }
458 }
459 }
460
461 /***
462 * Creates a new crawler on the least loaded machine on the cluster.
463 *
464 * @return ObjectName of created crawler.
465 * @throws MBeanException
466 */
467 public ObjectName createCrawler() throws MBeanException {
468 List<Container> containers = resolveLeastLoadedContainers();
469
470 if (containers != null) {
471 for(Container container : containers){
472 try {
473 log.info("attempting to create crawler on container: " + container.getName());
474 return createCrawlerIn(container);
475 } catch (Exception e) {
476 log.warning("unexpected error!!! failed to create crawler as expected on " + container.getName());
477 e.printStackTrace();
478 }
479
480 }
481
482 log.severe("unable to start any crawlers on container due to communication " +
483 "failure with all available containers.");
484 }
485
486 MBeanException e = new MBeanException(
487 new Exception(
488 "No space available in remote"+
489 " containers for new crawler " + "instances."),
490 "insufficent crawler resources"
491 );
492
493 throw e;
494
495 }
496
497 /***
498 // * @return Currently returns the least loaded going by a dumb count.
499 // */
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 /***
521 * @return A list of available (ie not fully loaded) containers sorted from least loaded to most loaded. If no
522 * containers are available, returns null.
523 */
524 protected List<Container> resolveLeastLoadedContainers() {
525
526 List<Container>leastLoaded = null;
527 Container last = null;
528
529 List<Container> currentContainers = new LinkedList<Container>(this.containers.values());
530
531 Collections.sort(currentContainers, new Comparator(){
532 public int compare(Object o1, Object o2) {
533 Container c1 = (Container)o1;
534 Container c2 = (Container)o2;
535 return new Integer(c1.getCrawlers().size()).compareTo(new Integer(c2.getCrawlers().size()));
536 }
537 });
538
539 for (Container n : currentContainers) {
540 if (n.getCrawlers().size() >= n.getMaxInstances()) {
541 continue;
542 }
543
544 if(leastLoaded == null){
545 leastLoaded = new LinkedList<Container>();
546 }
547
548 leastLoaded.add(n);
549 }
550
551 return leastLoaded;
552 }
553
554 private OpenMBeanOperationInfo[] buildOperations() {
555 try {
556
557 addOperation(new SimpleReflectingMBeanOperation(
558 ClusterControllerBean.this,
559 new OpenMBeanOperationInfoSupport(
560 "createCrawler",
561 "creates a new crawler on the cluster",
562 null,
563 SimpleType.OBJECTNAME,
564 OpenMBeanOperationInfoSupport.ACTION_INFO)));
565
566 addOperation(new SimpleReflectingMBeanOperation(
567 ClusterControllerBean.this,
568 new OpenMBeanOperationInfoSupport(
569 "destroy",
570 "Effectively \"detaches\" the bean from the " +
571 "containers, crawl services and jobs that it " +
572 "is managing. The remote objects are not " +
573 "affected. ",
574 null,
575 SimpleType.VOID,
576 OpenMBeanOperationInfoSupport.ACTION)));
577
578 addOperation(new SimpleReflectingMBeanOperation(
579 ClusterControllerBean.this,
580 new OpenMBeanOperationInfoSupport(
581 "listCrawlers",
582 "lists crawlers associated with the cluster",
583 null,
584 new ArrayType(1, SimpleType.OBJECTNAME),
585 OpenMBeanOperationInfoSupport.INFO)));
586
587
588 addOperation(new SimpleReflectingMBeanOperation(
589 ClusterControllerBean.this,
590 new OpenMBeanOperationInfoSupport(
591 "destroyAllCrawlers",
592 "destroys all crawlers that are managed by the cluster" +
593 " controller",
594 null,
595 SimpleType.VOID,
596 OpenMBeanOperationInfoSupport.ACTION)));
597
598
599 addOperation(new SimpleReflectingMBeanOperation(
600 ClusterControllerBean.this,
601 new OpenMBeanOperationInfoSupport(
602 "pauseAllJobs",
603 "pauses all jobs that are managed by the cluster" +
604 " controller",
605 null,
606 SimpleType.BOOLEAN,
607 OpenMBeanOperationInfoSupport.ACTION)));
608
609 addOperation(new SimpleReflectingMBeanOperation(
610 ClusterControllerBean.this,
611 new OpenMBeanOperationInfoSupport(
612 "resumeAllPausedJobs",
613 "resumes crawling of all paused jobs that are managed by the cluster" +
614 " controller",
615 null,
616 SimpleType.BOOLEAN,
617 OpenMBeanOperationInfoSupport.ACTION)));
618
619
620 addOperation(new SimpleReflectingMBeanOperation(
621 ClusterControllerBean.this,
622 new OpenMBeanOperationInfoSupport(
623 "findCrawlServiceJobParent", "returns the parent" +
624 "name of the specified crawl job.",
625 new OpenMBeanParameterInfo[] {
626 new OpenMBeanParameterInfoSupport(
627 "uid",
628 "The job's uid",
629 SimpleType.STRING),
630 new OpenMBeanParameterInfoSupport(
631 "remoteHost",
632 "The remote host name",
633 SimpleType.STRING),
634 new OpenMBeanParameterInfoSupport(
635 "remotePort",
636 "The remote port",
637 SimpleType.INTEGER) },
638 SimpleType.OBJECTNAME,
639 OpenMBeanOperationInfoSupport.INFO)));
640
641 addOperation(new SimpleReflectingMBeanOperation(
642 ClusterControllerBean.this,
643 new OpenMBeanOperationInfoSupport(
644 "getCurrentCrawlJob", "returns the current " +
645 "crawl job name of the crawl job running" +
646 " on the specified crawler Returns null" +
647 " if either the mother is not found or " +
648 "if the crawler does not have a current job.",
649 new OpenMBeanParameterInfo[] {
650 new OpenMBeanParameterInfoSupport(
651 "mother",
652 "The job's mother (CrawlService)",
653 SimpleType.OBJECTNAME)},
654 SimpleType.OBJECTNAME,
655 OpenMBeanOperationInfoSupport.INFO)));
656
657
658 addOperation(new SimpleReflectingMBeanOperation(
659 ClusterControllerBean.this,
660 new OpenMBeanOperationInfoSupport(
661 "setMaxInstances", "sets the max number of " +
662 "instances of heritrix that a heritrix " +
663 "enabled jvm can serve.",
664 new OpenMBeanParameterInfo[] {
665 new OpenMBeanParameterInfoSupport(
666 "host",
667 "The jvm's host",
668 SimpleType.STRING),
669 new OpenMBeanParameterInfoSupport(
670 "port",
671 "The jvm's jmx port",
672 SimpleType.INTEGER),
673 new OpenMBeanParameterInfoSupport(
674 "maxInstances",
675 "The max number of instances",
676 SimpleType.INTEGER) },
677 SimpleType.VOID,
678 OpenMBeanOperationInfoSupport.ACTION)));
679
680
681 addOperation(new SimpleReflectingMBeanOperation(
682 ClusterControllerBean.this,
683 new OpenMBeanOperationInfoSupport(
684 "getMaxInstances", "returns the max number of " +
685 "instances of heritrix that a heritrix " +
686 "enabled jvm can serve.",
687 new OpenMBeanParameterInfo[] {
688 new OpenMBeanParameterInfoSupport(
689 "host",
690 "The jvm's host",
691 SimpleType.STRING),
692 new OpenMBeanParameterInfoSupport(
693 "port",
694 "The jvm's jmx port",
695 SimpleType.INTEGER)
696 },
697 SimpleType.INTEGER,
698 OpenMBeanOperationInfoSupport.INFO)));
699
700 } catch (OpenDataException e) {
701 e.printStackTrace();
702 }
703
704 return this.invocationManager.getInfo();
705 }
706
707 /***
708 * @return Returns a list of all crawler instances within the hcc's jndi
709 * scope.
710 */
711 public ObjectName[] listCrawlers() {
712 int size = this.remoteNameToCrawlerMap.keySet().size();
713 ObjectName[] crawlers = new ObjectName[size];
714 int count = 0;
715 for (Crawler c : this.remoteNameToCrawlerMap.values()) {
716 crawlers[count] = c.getCrawlServiceProxyObjectName();
717 count++;
718 }
719 return crawlers;
720 }
721
722 public void destroyAllCrawlers(){
723 List<Crawler> list =
724 new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
725 for(Crawler c : list){
726 try {
727 c.getCrawlServiceProxy().invoke("destroy", new Object[0], new String[0]);
728 } catch (MBeanException e) {
729
730 e.printStackTrace();
731 } catch (ReflectionException e) {
732
733 e.printStackTrace();
734 }
735 }
736 }
737
738
739 public boolean pauseAllJobs(){
740 List<Crawler> list =
741 new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
742
743 boolean success = true;
744 for(Crawler c : list){
745 try {
746 DynamicMBean m = c.getCrawlJobProxy();
747 if(m == null){
748 continue;
749 }
750 String status = (String)m.getAttribute("Status");
751
752 if("RUNNING".equals(status)){
753 m.invoke("pause", new Object[0], new String[0]);
754 }
755 } catch (AttributeNotFoundException e) {
756 e.printStackTrace();
757 } catch (MBeanException e) {
758 e.printStackTrace();
759 success = false;
760 } catch (ReflectionException e) {
761 e.printStackTrace();
762 success = false;
763 }
764 }
765
766 return success;
767 }
768
769 public boolean resumeAllPausedJobs(){
770 List<Crawler> list =
771 new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
772
773 boolean success = true;
774 for(Crawler c : list){
775 try {
776 DynamicMBean m = c.getCrawlJobProxy();
777 if(m == null){
778 continue;
779 }
780
781 String status = (String)m.getAttribute("Status");
782 if("PAUSED".equals(status) || "PAUSING".equals(status)){
783 m.invoke("resume", new Object[0], new String[0]);
784 }
785 } catch (AttributeNotFoundException e) {
786 e.printStackTrace();
787 } catch (MBeanException e) {
788 e.printStackTrace();
789 success = false;
790 } catch (ReflectionException e) {
791 e.printStackTrace();
792 success = false;
793 }
794 }
795
796 return success;
797 }
798
799 private void addOperation(MBeanOperation operation) {
800 this.invocationManager.addMBeanOperation(operation);
801 }
802
803 private MBeanNotificationInfo[] buildNotifications() {
804 List<MBeanNotificationInfo> info =
805 new LinkedList<MBeanNotificationInfo>();
806
807 info.add(new MBeanNotificationInfo(
808 new String[] {ClusterControllerNotification.
809 CRAWL_SERVICE_CREATED_NOTIFICATION.getKey() },
810 ClusterControllerBean.class.getName(),
811 "Notifies when a new instance of the crawl service comes up"));
812
813 info.add(new MBeanNotificationInfo(
814 new String[] { ClusterControllerNotification.
815 CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey() },
816 ClusterControllerBean.class.getName(),
817 "Notifies when an instance of the crawl service goes away"));
818
819 info.add(new MBeanNotificationInfo(
820 new String[] { ClusterControllerNotification.
821 CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey() },
822 ClusterControllerBean.class.getName(),
823 "Notifies when a new crawl service job starts"));
824
825 info.add(new MBeanNotificationInfo(
826 new String[] { ClusterControllerNotification.
827 CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey() },
828 ClusterControllerBean.class.getName(),
829 "Notifies when a new instance of the crawl service job " +
830 "completes"));
831
832 return info.toArray(new MBeanNotificationInfo[0]);
833 }
834
835 /***
836 * Initializes the cluster controller.
837 */
838 public void init() {
839 try {
840 Properties p =
841 SmartPropertiesResolver.getProperties("hcc.properties");
842 this.defaultMaxPerContainer = Integer.parseInt(
843 p.getProperty(ClusterControllerBean.class.getName() +
844 ".maxPerContainer", "1"));
845 log.info("maxPerContainer setting: " + this.defaultMaxPerContainer);
846
847 context = JndiUtils.getSubContext("org.archive.crawler");
848 this.name = new ObjectName("org.archive.hcc:"
849 + "type=ClusterControllerBean"
850 + ",host="
851 + System.getenv("HOSTNAME")
852 + ",jmxport="
853 + System.getProperty(
854 "com.sun.management.jmxremote.port",
855 "8849"));
856
857 refreshRegistry();
858
859 initializeJndiPoller();
860
861 registerMBean();
862
863 } catch (MalformedObjectNameException e) {
864
865 e.printStackTrace();
866 } catch (NullPointerException e) {
867
868 e.printStackTrace();
869 } catch (NamingException e) {
870 e.printStackTrace();
871 throw new RuntimeException(e);
872 }
873 }
874
875 private void registerMBean() {
876 try {
877 this.mbeanServer.registerMBean(this, this.name);
878 } catch (InstanceAlreadyExistsException e) {
879
880 e.printStackTrace();
881 } catch (MBeanRegistrationException e) {
882
883 e.printStackTrace();
884 } catch (NotCompliantMBeanException e) {
885
886 e.printStackTrace();
887 }
888 }
889
890 /***
891 * Disconnects the cluster controller from the network. It unhooks the
892 * controller without acting on the remote instances.
893 */
894 public void destroy() {
895 try {
896 if (log.isLoggable(Level.INFO)) {
897 log.info("destroying cluster controller.");
898 }
899
900 jndiPoller.cancel();
901
902 if (log.isLoggable(Level.INFO)) {
903 log.info("cancelled jndi poller");
904 }
905
906 jndiPoller = null;
907
908 this.broadCaster = null;
909
910 List<Container> containers = new LinkedList<Container>(
911 this.containers.values());
912 for (Container container : containers) {
913 dereferenceContainer(container);
914 }
915
916 context.close();
917 } catch (NamingException e) {
918 e.printStackTrace();
919 } finally {
920 try {
921 this.mbeanServer.unregisterMBean(this.name);
922 } catch (InstanceNotFoundException e) {
923
924 e.printStackTrace();
925 } catch (MBeanRegistrationException e) {
926
927 e.printStackTrace();
928 }
929 }
930 }
931
932 private void dereferenceCrawler(Crawler crawler) {
933
934 crawler.removeFromParent();
935 if(crawler.getCrawlJobRemoteObjectName()!= null){
936 this.remoteNameToCrawlerMap.remove(
937 new RemoteMapKey(crawler.getCrawlJobRemoteObjectName()));
938 }
939
940 try {
941 this.mbeanServer.unregisterMBean(crawler
942 .getCrawlServiceProxyObjectName());
943 } catch (InstanceNotFoundException e) {
944 if (log.isLoggable(Level.WARNING)) {
945 log.warning(e.getMessage());
946 }
947
948 e.printStackTrace();
949
950 } catch (MBeanRegistrationException e) {
951 if (log.isLoggable(Level.WARNING)) {
952 log.warning(e.getMessage());
953 }
954
955 e.printStackTrace();
956 }
957
958 try {
959 ObjectName crawlJob = crawler.getCrawlJobProxyObjectName();
960 if (crawlJob != null) {
961 this.mbeanServer.unregisterMBean(crawlJob);
962 }
963 } catch (InstanceNotFoundException e) {
964 if (log.isLoggable(Level.WARNING)) {
965 log.warning(e.getMessage());
966 }
967
968 } catch (MBeanRegistrationException e) {
969 if (log.isLoggable(Level.WARNING)) {
970 log.warning(e.getMessage());
971 }
972
973 e.printStackTrace();
974 }
975 }
976
977 /***
978 * Initializes the notification forwarder.
979 * @return Returns notification delegator.
980 */
981 private NotificationDelegator buildRemoteNotificationDelegator() {
982 NotificationDelegator d = new NotificationDelegator(
983 DelegatorPolicy.ACCEPT_FIRST) {
984 protected boolean delegate(Notification n, Object handback) {
985 return super.delegate(n, handback);
986 }
987 };
988
989 d.addDelegatable(new CrawlJobLifecycleNotificationHandler());
990 d.addDelegatable(new CrawlerLifecycleNotificationHandler());
991 d.addDelegatable(new ContainerLifecycleNotificationHandler());
992
993 return d;
994 }
995
996 private void fireNotificationEvent(Notification n) {
997 this.broadCaster.sendNotification(n);
998 }
999
1000 private class CrawlerLifecycleNotificationHandler
1001 extends
1002 RegistrationNotificationHandler {
1003 @Override
1004 protected String getType() {
1005 return JmxUtils.SERVICE;
1006 }
1007
1008 @Override
1009 protected void handleRegistered(ObjectName name) {
1010 handleCrawlerCreated(name);
1011 }
1012
1013 @Override
1014 protected void handleUnregistered(ObjectName name) {
1015 handleCrawlerRemoved(name);
1016 }
1017 }
1018
1019 private class ContainerLifecycleNotificationHandler
1020 extends
1021 RegistrationNotificationHandler {
1022 @Override
1023 protected String getType() {
1024 return "container";
1025 }
1026
1027 @Override
1028 protected void handleRegistered(ObjectName name) {
1029
1030 }
1031
1032 @Override
1033 protected void handleUnregistered(ObjectName name) {
1034 handleContainerRemoved(name);
1035 }
1036 }
1037
1038 private class CrawlJobLifecycleNotificationHandler
1039 extends
1040 RegistrationNotificationHandler {
1041 @Override
1042 protected String getType() {
1043 return JmxUtils.JOB;
1044 }
1045
1046 @Override
1047 protected void handleRegistered(ObjectName name) {
1048 handleJobAdded(name);
1049 }
1050
1051 @Override
1052 protected void handleUnregistered(ObjectName name) {
1053 handleJobRemoved(name);
1054 }
1055 }
1056
1057 protected void handleJobRemoved(ObjectName job) {
1058 if(log.isLoggable(Level.INFO)){
1059 log.info("entering: job=" + job);
1060 }
1061
1062 Crawler c = getJobContext(job);
1063
1064 if(c == null){
1065 if(log.isLoggable(Level.WARNING)){
1066 log.warning("no crawler context found for job=" + job);
1067 }
1068
1069 return;
1070 }
1071
1072
1073 ObjectName jobProxy = c.getCrawlJobProxyObjectName();
1074 if(jobProxy == null){
1075 if(log.isLoggable(Level.WARNING)){
1076 log.warning("jobProxy was not found on crawler=" + c.getCrawlJobProxyObjectName());
1077 }
1078 return;
1079 }
1080
1081 try {
1082 this.mbeanServer.unregisterMBean(jobProxy);
1083 } catch (InstanceNotFoundException e) {
1084 log.severe("failed to unregister job proxy: " + jobProxy + "; remote job=" + job + "; error.class=" + e.getClass() + "; error.message=" + e.getMessage());
1085
1086 e.printStackTrace();
1087 } catch (MBeanRegistrationException e) {
1088 log.severe("failed to unregister job proxy: " + jobProxy + "; remote job=" + job + "; error.class=" + e.getClass() + "; message=" + e.getMessage());
1089 e.printStackTrace();
1090 }
1091
1092 c.setCrawlJobProxy(null);
1093 fireNotification(
1094 jobProxy,
1095 ClusterControllerNotification.
1096 CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey());
1097
1098 if(log.isLoggable(Level.INFO)){
1099 log.info("exitting successfully: job=" + job);
1100 }
1101
1102 }
1103
1104 private boolean isJobOnCrawler(ObjectName job, ObjectName crawler) {
1105
1106 return equals(job, crawler, JmxUtils.JMX_PORT)
1107 && equals(job, crawler, JmxUtils.HOST)
1108 && job.getKeyProperty(JmxUtils.MOTHER).equals(
1109 crawler.getKeyProperty(JmxUtils.NAME));
1110 }
1111
1112 private Crawler getJobContext(ObjectName job) {
1113 for (RemoteMapKey key : this.remoteNameToCrawlerMap.keySet()) {
1114 if (isJobOnCrawler(job, key.getObjectName())) {
1115 return this.remoteNameToCrawlerMap.get(key);
1116 }
1117 }
1118
1119 throw new NullPointerException(
1120 "shouldn't happen: no crawler found for job: " + job);
1121 }
1122
1123 protected void handleJobAdded(ObjectName job) {
1124 try {
1125 ObjectName proxyName = addCrawlJob(job);
1126
1127 fireNotification(
1128 proxyName,
1129 ClusterControllerNotification.
1130 CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey());
1131 } catch (RuntimeException e) {
1132
1133 e.printStackTrace();
1134 }
1135 }
1136
1137 private ObjectName addCrawlJob(ObjectName job) {
1138 InetSocketAddress address = JmxUtils.extractAddress(job);
1139 Crawler crawler = getJobContext(job);
1140 ObjectName proxyName = createClientProxyName(job);
1141 DynamicMBean proxy = (DynamicMBean) Proxy.newProxyInstance(
1142 DynamicMBean.class.getClassLoader(),
1143 new Class[] { DynamicMBean.class, NotificationEmitter.class },
1144 new RemoteMBeanInvocationHandler(
1145 job,
1146 proxyName,
1147 this.connections.get(address),
1148 spyListener));
1149
1150 crawler.setCrawlJobProxy(proxy);
1151 try {
1152 this.mbeanServer.registerMBean(proxy, proxyName);
1153 } catch (InstanceAlreadyExistsException e) {
1154
1155 e.printStackTrace();
1156 } catch (MBeanRegistrationException e) {
1157
1158 e.printStackTrace();
1159 } catch (NotCompliantMBeanException e) {
1160
1161 e.printStackTrace();
1162 }
1163
1164 ((NotificationEmitter) proxy).addNotificationListener(
1165 this.remoteNotificationDelegator,
1166 null,
1167 new Object());
1168
1169 return proxyName;
1170 }
1171
1172 protected boolean equals(ObjectName a, ObjectName b, String key) {
1173 Object va = a.getKeyProperty(key);
1174 Object vb = b.getKeyProperty(key);
1175
1176 if (!(va != null && vb != null)) {
1177 return false;
1178 }
1179
1180 return va.equals(vb);
1181 }
1182
1183 protected void handleContainerRemoved(ObjectName name) {
1184
1185 for (Container c : new LinkedList<Container>(containers.values())) {
1186 if (c.getName().equals(name)) {
1187 for (Crawler crawler : c.getCrawlers()) {
1188 removeCrawlerAndNotify(crawler);
1189 }
1190 dereferenceContainer(c);
1191 break;
1192 }
1193 }
1194 }
1195
1196 private void removeCrawlerAndNotify(Crawler crawler) {
1197 dereferenceCrawler(crawler);
1198 fireCrawlerDestroyed(crawler.getCrawlServiceProxyObjectName());
1199 }
1200
1201 protected void handleCrawlerRemoved(ObjectName name) {
1202 Crawler c = this.remoteNameToCrawlerMap.remove(new RemoteMapKey(name));
1203 if (c != null) {
1204 removeCrawlerAndNotify(c);
1205 }
1206 }
1207
1208 /***
1209 * @return Returns a list of containers registered with the jndi service.
1210 */
1211 protected final List<ObjectName> retrieveContainerListFromJndi() {
1212 List<ObjectName> list = new LinkedList<ObjectName>();
1213
1214 try {
1215 NamingEnumeration<NameClassPair> e = context.list("");
1216 while (e.hasMore()) {
1217 NameClassPair ncp = e.next();
1218 String jndiName = ncp.getName();
1219
1220 try {
1221 ObjectName on = new ObjectName(":" + jndiName);
1222 Hashtable ht = on.getKeyPropertyList();
1223 if (ht.get("type").equals("container")) {
1224 list.add(on);
1225 }
1226
1227 } catch (MalformedObjectNameException e1) {
1228
1229 e1.printStackTrace();
1230 }
1231 }
1232
1233 } catch (NamingException e) {
1234
1235 e.printStackTrace();
1236 }
1237
1238 return list;
1239 }
1240
1241 protected final void refreshRegistry() {
1242 List<ObjectName> containerNameList = this
1243 .retrieveContainerListFromJndi();
1244 this.containers = synchronizeContainers(
1245 this.containers,
1246 containerNameList);
1247 }
1248
1249 /***
1250 * Synchronizes the container list with the fresh list (fresh meaning, last
1251 * polled from jndi), removing those that went away, and adding any newly
1252 * discovered containers.
1253 *
1254 * @param containers
1255 * @param freshContainers
1256 * @return Map of container object names.
1257 */
1258 protected final Map<ObjectName, Container> synchronizeContainers(
1259 Map<ObjectName, Container> containers,
1260 List<ObjectName> freshContainers) {
1261
1262 Map<ObjectName, Container> staleContainers = new HashMap<ObjectName, Container>(
1263 containers);
1264
1265
1266 for (ObjectName n : staleContainers.keySet()) {
1267 if (!freshContainers.contains(n)) {
1268 handleContainerRemoved(n);
1269 }
1270 }
1271
1272
1273 for (ObjectName n : freshContainers) {
1274 if (!containers.keySet().contains(n)) {
1275 try {
1276 InetSocketAddress address = JmxUtils.extractAddress(n);
1277 registerAddress(address);
1278 synchronizeContainer(n);
1279
1280 attachMBeanServerDelegateNotificationListener(
1281 address,
1282 this.remoteNotificationDelegator);
1283 } catch (IOException e) {
1284 e.printStackTrace();
1285 }
1286 }
1287 }
1288
1289 return containers;
1290 }
1291
1292 /***
1293 * Attaches a notification listener to the remote mbean server delegate at
1294 * the specified address..
1295 * @param address
1296 * @param listener
1297 */
1298 protected void attachMBeanServerDelegateNotificationListener(
1299 InetSocketAddress address,
1300 NotificationListener listener) {
1301
1302 MBeanServerConnection mbc = this.connections.get(address);
1303
1304 if (mbc == null) {
1305 throw new NullPointerException(
1306 "no mbean server connection found on " + address);
1307 }
1308
1309 try {
1310 mbc.addNotificationListener(
1311 JmxUtils.MBEAN_SERVER_DELEGATE,
1312 listener,
1313 null,
1314 null);
1315 } catch (InstanceNotFoundException e) {
1316
1317 e.printStackTrace();
1318 } catch (IOException e) {
1319
1320 e.printStackTrace();
1321 }
1322 }
1323
1324 protected void registerAddress(InetSocketAddress isa) throws IOException {
1325 if (!this.connections.keySet().contains(isa)) {
1326
1327 MBeanServerConnection mbc = MBeanServerConnectionFactory.createConnection(isa);
1328 this.connections.put(isa, mbc);
1329 }
1330 }
1331
1332 /***
1333 * Unhooks container from the bus.
1334 * @param c Container to remove.
1335 */
1336 protected void dereferenceContainer(Container c) {
1337 Container removed = containers.remove(c.getName());
1338 if (removed != null) {
1339 InetSocketAddress address = JmxUtils.extractAddress(c.getName());
1340 for (Crawler b : new LinkedList<Crawler>(c.getCrawlers())) {
1341 dereferenceCrawler(b);
1342 }
1343
1344 removeMBeanServerNotificationListener(address);
1345 this.connections.remove(address);
1346 }
1347 }
1348
1349 protected void removeMBeanServerNotificationListener(
1350 InetSocketAddress address) {
1351
1352 MBeanServerConnection mbc = this.connections.get(address);
1353
1354 if (mbc == null) {
1355 throw new NullPointerException(
1356 "no mbean server connection found on " + address);
1357 }
1358
1359 try {
1360 mbc.removeNotificationListener(
1361 JmxUtils.MBEAN_SERVER_DELEGATE,
1362 this.remoteNotificationDelegator);
1363 } catch (InstanceNotFoundException e) {
1364
1365 e.printStackTrace();
1366 } catch (ListenerNotFoundException e) {
1367
1368 e.printStackTrace();
1369 } catch (IOException e) {
1370
1371 e.printStackTrace();
1372 }
1373 }
1374
1375 /***
1376 * Synchronizes the state of an individual container. This means that the
1377 * container is polled for instances of mbeans, listeners are attached, to
1378 * the remote mbeans,
1379 *
1380 * @param c Container to check.
1381 */
1382 protected void synchronizeContainer(ObjectName c) {
1383 log.info("synchonizing container:" + c.toString());
1384
1385 InetSocketAddress address = JmxUtils.extractAddress(c);
1386 MBeanServerConnection mbc = this.connections.get(address);
1387
1388 if (mbc == null) {
1389 throw new NullPointerException(
1390 "no mbean server connection found on " + address);
1391 }
1392 Container container = new Container(c, this.defaultMaxPerContainer);
1393
1394 this.containers.put(c, container);
1395 try {
1396
1397 Set<ObjectName> names = mbc.queryNames(null, null);
1398 for (ObjectName n : names) {
1399 synchronizeMBean(n);
1400 }
1401
1402 } catch (IOException e) {
1403
1404 e.printStackTrace();
1405 }
1406 }
1407
1408 /***
1409 * synchronizes state of beans according to their types. E.g. Heritrix
1410 * Service beans are added if it hasn't already been created.
1411 *
1412 * @param name
1413 */
1414 protected void synchronizeMBean(ObjectName name) {
1415 String theType = name.getKeyProperty(JmxUtils.TYPE);
1416 if (JmxUtils.SERVICE.equals(theType)) {
1417 log.info("crawler service found:" + name.toString());
1418 handleCrawlerCreated(name);
1419 }
1420 }
1421
1422 /***
1423 * defines and starts the jndi poller timer task
1424 */
1425 private void initializeJndiPoller() {
1426 this.jndiPoller.schedule(
1427
1428 new TimerTask() {
1429 public void run() {
1430 if (log.isLoggable(Level.INFO)) {
1431 log.info("running poll task...");
1432 }
1433
1434 if (jndiPoller == null) {
1435 return;
1436 }
1437
1438 refreshRegistry();
1439 if (log.isLoggable(Level.INFO)) {
1440 log.info("poll task done.");
1441 }
1442 }
1443
1444 },
1445
1446 new Date(),
1447 JNDI_POLL_PERIOD_IN_SECONDS * 1000);
1448 }
1449
1450 public Object getAttribute(String attribute)
1451 throws AttributeNotFoundException,
1452 MBeanException,
1453 ReflectionException {
1454
1455 return null;
1456 }
1457
1458 public AttributeList getAttributes(String[] attributes) {
1459
1460 return null;
1461 }
1462
1463 public MBeanInfo getMBeanInfo() {
1464 return this.info;
1465 }
1466
1467 public Object invoke(String actionName, Object[] params, String[] signature)
1468 throws MBeanException,
1469 ReflectionException {
1470 return this.invocationManager.invoke(actionName, params, signature);
1471 }
1472
1473 public void setAttribute(Attribute attribute)
1474 throws AttributeNotFoundException,
1475 InvalidAttributeValueException,
1476 MBeanException,
1477 ReflectionException {
1478
1479
1480 }
1481
1482 public AttributeList setAttributes(AttributeList attributes) {
1483
1484 return null;
1485 }
1486
1487 public void addNotificationListener(
1488 NotificationListener listener,
1489 NotificationFilter filter,
1490 Object handback) throws IllegalArgumentException {
1491 this.broadCaster.addNotificationListener(listener, filter, handback);
1492 }
1493
1494 public MBeanNotificationInfo[] getNotificationInfo() {
1495 return this.broadCaster.getNotificationInfo();
1496 }
1497
1498 public void removeNotificationListener(
1499 NotificationListener listener,
1500 NotificationFilter filter,
1501 Object handback) throws ListenerNotFoundException {
1502 this.broadCaster.removeNotificationListener(listener, filter, handback);
1503 }
1504
1505 public void removeNotificationListener(NotificationListener listener)
1506 throws ListenerNotFoundException {
1507 this.broadCaster.removeNotificationListener(listener);
1508 }
1509
1510 public void postDeregister() {
1511
1512
1513 }
1514
1515 public void postRegister(Boolean registrationDone) {
1516
1517
1518 }
1519
1520 public void preDeregister() throws Exception {
1521
1522
1523 }
1524
1525 public ObjectName preRegister(MBeanServer server, ObjectName name)
1526 throws Exception {
1527 return name;
1528 }
1529
1530 private ObjectName createServiceBeanName(
1531 String name,
1532 InetSocketAddress address,
1533 String guiPort) {
1534 try {
1535 Hashtable<String, String> ht = new Hashtable<String, String>();
1536
1537 ht.put(JmxUtils.HOST, address.getHostName());
1538 ht.put(JmxUtils.TYPE, JmxUtils.SERVICE);
1539 ht.put(JmxUtils.NAME, name);
1540 ht.put(JmxUtils.JMX_PORT, String.valueOf(address.getPort()));
1541 if(guiPort != null){
1542 ht.put(JmxUtils.GUI_PORT, guiPort);
1543 }
1544
1545 return new ObjectName("org.archive.crawler", ht);
1546 } catch (Exception e) {
1547 e.printStackTrace();
1548 throw new RuntimeException(e);
1549 }
1550 }
1551
1552
1553 private ObjectName createCrawlerIn(Container container) throws Exception {
1554 InetSocketAddress address = JmxUtils
1555 .extractAddress(container.getName());
1556 MBeanServerConnection c = this.connections.get(address);
1557 if (c == null) {
1558 throw new Exception("No connection found on " + address);
1559 }
1560
1561 MBeanFutureTask t = null;
1562 try {
1563 String newBeanName = "h" + System.currentTimeMillis();
1564 final ObjectName beanName = createServiceBeanName(
1565 newBeanName,
1566 address,
1567 container.getName().getKeyProperty(JmxUtils.GUI_PORT));
1568
1569 t = new MBeanFutureTask("create crawler:" + address) {
1570 public boolean isNotificationEnabled(Notification notification) {
1571 if (notification
1572 .getType()
1573 .equals(ClusterControllerNotification.
1574 CRAWL_SERVICE_CREATED_NOTIFICATION.getKey())) {
1575 Crawler c = remoteNameToCrawlerMap.get(new RemoteMapKey(beanName));
1576 ObjectName proxy = c.getCrawlServiceProxyObjectName();
1577 return (proxy != null && notification
1578 .getUserData()
1579 .equals(proxy));
1580 }
1581 return false;
1582 }
1583 };
1584
1585 this.broadCaster.addNotificationListener(t, t, new Object());
1586
1587 c.createMBean("org.archive.crawler.Heritrix", beanName);
1588
1589 return (ObjectName) t.get(30 * 1000, TimeUnit.MILLISECONDS);
1590
1591 } catch (Exception e) {
1592 e.printStackTrace();
1593 throw new Exception(e);
1594 } finally {
1595 if (t != null) {
1596 this.broadCaster.removeNotificationListener(t);
1597 }
1598 }
1599 }
1600
1601 private Container getContainerOn(InetSocketAddress address) {
1602 for (ObjectName n : this.containers.keySet()) {
1603 if (address.equals(JmxUtils.extractAddress(n))) {
1604 return this.containers.get(n);
1605 }
1606 }
1607 return null;
1608 }
1609
1610 protected ObjectName addCrawler(ObjectName newCrawler) {
1611 try {
1612
1613 boolean contains = (remoteNameToCrawlerMap.containsKey(newCrawler));
1614 if (!contains) {
1615 ObjectName proxyName = createClientProxyName(newCrawler);
1616
1617 InetSocketAddress address = JmxUtils.extractAddress(newCrawler);
1618 MBeanServerConnection c = this.connections.get(address);
1619 if (c == null) {
1620 throw new RuntimeException("No connection found on "
1621 + address + "- this should never happen");
1622 }
1623
1624
1625
1626
1627
1628 DynamicMBean proxy = (DynamicMBean) Proxy.newProxyInstance(
1629 DynamicMBean.class.getClassLoader(),
1630 new Class[] { DynamicMBean.class },
1631 new RemoteMBeanInvocationHandler(
1632 newCrawler,
1633 proxyName,
1634 c,
1635 spyListener));
1636
1637 Container container = getContainerOn(address);
1638
1639 if (container == null) {
1640 throw new RuntimeException(
1641 "Container should never be null: " + address);
1642 }
1643
1644 Crawler crawler = new Crawler(proxy, null, container);
1645 container.addCrawler(crawler);
1646 remoteNameToCrawlerMap.put(new RemoteMapKey(newCrawler), crawler);
1647 this.mbeanServer.registerMBean(proxy, proxyName);
1648
1649 Hashtable props = new Hashtable();
1650 props.put(JmxUtils.TYPE, JmxUtils.JOB);
1651 props.put(JmxUtils.MOTHER, newCrawler
1652 .getKeyProperty(JmxUtils.NAME));
1653
1654 try {
1655
1656 Set<ObjectName> jobs = c.queryNames(null,null);
1657 for(ObjectName job : jobs){
1658 if (JmxUtils.JOB.equals(job.getKeyProperty(JmxUtils.TYPE)) &&
1659 newCrawler
1660 .getKeyProperty(JmxUtils.NAME).equals(
1661 job.getKeyProperty(JmxUtils.MOTHER))
1662
1663 ) {
1664 addCrawlJob(job);
1665 break;
1666 }
1667
1668 }
1669 } catch (Exception e) {
1670 e.printStackTrace();
1671 }
1672 return proxyName;
1673 }
1674 } catch (InstanceAlreadyExistsException e) {
1675 e.printStackTrace();
1676 } catch (MBeanRegistrationException e) {
1677 e.printStackTrace();
1678 } catch (NotCompliantMBeanException e) {
1679 e.printStackTrace();
1680 } catch (IllegalArgumentException e) {
1681 e.printStackTrace();
1682 }
1683 return null;
1684 }
1685
1686 protected void handleCrawlerCreated(ObjectName newCrawler) {
1687 ObjectName proxyName = addCrawler(newCrawler);
1688 if (proxyName != null) {
1689 fireCrawlerCreated(proxyName);
1690 }
1691 }
1692
1693 private void fireCrawlerDestroyed(ObjectName proxyName) {
1694 fireNotification(
1695 proxyName,
1696 ClusterControllerNotification.CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey());
1697 }
1698
1699 private void fireNotification(ObjectName name, String type) {
1700 Notification n = new Notification(type, this.name, System
1701 .currentTimeMillis());
1702 n.setUserData(name);
1703 fireNotificationEvent(n);
1704 if (log.isLoggable(Level.INFO)) {
1705 log.info("name=" + name + "; type=" + type);
1706 }
1707 }
1708
1709 private void fireCrawlerCreated(ObjectName proxyName) {
1710 fireNotification(
1711 proxyName,
1712 ClusterControllerNotification.
1713 CRAWL_SERVICE_CREATED_NOTIFICATION.getKey());
1714 }
1715
1716 private ObjectName createClientProxyName(ObjectName remote) {
1717 try {
1718
1719 Hashtable lp = this.name.getKeyPropertyList();
1720 Hashtable cp = new Hashtable();
1721 Hashtable rp = remote.getKeyPropertyList();
1722 cp.put(JmxUtils.TYPE, rp.get(JmxUtils.TYPE));
1723 cp.put(JmxUtils.NAME, rp.get(JmxUtils.NAME));
1724 cp.put(JmxUtils.HOST, lp.get(JmxUtils.HOST));
1725 cp.put(JmxUtils.JMX_PORT, lp.get(JmxUtils.JMX_PORT));
1726 cp.put("remoteHost", rp.get(JmxUtils.HOST));
1727 cp.put("remoteJmxPort", rp.get(JmxUtils.JMX_PORT));
1728
1729 return new ObjectName(this.name.getDomain(), cp);
1730 } catch (MalformedObjectNameException e) {
1731 e.printStackTrace();
1732 throw new RuntimeException(e);
1733 } catch (NullPointerException e) {
1734 e.printStackTrace();
1735 throw new RuntimeException(e);
1736 }
1737
1738 }
1739
1740 public static void main(String[] args) {
1741 ClusterControllerBean b = new ClusterControllerBean();
1742 b.init();
1743 }
1744 }