View Javadoc

1   /* $Id: ClusterControllerBean.java 5979 2008-08-22 21:21:33Z dbernstein $
2    *
3    * Created on Dec 12, 2005
4    *
5    * Copyright (C) 2005 Internet Archive.
6    *  
7    * This file is part of the Heritrix Cluster Controller (crawler.archive.org).
8    *  
9    * HCC is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   * 
14   * Heritrix is distributed in the hope that it will be useful, 
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   * 
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.hcc;
24  
25  import java.io.IOException;
26  import java.lang.reflect.Proxy;
27  import java.net.InetSocketAddress;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.Comparator;
31  import java.util.Date;
32  import java.util.HashMap;
33  import java.util.Hashtable;
34  import java.util.Iterator;
35  import java.util.LinkedList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Properties;
39  import java.util.Set;
40  import java.util.Timer;
41  import java.util.TimerTask;
42  import java.util.concurrent.TimeUnit;
43  import java.util.logging.Level;
44  import java.util.logging.Logger;
45  
46  import javax.management.Attribute;
47  import javax.management.AttributeList;
48  import javax.management.AttributeNotFoundException;
49  import javax.management.DynamicMBean;
50  import javax.management.InstanceAlreadyExistsException;
51  import javax.management.InstanceNotFoundException;
52  import javax.management.InvalidAttributeValueException;
53  import javax.management.ListenerNotFoundException;
54  import javax.management.MBeanException;
55  import javax.management.MBeanInfo;
56  import javax.management.MBeanNotificationInfo;
57  import javax.management.MBeanRegistration;
58  import javax.management.MBeanRegistrationException;
59  import javax.management.MBeanServer;
60  import javax.management.MBeanServerConnection;
61  import javax.management.MBeanServerFactory;
62  import javax.management.MalformedObjectNameException;
63  import javax.management.NotCompliantMBeanException;
64  import javax.management.Notification;
65  import javax.management.NotificationBroadcasterSupport;
66  import javax.management.NotificationEmitter;
67  import javax.management.NotificationFilter;
68  import javax.management.NotificationListener;
69  import javax.management.ObjectName;
70  import javax.management.ReflectionException;
71  import javax.management.openmbean.ArrayType;
72  import javax.management.openmbean.CompositeData;
73  import javax.management.openmbean.OpenDataException;
74  import javax.management.openmbean.OpenMBeanAttributeInfo;
75  import javax.management.openmbean.OpenMBeanAttributeInfoSupport;
76  import javax.management.openmbean.OpenMBeanConstructorInfo;
77  import javax.management.openmbean.OpenMBeanInfoSupport;
78  import javax.management.openmbean.OpenMBeanOperationInfo;
79  import javax.management.openmbean.OpenMBeanOperationInfoSupport;
80  import javax.management.openmbean.OpenMBeanParameterInfo;
81  import javax.management.openmbean.OpenMBeanParameterInfoSupport;
82  import javax.management.openmbean.SimpleType;
83  import javax.management.openmbean.TabularData;
84  import javax.naming.Context;
85  import javax.naming.NameClassPair;
86  import javax.naming.NamingEnumeration;
87  import javax.naming.NamingException;
88  
89  import org.archive.hcc.util.ClusterControllerNotification;
90  import org.archive.hcc.util.NotificationDelegator;
91  import org.archive.hcc.util.SmartPropertiesResolver;
92  import org.archive.hcc.util.Delegator.DelegatorPolicy;
93  import org.archive.hcc.util.jmx.MBeanFutureTask;
94  import org.archive.hcc.util.jmx.MBeanOperation;
95  import org.archive.hcc.util.jmx.MBeanServerConnectionFactory;
96  import org.archive.hcc.util.jmx.OpenMBeanInvocationManager;
97  import org.archive.hcc.util.jmx.RegistrationNotificationHandler;
98  import org.archive.hcc.util.jmx.SimpleReflectingMBeanOperation;
99  import org.archive.util.JmxUtils;
100 import org.archive.util.JndiUtils;
101 
102 /***
103  * As the main workhorse of the package, the <code>ClusterControllerBean</code>
104  * provides a unified view of any number of Heritrix instances and all related
105  * objects within a JNDI scope.
106  * 
107  * @author Daniel Bernstein (dbernstein@archive.org)
108  */
109 public class ClusterControllerBean implements
110         DynamicMBean,
111         NotificationEmitter,
112         MBeanRegistration {
113 
114     /***
115      * The Jndi context
116      */
117     private Context context;
118 
119     /***
120      * logger
121      */
122     private static Logger log = Logger.getLogger(ClusterControllerBean.class
123             .getName());
124 
125     /***
126      * A timer thread that polls the jndi for new containers.
127      */
128     private Timer jndiPoller;
129 
130     /***
131      * A single instance of notification listener that simply forwards messages
132      * from source beans to listeners.
133      */
134     private NotificationDelegator remoteNotificationDelegator;
135 
136     /***
137      * poll period in seconds.
138      */
139     private static final int JNDI_POLL_PERIOD_IN_SECONDS = 60;
140 
141     /***
142      * A map of mbean server connections mapped by address. TODO make
143      * configuratable
144      */
145     private Map<InetSocketAddress, MBeanServerConnection> connections =
146         new HashMap<InetSocketAddress, MBeanServerConnection>();
147 
148     /***
149      * A list of remote container references
150      */
151     private Map<ObjectName, Container> containers;
152 
153     private NotificationBroadcasterSupport broadCaster;
154 
155     /***
156      * Manages and delegates OpenMBean invocations
157      */
158     private OpenMBeanInvocationManager invocationManager;
159 
160     /***
161      * Defines the open mbean interface for the bean.
162      */
163     private MBeanInfo info;
164 
165     /***
166      * A local mbean server
167      */
168     private MBeanServer mbeanServer;
169 
170     /***
171      * The MBean name of the controller.
172      */
173     private ObjectName name;
174 
175     /***
176      * A map of the remote crawler handles to the local Crawler dynamic proxy
177      * instances.
178      */
179     private Map<RemoteMapKey, Crawler> remoteNameToCrawlerMap =
180         new HashMap<RemoteMapKey, Crawler>();
181 
182     /***
183      * Watches all traffic (invocations and notifications) moving between the
184      * remote Heritrix instances and the heritrix cluster controller.
185      */
186     private NotificationListener spyListener;
187     
188     /***
189      * Upperbound on crawlers per container.
190      */
191     private int defaultMaxPerContainer = 1;
192 
193     /***
194      * Creates a cluster controller bean. This object uses a two step
195      * initialization pattern. Therefore you must call init() before invoking
196      * any options.
197      */
198     public ClusterControllerBean() {
199         this.remoteNotificationDelegator = buildRemoteNotificationDelegator();
200         jndiPoller = new Timer();
201         this.broadCaster = new NotificationBroadcasterSupport();
202         this.invocationManager = new OpenMBeanInvocationManager();
203         this.info = buildOpenMBeanInfo();
204         this.mbeanServer = createMBeanServer();
205         this.containers = new HashMap<ObjectName, Container>();
206         this.spyListener = new NotificationListener() {
207             public void handleNotification(
208                     Notification notification,
209                     Object handback) {
210 
211                 if (log.isLoggable(Level.FINER)) {
212                     log.finer(">>>>>>>>>spyListener: notification="
213                             + notification);
214                 }
215 
216                 fireNotificationEvent(notification);
217             }
218         };
219     }
220     
221     private class RemoteMapKey {
222     	private ObjectName name;
223     	private String id;
224     	public ObjectName getObjectName(){
225     		return name;
226     		
227     	}
228     	
229     	public RemoteMapKey(ObjectName on){
230     		this.name = on;
231     		this.id = extractId(on);
232     	}
233     	
234     	@Override
235     	public boolean equals(Object obj) {
236     		if(!(obj instanceof RemoteMapKey)){
237     			return false;
238     		}
239     		
240     		RemoteMapKey o = (RemoteMapKey)obj;
241     		
242     		return (o.id.equals(id));
243     	}
244     	
245 	    private String extractId(ObjectName name){
246 	    	StringBuffer b = new StringBuffer();
247 	    	b.append(name.getKeyProperty(JmxUtils.NAME));
248 	    	b.append(name.getKeyProperty(JmxUtils.HOST));
249 	    	b.append(name.getKeyProperty(JmxUtils.JMX_PORT));
250 	    	b.append(name.getKeyProperty(JmxUtils.TYPE));
251 	    	return b.toString();
252 	    }
253     	@Override
254     	public int hashCode() {
255     		// TODO Auto-generated method stub
256     		return id.hashCode();
257     	}
258     }
259 
260     private static MBeanServer createMBeanServer() {
261         MBeanServer result = null;
262         List servers = MBeanServerFactory.findMBeanServer(null);
263         if (servers == null) {
264             return result;
265         }
266         for (Iterator i = servers.iterator(); i.hasNext();) {
267             MBeanServer server = (MBeanServer) i.next();
268             if (server == null) {
269                 continue;
270             }
271             result = server;
272             break;
273         }
274         return result;
275     }
276 
277     /***
278      * @return Returns the total count of all crawlers within the cluster.
279      */
280     public Integer getTotalCrawlerCount() {
281         return this.remoteNameToCrawlerMap.size();
282     }
283 
284     private OpenMBeanInfoSupport buildOpenMBeanInfo() {
285        return new OpenMBeanInfoSupport(
286                 ClusterControllerBean.class.getName(),
287                 "A controller for a pool of Heritrix "
288                         + "containers and their contents.",
289                 buildAttributes(),
290                 buildConstructors(),
291                 buildOperations(),
292                 buildNotifications());
293     }
294 
295     private OpenMBeanAttributeInfo[] buildAttributes() {
296         try {
297             return new OpenMBeanAttributeInfo[] {
298                 new OpenMBeanAttributeInfoSupport(
299                     "TotalCrawlerCount",
300                     "Total number of crawlers that are "
301                             + "currently initialized in the system.",
302                     SimpleType.INTEGER,
303                     true,
304                     false,
305                     false,
306                     new Integer(0)) };
307         } catch (OpenDataException e) {
308             throw new RuntimeException(e);
309         }
310     }
311 
312     private OpenMBeanConstructorInfo[] buildConstructors() {
313         return new OpenMBeanConstructorInfo[0];
314     }
315 
316     private boolean uidMatches(TabularData td, String jobUid) {
317         if (jobUid == null) {
318             throw new NullPointerException("jobUid=" + jobUid);
319         }
320 
321         if (td == null) {
322             return false;
323         }
324 
325         for (Object o : td.values()) {
326             CompositeData cd = (CompositeData) o;
327             if (jobUid.equals(cd.get("uid"))) {
328                 return true;
329             }
330         }
331 
332         return false;
333     }
334 
335     
336     public ObjectName findCrawlServiceJobParent(
337             String jobUid,
338             String host,
339             Integer port) {
340 
341         InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
342         Container container = getContainerOn(remoteAddress);
343 
344         if (container == null) {
345             return null;
346         }
347 
348         List<Crawler> crawlers = 
349         	new LinkedList<Crawler>(container.getCrawlers());
350         for (Crawler crawler : crawlers) {
351             DynamicMBean p = crawler.getCrawlServiceProxy();
352             try {
353 
354                 // check if currently running
355                 ObjectName cjp = crawler.getCrawlJobProxyObjectName();
356                 ObjectName csp = crawler.getCrawlServiceProxyObjectName();
357 
358                 if (cjp != null) {
359                     if (JmxUtils.getUid(cjp).equals(jobUid)) {
360                         return csp;
361                     }
362                 }
363 
364                 // check completed
365                 TabularData td = (TabularData) p.invoke(
366                         "completedJobs",
367                         new Object[0],
368                         new String[0]);
369 
370                 if (uidMatches(td, jobUid)) {
371                     return csp;
372                 }
373 
374                 // check pending
375                 td = (TabularData) p.invoke(
376                         "pendingJobs",
377                         new Object[0],
378                         new String[0]);
379 
380                 if (uidMatches(td, jobUid)) {
381                     return csp;
382                 }
383             } catch (MBeanException e) {
384                 e.printStackTrace();
385             } catch (ReflectionException e) {
386                 e.printStackTrace();
387             }
388         }
389 
390         return null;
391     }
392 
393     
394     /***
395      * Returns the current job object name associated with the specified crawler. 
396      * Returns null if no job was running.
397      * @param mother
398      * @return
399      */
400     public ObjectName getCurrentCrawlJob(ObjectName mother) {
401         InetSocketAddress remoteAddress = org.archive.hcc.util.JmxUtils.extractRemoteAddress(mother);
402         Container container = getContainerOn(remoteAddress);
403 
404         if (container == null) {
405             return null;
406         }
407 
408         for (Crawler crawler : new LinkedList<Crawler>(container.getCrawlers())) {
409             if(crawler.getCrawlServiceProxyObjectName().equals(mother)){
410                 return  crawler.getCrawlJobProxyObjectName();
411             }
412         }
413         return null;
414     }
415     
416     /***
417      * Returns the maximum number of instances allowed for this container.
418      * If the container does not exist, -1 is returned.
419      * @param hostname
420      * @param port
421      * @return
422      */
423     public int getMaxInstances(String hostname, Integer port){
424     	Collection<Container> list = 
425     		new LinkedList<Container>(this.containers.values());
426     	
427     	for(Container c : list){
428     		InetSocketAddress a = JmxUtils.extractAddress(c.getName());
429     		if(a.getHostName().equals(hostname) && port == a.getPort()){
430     			return c.getMaxInstances();
431     		}
432     	}
433     	
434     	return -1;
435     }
436 
437     /***
438      * Sets the maximum number of instances that may run on a 
439      * specified container defined by a host and port.
440      * @param hostname
441      * @param port
442      * @param maxInstances
443      */
444     public void setMaxInstances(String hostname, Integer port, Integer maxInstances){
445     	Collection<Container> list = 
446     		new LinkedList<Container>(this.containers.values());
447     	
448     	if(maxInstances < -1){
449     		maxInstances = -1;
450     	}
451     	
452     	for(Container c : list){
453     		InetSocketAddress a = JmxUtils.extractAddress(c.getName());
454     		if(a.getHostName().equals(hostname) && port == a.getPort()){
455     			c.setMaxInstances(maxInstances);
456     			break;
457     		}
458     	}
459     }
460     
461     /***
462      * Creates a new crawler on the least loaded machine on the cluster.
463      * 
464      * @return ObjectName of created crawler.
465      * @throws MBeanException
466      */
467     public ObjectName createCrawler() throws MBeanException {
468         List<Container> containers = resolveLeastLoadedContainers();
469 
470         if (containers != null) {
471             for(Container container : containers){
472                 try {
473                 	log.info("attempting to create crawler on container: " + container.getName());
474                     return createCrawlerIn(container);
475                 } catch (Exception e) {
476                 	log.warning("unexpected error!!! failed to create crawler as expected on " + container.getName());
477                 	e.printStackTrace();
478                 }
479             	
480             }
481             
482             log.severe("unable to start any crawlers on container due to communication " +
483             		"failure with all available containers.");
484         }
485 
486         MBeanException e =  new MBeanException(
487                 new Exception(
488                 "No space available in remote"+
489                 " containers for new crawler " + "instances."),
490                 "insufficent crawler resources"
491         );
492         
493         throw e;
494 
495     }
496 
497 //    /***
498 //     * @return Currently returns the least loaded going by a dumb count.
499 //     */
500 //    protected Container resolveLeastLoadedContainer() {
501 //
502 //        Container leastLoaded = null;
503 //
504 //        for (Container n : this.containers.values()) {
505 //            if (n.getCrawlers().size() >= n.getMaxInstances()) {
506 //                continue;
507 //            }
508 //            if (leastLoaded == null) {
509 //                leastLoaded = n;
510 //            }
511 //
512 //            if (n.getCrawlers().size() < leastLoaded.getCrawlers().size()) {
513 //                leastLoaded = n;
514 //            }
515 //        }
516 //
517 //        return leastLoaded;
518 //    }
519 
520     /***
521      * @return A list of available (ie not fully loaded) containers sorted from least loaded to most loaded. If no
522      * containers are available, returns null.
523      */
524     protected List<Container> resolveLeastLoadedContainers() {
525 
526         List<Container>leastLoaded = null;
527         Container last = null;
528         
529         List<Container> currentContainers = new LinkedList<Container>(this.containers.values());
530         
531         Collections.sort(currentContainers, new Comparator(){
532         	public int compare(Object o1, Object o2) {
533         		Container c1 = (Container)o1;
534         		Container c2 = (Container)o2;
535         		return new Integer(c1.getCrawlers().size()).compareTo(new Integer(c2.getCrawlers().size()));
536         	}
537         });
538 
539         for (Container n : currentContainers) {
540             if (n.getCrawlers().size() >= n.getMaxInstances()) {
541                 continue;
542             }
543             
544             if(leastLoaded == null){
545             	leastLoaded = new LinkedList<Container>();
546             }
547            
548             leastLoaded.add(n);
549         }
550 
551         return leastLoaded;
552     }
553 
554     private OpenMBeanOperationInfo[] buildOperations() {
555         try {
556 
557             addOperation(new SimpleReflectingMBeanOperation(
558                     ClusterControllerBean.this,
559                     new OpenMBeanOperationInfoSupport(
560                             "createCrawler",
561                             "creates a new crawler on the cluster",
562                             null,
563                             SimpleType.OBJECTNAME,
564                             OpenMBeanOperationInfoSupport.ACTION_INFO)));
565 
566             addOperation(new SimpleReflectingMBeanOperation(
567                     ClusterControllerBean.this,
568                     new OpenMBeanOperationInfoSupport(
569                             "destroy",
570                             "Effectively \"detaches\" the bean from the " +
571                                 "containers, crawl services and jobs that it " +
572                                 "is managing. The remote objects are not " +
573                                 "affected. ",
574                             null,
575                             SimpleType.VOID,
576                             OpenMBeanOperationInfoSupport.ACTION)));
577 
578             addOperation(new SimpleReflectingMBeanOperation(
579                     ClusterControllerBean.this,
580                     new OpenMBeanOperationInfoSupport(
581                             "listCrawlers",
582                             "lists crawlers associated with the cluster",
583                             null,
584                             new ArrayType(1, SimpleType.OBJECTNAME),
585                             OpenMBeanOperationInfoSupport.INFO)));
586 
587 
588             addOperation(new SimpleReflectingMBeanOperation(
589                     ClusterControllerBean.this,
590                     new OpenMBeanOperationInfoSupport(
591                             "destroyAllCrawlers",
592                             "destroys all crawlers that are managed by the cluster" +
593                             " controller",
594                             null,
595                             SimpleType.VOID,
596                             OpenMBeanOperationInfoSupport.ACTION)));
597 
598             
599             addOperation(new SimpleReflectingMBeanOperation(
600                     ClusterControllerBean.this,
601                     new OpenMBeanOperationInfoSupport(
602                             "pauseAllJobs",
603                             "pauses all jobs that are managed by the cluster" +
604                             " controller",
605                             null,
606                             SimpleType.BOOLEAN,
607                             OpenMBeanOperationInfoSupport.ACTION)));
608             
609             addOperation(new SimpleReflectingMBeanOperation(
610                     ClusterControllerBean.this,
611                     new OpenMBeanOperationInfoSupport(
612                             "resumeAllPausedJobs",
613                             "resumes crawling of all paused jobs that are managed by the cluster" +
614                             " controller",
615                             null,
616                             SimpleType.BOOLEAN,
617                             OpenMBeanOperationInfoSupport.ACTION)));
618 
619             
620             addOperation(new SimpleReflectingMBeanOperation(
621                     ClusterControllerBean.this,
622                     new OpenMBeanOperationInfoSupport(
623                             "findCrawlServiceJobParent", "returns the parent" +
624                                 "name of the specified crawl job.",
625                             new OpenMBeanParameterInfo[] {
626                                     new OpenMBeanParameterInfoSupport(
627                                             "uid",
628                                             "The job's uid",
629                                             SimpleType.STRING),
630                                     new OpenMBeanParameterInfoSupport(
631                                             "remoteHost",
632                                             "The remote host name",
633                                             SimpleType.STRING),
634                                     new OpenMBeanParameterInfoSupport(
635                                             "remotePort",
636                                             "The remote port",
637                                             SimpleType.INTEGER) },
638                             SimpleType.OBJECTNAME,
639                             OpenMBeanOperationInfoSupport.INFO)));
640             
641             addOperation(new SimpleReflectingMBeanOperation(
642                     ClusterControllerBean.this,
643                     new OpenMBeanOperationInfoSupport(
644                             "getCurrentCrawlJob", "returns the current " +
645                                 "crawl job name of the crawl job running" +
646                                 " on the specified crawler Returns null" +
647                                 " if either the mother is not found or " +
648                                 "if the crawler does not have a current job.",
649                             new OpenMBeanParameterInfo[] {
650                                     new OpenMBeanParameterInfoSupport(
651                                             "mother",
652                                             "The job's mother (CrawlService)",
653                                             SimpleType.OBJECTNAME)},
654                             SimpleType.OBJECTNAME,
655                             OpenMBeanOperationInfoSupport.INFO)));
656             
657 
658             addOperation(new SimpleReflectingMBeanOperation(
659                     ClusterControllerBean.this,
660                     new OpenMBeanOperationInfoSupport(
661                             "setMaxInstances", "sets the max number of " +
662                             		"instances of heritrix that a heritrix " +
663                             		"enabled jvm can serve.",
664                             new OpenMBeanParameterInfo[] {
665                                     new OpenMBeanParameterInfoSupport(
666                                             "host",
667                                             "The jvm's host",
668                                             SimpleType.STRING),
669                                     new OpenMBeanParameterInfoSupport(
670                                             "port",
671                                             "The jvm's jmx port",
672                                             SimpleType.INTEGER),
673                                     new OpenMBeanParameterInfoSupport(
674                                             "maxInstances",
675                                             "The max number of instances",
676                                             SimpleType.INTEGER) },
677                             SimpleType.VOID,
678                             OpenMBeanOperationInfoSupport.ACTION)));  
679             
680 
681             addOperation(new SimpleReflectingMBeanOperation(
682                     ClusterControllerBean.this,
683                     new OpenMBeanOperationInfoSupport(
684                             "getMaxInstances", "returns the max number of " +
685                             		"instances of heritrix that a heritrix " +
686                             		"enabled jvm can serve.",
687                             new OpenMBeanParameterInfo[] {
688                                     new OpenMBeanParameterInfoSupport(
689                                             "host",
690                                             "The jvm's host",
691                                             SimpleType.STRING),
692                                     new OpenMBeanParameterInfoSupport(
693                                             "port",
694                                             "The jvm's jmx port",
695                                             SimpleType.INTEGER)
696                                             },
697                             SimpleType.INTEGER,
698                             OpenMBeanOperationInfoSupport.INFO)));     
699             
700         } catch (OpenDataException e) {
701             e.printStackTrace();
702         }
703 
704         return this.invocationManager.getInfo();
705     }
706 
707     /***
708      * @return Returns a list of all crawler instances within the hcc's jndi
709      * scope.
710      */
711     public ObjectName[] listCrawlers() {
712         int size = this.remoteNameToCrawlerMap.keySet().size();
713         ObjectName[] crawlers = new ObjectName[size];
714         int count = 0;
715         for (Crawler c : this.remoteNameToCrawlerMap.values()) {
716             crawlers[count] = c.getCrawlServiceProxyObjectName();
717             count++;
718         }
719         return crawlers;
720     }
721     
722     public void destroyAllCrawlers(){
723     	List<Crawler> list = 
724     		new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
725     	for(Crawler c : list){
726     		try {
727 				c.getCrawlServiceProxy().invoke("destroy", new Object[0], new String[0]);
728 			} catch (MBeanException e) {
729 				// TODO Auto-generated catch block
730 				e.printStackTrace();
731 			} catch (ReflectionException e) {
732 				// TODO Auto-generated catch block
733 				e.printStackTrace();
734 			}
735     	}
736     }
737 
738 
739     public boolean pauseAllJobs(){
740     	List<Crawler> list = 
741     		new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
742     	
743     	boolean success = true;
744     	for(Crawler c : list){
745     		try {
746     			DynamicMBean m = c.getCrawlJobProxy();
747     			if(m == null){
748     				continue;
749     			}
750     			String status = (String)m.getAttribute("Status");
751     			
752     			if("RUNNING".equals(status)){
753     				m.invoke("pause",  new Object[0], new String[0]);
754     			}
755 			} catch (AttributeNotFoundException e) {
756 				e.printStackTrace();
757 			} catch (MBeanException e) {
758 				e.printStackTrace();
759 				success = false;
760 			} catch (ReflectionException e) {
761 				e.printStackTrace();
762 				success = false;
763 			}
764     	}
765     	
766     	return success;
767     }
768     
769     public boolean resumeAllPausedJobs(){
770     	List<Crawler> list = 
771     		new LinkedList<Crawler>(this.remoteNameToCrawlerMap.values());
772     	
773     	boolean success = true;
774     	for(Crawler c : list){
775     		try {
776     			DynamicMBean m = c.getCrawlJobProxy();
777     			if(m == null){
778     				continue;
779     			}
780 
781     			String status = (String)m.getAttribute("Status");
782     			if("PAUSED".equals(status) || "PAUSING".equals(status)){
783     				m.invoke("resume",  new Object[0], new String[0]);
784     			}
785 			} catch (AttributeNotFoundException e) {
786 				e.printStackTrace();
787 			} catch (MBeanException e) {
788 				e.printStackTrace();
789 				success = false;
790 			} catch (ReflectionException e) {
791 				e.printStackTrace();
792 				success = false;
793 			}
794     	}
795     	
796     	return success;
797     }
798 
799     private void addOperation(MBeanOperation operation) {
800         this.invocationManager.addMBeanOperation(operation);
801     }
802 
803     private MBeanNotificationInfo[] buildNotifications() {
804         List<MBeanNotificationInfo> info =
805             new LinkedList<MBeanNotificationInfo>();
806 
807         info.add(new MBeanNotificationInfo(
808             new String[] {ClusterControllerNotification.
809                     CRAWL_SERVICE_CREATED_NOTIFICATION.getKey() },
810                 ClusterControllerBean.class.getName(),
811                 "Notifies when a new instance of the crawl service comes up"));
812 
813         info.add(new MBeanNotificationInfo(
814             new String[] { ClusterControllerNotification.
815                     CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey() },
816                 ClusterControllerBean.class.getName(),
817                 "Notifies when an instance of the crawl service goes away"));
818 
819         info.add(new MBeanNotificationInfo(
820             new String[] { ClusterControllerNotification.
821                     CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey() },
822                 ClusterControllerBean.class.getName(),
823                 "Notifies when a new crawl service job starts"));
824 
825         info.add(new MBeanNotificationInfo(
826             new String[] { ClusterControllerNotification.
827                     CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey() },
828                 ClusterControllerBean.class.getName(),
829                 "Notifies when a new instance of the crawl service job " +
830                 "completes"));
831 
832         return info.toArray(new MBeanNotificationInfo[0]);
833     }
834 
835     /***
836      * Initializes the cluster controller.
837      */
838     public void init() {
839         try {
840             Properties p =
841                 SmartPropertiesResolver.getProperties("hcc.properties");
842             this.defaultMaxPerContainer = Integer.parseInt(
843                 p.getProperty(ClusterControllerBean.class.getName() +
844                     ".maxPerContainer", "1"));
845             log.info("maxPerContainer setting: " + this.defaultMaxPerContainer);
846             
847             context = JndiUtils.getSubContext("org.archive.crawler");
848             this.name = new ObjectName("org.archive.hcc:"
849                     + "type=ClusterControllerBean"
850                     + ",host="
851                     + System.getenv("HOSTNAME")
852                     + ",jmxport="
853                     + System.getProperty(
854                             "com.sun.management.jmxremote.port",
855                             "8849"));
856 
857             refreshRegistry();
858 
859             initializeJndiPoller();
860 
861             registerMBean();
862 
863         } catch (MalformedObjectNameException e) {
864             // TODO Auto-generated catch block
865             e.printStackTrace();
866         } catch (NullPointerException e) {
867             // TODO Auto-generated catch block
868             e.printStackTrace();
869         } catch (NamingException e) {
870             e.printStackTrace();
871             throw new RuntimeException(e);
872         }
873     }
874 
875     private void registerMBean() {
876         try {
877             this.mbeanServer.registerMBean(this, this.name);
878         } catch (InstanceAlreadyExistsException e) {
879             // TODO Auto-generated catch block
880             e.printStackTrace();
881         } catch (MBeanRegistrationException e) {
882             // TODO Auto-generated catch block
883             e.printStackTrace();
884         } catch (NotCompliantMBeanException e) {
885             // TODO Auto-generated catch block
886             e.printStackTrace();
887         }
888     }
889 
890     /***
891      * Disconnects the cluster controller from the network. It unhooks the
892      * controller without acting on the remote instances.
893      */
894     public void destroy() {
895         try {
896             if (log.isLoggable(Level.INFO)) {
897                 log.info("destroying cluster controller.");
898             }
899 
900             jndiPoller.cancel();
901 
902             if (log.isLoggable(Level.INFO)) {
903                 log.info("cancelled jndi poller");
904             }
905 
906             jndiPoller = null;
907 
908             this.broadCaster = null;
909 
910             List<Container> containers = new LinkedList<Container>(
911                     this.containers.values());
912             for (Container container : containers) {
913                 dereferenceContainer(container);
914             }
915 
916             context.close();
917         } catch (NamingException e) {
918             e.printStackTrace();
919         } finally {
920             try {
921                 this.mbeanServer.unregisterMBean(this.name);
922             } catch (InstanceNotFoundException e) {
923                 // TODO Auto-generated catch block
924                 e.printStackTrace();
925             } catch (MBeanRegistrationException e) {
926                 // TODO Auto-generated catch block
927                 e.printStackTrace();
928             }
929         }
930     }
931 
932     private void dereferenceCrawler(Crawler crawler) {
933 
934         crawler.removeFromParent();
935         if(crawler.getCrawlJobRemoteObjectName()!= null){
936             this.remoteNameToCrawlerMap.remove(
937             		new RemoteMapKey(crawler.getCrawlJobRemoteObjectName()));
938         }
939 
940         try {
941             this.mbeanServer.unregisterMBean(crawler
942                     .getCrawlServiceProxyObjectName());
943         } catch (InstanceNotFoundException e) {
944             if (log.isLoggable(Level.WARNING)) {
945                 log.warning(e.getMessage());
946             }
947 
948             e.printStackTrace();
949 
950         } catch (MBeanRegistrationException e) {
951             if (log.isLoggable(Level.WARNING)) {
952                 log.warning(e.getMessage());
953             }
954 
955             e.printStackTrace();
956         }
957 
958         try {
959             ObjectName crawlJob = crawler.getCrawlJobProxyObjectName();
960             if (crawlJob != null) {
961                 this.mbeanServer.unregisterMBean(crawlJob);
962             }
963         } catch (InstanceNotFoundException e) {
964             if (log.isLoggable(Level.WARNING)) {
965                 log.warning(e.getMessage());
966             }
967 
968         } catch (MBeanRegistrationException e) {
969             if (log.isLoggable(Level.WARNING)) {
970                 log.warning(e.getMessage());
971             }
972 
973             e.printStackTrace();
974         }
975     }
976 
977     /***
978      * Initializes the notification forwarder.
979      * @return Returns notification delegator.
980      */
981     private NotificationDelegator buildRemoteNotificationDelegator() {
982         NotificationDelegator d = new NotificationDelegator(
983                 DelegatorPolicy.ACCEPT_FIRST) {
984             protected boolean delegate(Notification n, Object handback) {
985                 return super.delegate(n, handback);
986             }
987         };
988 
989         d.addDelegatable(new CrawlJobLifecycleNotificationHandler());
990         d.addDelegatable(new CrawlerLifecycleNotificationHandler());
991         d.addDelegatable(new ContainerLifecycleNotificationHandler());
992 
993         return d;
994     }
995 
996     private void fireNotificationEvent(Notification n) {
997         this.broadCaster.sendNotification(n);
998     }
999 
1000     private class CrawlerLifecycleNotificationHandler
1001             extends
1002                 RegistrationNotificationHandler {
1003         @Override
1004         protected String getType() {
1005             return JmxUtils.SERVICE;
1006         }
1007 
1008         @Override
1009         protected void handleRegistered(ObjectName name) {
1010             handleCrawlerCreated(name);
1011         }
1012 
1013         @Override
1014         protected void handleUnregistered(ObjectName name) {
1015             handleCrawlerRemoved(name);
1016         }
1017     }
1018 
1019     private class ContainerLifecycleNotificationHandler
1020             extends
1021                 RegistrationNotificationHandler {
1022         @Override
1023         protected String getType() {
1024             return "container";
1025         }
1026 
1027         @Override
1028         protected void handleRegistered(ObjectName name) {
1029             // Empty.
1030         }
1031 
1032         @Override
1033         protected void handleUnregistered(ObjectName name) {
1034             handleContainerRemoved(name);
1035         }
1036     }
1037 
1038     private class CrawlJobLifecycleNotificationHandler
1039             extends
1040                 RegistrationNotificationHandler {
1041         @Override
1042         protected String getType() {
1043             return JmxUtils.JOB;
1044         }
1045 
1046         @Override
1047         protected void handleRegistered(ObjectName name) {
1048             handleJobAdded(name);
1049         }
1050 
1051         @Override
1052         protected void handleUnregistered(ObjectName name) {
1053             handleJobRemoved(name);
1054         }
1055     }
1056 
1057     protected void handleJobRemoved(ObjectName job) {
1058     	if(log.isLoggable(Level.INFO)){
1059     		log.info("entering: job=" + job);
1060     	}
1061         // locate crawler
1062         Crawler c = getJobContext(job);
1063         
1064         if(c == null){
1065         	if(log.isLoggable(Level.WARNING)){
1066         		log.warning("no crawler context found for job=" + job);
1067         	}
1068         	
1069         	return;
1070         }
1071         // remove job reference from crawler
1072         // unregister job proxy.
1073         ObjectName jobProxy = c.getCrawlJobProxyObjectName();
1074         if(jobProxy == null){
1075         	if(log.isLoggable(Level.WARNING)){
1076         		log.warning("jobProxy was not found on crawler=" + c.getCrawlJobProxyObjectName());
1077         	}
1078         	return;
1079         }
1080         
1081         try {
1082             this.mbeanServer.unregisterMBean(jobProxy);
1083         } catch (InstanceNotFoundException e) {
1084        		log.severe("failed to unregister job proxy: " + jobProxy  + "; remote job=" + job + "; error.class=" + e.getClass() + ";  error.message=" + e.getMessage());
1085 
1086             e.printStackTrace();
1087         } catch (MBeanRegistrationException e) {
1088        		log.severe("failed to unregister job proxy: " + jobProxy  + "; remote job=" + job  + "; error.class=" + e.getClass() + "; message=" + e.getMessage());
1089             e.printStackTrace();
1090         }
1091 
1092         c.setCrawlJobProxy(null);
1093         fireNotification(
1094                 jobProxy,
1095                 ClusterControllerNotification.
1096                     CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey());
1097             
1098     	if(log.isLoggable(Level.INFO)){
1099     		log.info("exitting successfully: job=" + job);
1100     	}
1101             
1102     }
1103 
1104     private boolean isJobOnCrawler(ObjectName job, ObjectName crawler) {
1105         
1106     	return equals(job, crawler, JmxUtils.JMX_PORT)
1107                 && equals(job, crawler, JmxUtils.HOST)
1108                 && job.getKeyProperty(JmxUtils.MOTHER).equals(
1109                         crawler.getKeyProperty(JmxUtils.NAME));
1110     }
1111 
1112     private Crawler getJobContext(ObjectName job) {
1113         for (RemoteMapKey key : this.remoteNameToCrawlerMap.keySet()) {
1114             if (isJobOnCrawler(job, key.getObjectName())) {
1115                 return this.remoteNameToCrawlerMap.get(key);
1116             }
1117         }
1118 
1119         throw new NullPointerException(
1120                 "shouldn't happen: no crawler found for job: " + job);
1121     }
1122 
1123     protected void handleJobAdded(ObjectName job) {
1124         try {
1125             ObjectName proxyName = addCrawlJob(job);
1126             // fire event
1127             fireNotification(
1128                     proxyName,
1129                     ClusterControllerNotification.
1130                         CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey());
1131         } catch (RuntimeException e) {
1132             // TODO Auto-generated catch block
1133             e.printStackTrace();
1134         }
1135     }
1136 
1137     private ObjectName addCrawlJob(ObjectName job) {
1138         InetSocketAddress address = JmxUtils.extractAddress(job);
1139         Crawler crawler = getJobContext(job);
1140         ObjectName proxyName = createClientProxyName(job);
1141         DynamicMBean proxy = (DynamicMBean) Proxy.newProxyInstance(
1142                 DynamicMBean.class.getClassLoader(),
1143                 new Class[] { DynamicMBean.class, NotificationEmitter.class },
1144                 new RemoteMBeanInvocationHandler(
1145                         job,
1146                         proxyName,
1147                         this.connections.get(address),
1148                         spyListener));
1149 
1150         crawler.setCrawlJobProxy(proxy);
1151         try {
1152             this.mbeanServer.registerMBean(proxy, proxyName);
1153         } catch (InstanceAlreadyExistsException e) {
1154             // TODO Auto-generated catch block
1155             e.printStackTrace();
1156         } catch (MBeanRegistrationException e) {
1157             // TODO Auto-generated catch block
1158             e.printStackTrace();
1159         } catch (NotCompliantMBeanException e) {
1160             // TODO Auto-generated catch block
1161             e.printStackTrace();
1162         }
1163 
1164         ((NotificationEmitter) proxy).addNotificationListener(
1165                 this.remoteNotificationDelegator,
1166                 null,
1167                 new Object());
1168 
1169         return proxyName;
1170     }
1171 
1172     protected boolean equals(ObjectName a, ObjectName b, String key) {
1173         Object va = a.getKeyProperty(key);
1174         Object vb = b.getKeyProperty(key);
1175 
1176         if (!(va != null && vb != null)) {
1177             return false;
1178         }
1179 
1180         return va.equals(vb);
1181     }
1182 
1183     protected void handleContainerRemoved(ObjectName name) {
1184 
1185         for (Container c : new LinkedList<Container>(containers.values())) {
1186             if (c.getName().equals(name)) {
1187                 for (Crawler crawler : c.getCrawlers()) {
1188                     removeCrawlerAndNotify(crawler);
1189                 }
1190                 dereferenceContainer(c);
1191                 break;
1192             }
1193         }
1194     }
1195 
1196     private void removeCrawlerAndNotify(Crawler crawler) {
1197         dereferenceCrawler(crawler);
1198         fireCrawlerDestroyed(crawler.getCrawlServiceProxyObjectName());
1199     }
1200 
1201     protected void handleCrawlerRemoved(ObjectName name) {
1202         Crawler c = this.remoteNameToCrawlerMap.remove(new RemoteMapKey(name));
1203         if (c != null) {
1204             removeCrawlerAndNotify(c);
1205         }
1206     }
1207 
1208     /***
1209      * @return Returns a list of containers registered with the jndi service.
1210      */
1211     protected final List<ObjectName> retrieveContainerListFromJndi() {
1212         List<ObjectName> list = new LinkedList<ObjectName>();
1213 
1214         try {
1215             NamingEnumeration<NameClassPair> e = context.list("");
1216             while (e.hasMore()) {
1217                 NameClassPair ncp = e.next();
1218                 String jndiName = ncp.getName();
1219                 
1220                 try {
1221                     ObjectName on = new ObjectName(":" + jndiName);
1222                     Hashtable ht = on.getKeyPropertyList();
1223                     if (ht.get("type").equals("container")) {
1224                         list.add(on);
1225                     }
1226 
1227                 } catch (MalformedObjectNameException e1) {
1228                     // TODO Auto-generated catch block
1229                     e1.printStackTrace();
1230                 }
1231             }
1232 
1233         } catch (NamingException e) {
1234             // TODO Auto-generated catch block
1235             e.printStackTrace();
1236         }
1237 
1238         return list;
1239     }
1240     
1241     protected final void refreshRegistry() {
1242         List<ObjectName> containerNameList = this
1243                 .retrieveContainerListFromJndi();
1244         this.containers = synchronizeContainers(
1245                 this.containers,
1246                 containerNameList);
1247     }
1248 
1249     /***
1250      * Synchronizes the container list with the fresh list (fresh meaning, last
1251      * polled from jndi), removing those that went away, and adding any newly
1252      * discovered containers.
1253      * 
1254      * @param containers
1255      * @param freshContainers
1256      * @return Map of container object names.
1257      */
1258     protected final Map<ObjectName, Container> synchronizeContainers(
1259             Map<ObjectName, Container> containers,
1260             List<ObjectName> freshContainers) {
1261 
1262         Map<ObjectName, Container> staleContainers = new HashMap<ObjectName, Container>(
1263                 containers);
1264 
1265         // remove and destroy all containers not in the new list.
1266         for (ObjectName n : staleContainers.keySet()) {
1267             if (!freshContainers.contains(n)) {
1268                 handleContainerRemoved(n);
1269             }
1270         }
1271 
1272         // add new containers not in the old list.
1273         for (ObjectName n : freshContainers) {
1274             if (!containers.keySet().contains(n)) {
1275                 try {
1276                     InetSocketAddress address = JmxUtils.extractAddress(n);
1277                     registerAddress(address);
1278                     synchronizeContainer(n);
1279                     
1280                     attachMBeanServerDelegateNotificationListener(
1281                             address,
1282                             this.remoteNotificationDelegator);
1283                 } catch (IOException e) {
1284                     e.printStackTrace();
1285                 }
1286             }
1287         }
1288 
1289         return containers;
1290     }
1291 
1292     /***
1293      * Attaches a notification listener to the remote mbean server delegate at
1294      * the specified address..
1295      * @param address 
1296      * @param listener
1297      */
1298     protected void attachMBeanServerDelegateNotificationListener(
1299             InetSocketAddress address,
1300             NotificationListener listener) {
1301         // attach container removed listener
1302         MBeanServerConnection mbc = this.connections.get(address);
1303 
1304         if (mbc == null) {
1305             throw new NullPointerException(
1306                     "no mbean server connection found on " + address);
1307         }
1308 
1309         try {
1310             mbc.addNotificationListener(
1311                     JmxUtils.MBEAN_SERVER_DELEGATE,
1312                     listener,
1313                     null,
1314                     null);
1315         } catch (InstanceNotFoundException e) {
1316             // TODO Auto-generated catch block
1317             e.printStackTrace();
1318         } catch (IOException e) {
1319             // TODO Auto-generated catch block
1320             e.printStackTrace();
1321         }
1322     }
1323 
1324     protected void registerAddress(InetSocketAddress isa) throws IOException {
1325         if (!this.connections.keySet().contains(isa)) {
1326             // create connection.
1327             MBeanServerConnection mbc = MBeanServerConnectionFactory.createConnection(isa);
1328             this.connections.put(isa, mbc);
1329         }
1330     }
1331 
1332     /***
1333      * Unhooks container from the bus.
1334      * @param c Container to remove.
1335      */
1336     protected void dereferenceContainer(Container c) {
1337         Container removed = containers.remove(c.getName());
1338         if (removed != null) {
1339             InetSocketAddress address = JmxUtils.extractAddress(c.getName());
1340             for (Crawler b : new LinkedList<Crawler>(c.getCrawlers())) {
1341                 dereferenceCrawler(b);
1342             }
1343 
1344             removeMBeanServerNotificationListener(address);
1345             this.connections.remove(address);
1346         }
1347     }
1348 
1349     protected void removeMBeanServerNotificationListener(
1350             InetSocketAddress address) {
1351         // attach container removed listener
1352         MBeanServerConnection mbc = this.connections.get(address);
1353 
1354         if (mbc == null) {
1355             throw new NullPointerException(
1356                     "no mbean server connection found on " + address);
1357         }
1358 
1359         try {
1360             mbc.removeNotificationListener(
1361                     JmxUtils.MBEAN_SERVER_DELEGATE,
1362                     this.remoteNotificationDelegator);
1363         } catch (InstanceNotFoundException e) {
1364             // TODO Auto-generated catch block
1365             e.printStackTrace();
1366         } catch (ListenerNotFoundException e) {
1367             // TODO Auto-generated catch block
1368             e.printStackTrace();
1369         } catch (IOException e) {
1370             // TODO Auto-generated catch block
1371             e.printStackTrace();
1372         }
1373     }
1374 
1375     /***
1376      * Synchronizes the state of an individual container. This means that the
1377      * container is polled for instances of mbeans, listeners are attached, to
1378      * the remote mbeans,
1379      * 
1380      * @param c Container to check.
1381      */
1382     protected void synchronizeContainer(ObjectName c) {
1383         log.info("synchonizing container:" + c.toString());
1384     	
1385         InetSocketAddress address = JmxUtils.extractAddress(c);
1386         MBeanServerConnection mbc = this.connections.get(address);
1387 
1388         if (mbc == null) {
1389             throw new NullPointerException(
1390                     "no mbean server connection found on " + address);
1391         }
1392         Container container = new Container(c, this.defaultMaxPerContainer);
1393 
1394         this.containers.put(c, container);
1395         try {
1396             // TODO - this should be filtering crawlers and current jobs.
1397             Set<ObjectName> names = mbc.queryNames(null, null);
1398             for (ObjectName n : names) {
1399                 synchronizeMBean(n);
1400             }
1401 
1402         } catch (IOException e) {
1403             // TODO Auto-generated catch block
1404             e.printStackTrace();
1405         }
1406     }
1407 
1408     /***
1409      * synchronizes state of beans according to their types. E.g. Heritrix
1410      * Service beans are added if it hasn't already been created.
1411      * 
1412      * @param name
1413      */
1414     protected void synchronizeMBean(ObjectName name) {
1415         String theType = name.getKeyProperty(JmxUtils.TYPE);
1416         if (JmxUtils.SERVICE.equals(theType)) {
1417            log.info("crawler service found:" + name.toString());
1418             handleCrawlerCreated(name);
1419         }
1420     }
1421 
1422     /***
1423      * defines and starts the jndi poller timer task
1424      */
1425     private void initializeJndiPoller() {
1426         this.jndiPoller.schedule(
1427         // define the timer task
1428                 new TimerTask() {
1429                     public void run() {
1430                         if (log.isLoggable(Level.INFO)) {
1431                             log.info("running poll task...");
1432                         }
1433 
1434                         if (jndiPoller == null) {
1435                             return;
1436                         }
1437 
1438                         refreshRegistry();
1439                         if (log.isLoggable(Level.INFO)) {
1440                             log.info("poll task done.");
1441                         }
1442                     }
1443 
1444                 },
1445 
1446                 new Date(), // start it now
1447                 JNDI_POLL_PERIOD_IN_SECONDS * 1000); // poll interval
1448     }
1449 
1450     public Object getAttribute(String attribute)
1451             throws AttributeNotFoundException,
1452             MBeanException,
1453             ReflectionException {
1454         // TODO Auto-generated method stub
1455         return null;
1456     }
1457 
1458     public AttributeList getAttributes(String[] attributes) {
1459         // TODO Auto-generated method stub
1460         return null;
1461     }
1462 
1463     public MBeanInfo getMBeanInfo() {
1464         return this.info;
1465     }
1466 
1467     public Object invoke(String actionName, Object[] params, String[] signature)
1468             throws MBeanException,
1469             ReflectionException {
1470         return this.invocationManager.invoke(actionName, params, signature);
1471     }
1472 
1473     public void setAttribute(Attribute attribute)
1474             throws AttributeNotFoundException,
1475             InvalidAttributeValueException,
1476             MBeanException,
1477             ReflectionException {
1478         // TODO Auto-generated method stub
1479 
1480     }
1481 
1482     public AttributeList setAttributes(AttributeList attributes) {
1483         // TODO Auto-generated method stub
1484         return null;
1485     }
1486 
1487     public void addNotificationListener(
1488             NotificationListener listener,
1489             NotificationFilter filter,
1490             Object handback) throws IllegalArgumentException {
1491         this.broadCaster.addNotificationListener(listener, filter, handback);
1492     }
1493 
1494     public MBeanNotificationInfo[] getNotificationInfo() {
1495         return this.broadCaster.getNotificationInfo();
1496     }
1497 
1498     public void removeNotificationListener(
1499             NotificationListener listener,
1500             NotificationFilter filter,
1501             Object handback) throws ListenerNotFoundException {
1502         this.broadCaster.removeNotificationListener(listener, filter, handback);
1503     }
1504 
1505     public void removeNotificationListener(NotificationListener listener)
1506             throws ListenerNotFoundException {
1507         this.broadCaster.removeNotificationListener(listener);
1508     }
1509 
1510     public void postDeregister() {
1511         // TODO Auto-generated method stub
1512 
1513     }
1514 
1515     public void postRegister(Boolean registrationDone) {
1516         // TODO Auto-generated method stub
1517 
1518     }
1519 
1520     public void preDeregister() throws Exception {
1521         // TODO Auto-generated method stub
1522 
1523     }
1524 
1525     public ObjectName preRegister(MBeanServer server, ObjectName name)
1526             throws Exception {
1527         return name;
1528     }
1529 
1530     private ObjectName createServiceBeanName(
1531             String name,
1532             InetSocketAddress address,
1533             String guiPort) {
1534         try {
1535             Hashtable<String, String> ht = new Hashtable<String, String>();
1536 
1537             ht.put(JmxUtils.HOST, address.getHostName());
1538             ht.put(JmxUtils.TYPE, JmxUtils.SERVICE);
1539             ht.put(JmxUtils.NAME, name);
1540             ht.put(JmxUtils.JMX_PORT, String.valueOf(address.getPort()));
1541             if(guiPort != null){
1542                 ht.put(JmxUtils.GUI_PORT, guiPort);
1543             }
1544 
1545             return new ObjectName("org.archive.crawler", ht);
1546         } catch (Exception e) {
1547             e.printStackTrace();
1548             throw new RuntimeException(e);
1549         }
1550     }
1551 
1552 
1553     private ObjectName createCrawlerIn(Container container) throws Exception {
1554         InetSocketAddress address = JmxUtils
1555                 .extractAddress(container.getName());
1556         MBeanServerConnection c = this.connections.get(address);
1557         if (c == null) {
1558             throw new Exception("No connection found on " + address);
1559         }
1560 
1561         MBeanFutureTask t = null;
1562         try {
1563             String newBeanName = "h" + System.currentTimeMillis();
1564             final ObjectName beanName = createServiceBeanName(
1565                     newBeanName,
1566                     address,
1567                     container.getName().getKeyProperty(JmxUtils.GUI_PORT));
1568 
1569             t = new MBeanFutureTask("create crawler:" + address) {
1570                 public boolean isNotificationEnabled(Notification notification) {
1571                     if (notification
1572                             .getType()
1573                             .equals(ClusterControllerNotification.
1574                                 CRAWL_SERVICE_CREATED_NOTIFICATION.getKey())) {
1575                         Crawler c = remoteNameToCrawlerMap.get(new RemoteMapKey(beanName));
1576                         ObjectName proxy = c.getCrawlServiceProxyObjectName();
1577                         return (proxy != null && notification
1578                                 .getUserData()
1579                                 .equals(proxy));
1580                     }
1581                     return false;
1582                 }
1583             };
1584 
1585             this.broadCaster.addNotificationListener(t, t, new Object());
1586 
1587             c.createMBean("org.archive.crawler.Heritrix", beanName);
1588 
1589             return (ObjectName) t.get(30 * 1000, TimeUnit.MILLISECONDS);
1590 
1591         } catch (Exception e) {
1592             e.printStackTrace();
1593             throw new Exception(e);
1594         } finally {
1595             if (t != null) {
1596                 this.broadCaster.removeNotificationListener(t);
1597             }
1598         }
1599     }
1600 
1601     private Container getContainerOn(InetSocketAddress address) {
1602         for (ObjectName n : this.containers.keySet()) {
1603             if (address.equals(JmxUtils.extractAddress(n))) {
1604                 return this.containers.get(n);
1605             }
1606         }
1607         return null;
1608     }
1609 
1610     protected ObjectName addCrawler(ObjectName newCrawler) {
1611         try {
1612             // add the crawler id to the list
1613             boolean contains = (remoteNameToCrawlerMap.containsKey(newCrawler));
1614             if (!contains) {
1615                 ObjectName proxyName = createClientProxyName(newCrawler);
1616 
1617                 InetSocketAddress address = JmxUtils.extractAddress(newCrawler);
1618                 MBeanServerConnection c = this.connections.get(address);
1619                 if (c == null) {
1620                     throw new RuntimeException("No connection found on "
1621                             + address + "- this should never happen");
1622                 }
1623 
1624                 // attach listener to remote mbean
1625                 // this.connections.get(address).addNotificationListener(newCrawler,
1626                 // this.mbeanServerDelegator, null, new Object());
1627 
1628                 DynamicMBean proxy = (DynamicMBean) Proxy.newProxyInstance(
1629                         DynamicMBean.class.getClassLoader(),
1630                         new Class[] { DynamicMBean.class },
1631                         new RemoteMBeanInvocationHandler(
1632                                 newCrawler,
1633                                 proxyName,
1634                                 c,
1635                                 spyListener));
1636 
1637                 Container container = getContainerOn(address);
1638 
1639                 if (container == null) {
1640                     throw new RuntimeException(
1641                             "Container should never be null: " + address);
1642                 }
1643 
1644                 Crawler crawler = new Crawler(proxy, null, container);
1645                 container.addCrawler(crawler);
1646                 remoteNameToCrawlerMap.put(new RemoteMapKey(newCrawler), crawler);
1647                 this.mbeanServer.registerMBean(proxy, proxyName);
1648 
1649                 Hashtable props = new Hashtable();
1650                 props.put(JmxUtils.TYPE, JmxUtils.JOB);
1651                 props.put(JmxUtils.MOTHER, newCrawler
1652                         .getKeyProperty(JmxUtils.NAME));
1653 
1654                 try {
1655                 	//TODO - design the query to pick up only the one job.
1656                     Set<ObjectName> jobs = c.queryNames(null,null);
1657                     for(ObjectName job : jobs){
1658                         if (JmxUtils.JOB.equals(job.getKeyProperty(JmxUtils.TYPE)) &&
1659                         		newCrawler
1660                                 .getKeyProperty(JmxUtils.NAME).equals(
1661                                 		job.getKeyProperty(JmxUtils.MOTHER))
1662                         		
1663                         	) {
1664                             addCrawlJob(job);
1665                             break;
1666                         }
1667                     	
1668                     }
1669                 } catch (Exception e) {
1670                     e.printStackTrace();
1671                 }
1672                 return proxyName;
1673             }
1674         } catch (InstanceAlreadyExistsException e) {
1675             e.printStackTrace();
1676         } catch (MBeanRegistrationException e) {
1677             e.printStackTrace();
1678         } catch (NotCompliantMBeanException e) {
1679             e.printStackTrace();
1680         } catch (IllegalArgumentException e) {
1681             e.printStackTrace();
1682         }
1683         return null;
1684     }
1685 
1686     protected void handleCrawlerCreated(ObjectName newCrawler) {
1687         ObjectName proxyName = addCrawler(newCrawler);
1688         if (proxyName != null) {
1689             fireCrawlerCreated(proxyName);
1690         }
1691     }
1692 
1693     private void fireCrawlerDestroyed(ObjectName proxyName) {
1694         fireNotification(
1695             proxyName,
1696             ClusterControllerNotification.CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey());
1697     }
1698 
1699     private void fireNotification(ObjectName name, String type) {
1700         Notification n = new Notification(type, this.name, System
1701                 .currentTimeMillis());
1702         n.setUserData(name);
1703         fireNotificationEvent(n);
1704         if (log.isLoggable(Level.INFO)) {
1705             log.info("name=" + name + "; type=" + type);
1706         }
1707     }
1708 
1709     private void fireCrawlerCreated(ObjectName proxyName) {
1710         fireNotification(
1711             proxyName,
1712             ClusterControllerNotification.
1713                 CRAWL_SERVICE_CREATED_NOTIFICATION.getKey());
1714     }
1715 
1716     private ObjectName createClientProxyName(ObjectName remote) {
1717         try {
1718 
1719             Hashtable lp = this.name.getKeyPropertyList();
1720             Hashtable cp = new Hashtable();
1721             Hashtable rp = remote.getKeyPropertyList();
1722             cp.put(JmxUtils.TYPE, rp.get(JmxUtils.TYPE));
1723             cp.put(JmxUtils.NAME, rp.get(JmxUtils.NAME));
1724             cp.put(JmxUtils.HOST, lp.get(JmxUtils.HOST));
1725             cp.put(JmxUtils.JMX_PORT, lp.get(JmxUtils.JMX_PORT));
1726             cp.put("remoteHost", rp.get(JmxUtils.HOST));
1727             cp.put("remoteJmxPort", rp.get(JmxUtils.JMX_PORT));
1728 
1729             return new ObjectName(this.name.getDomain(), cp);
1730         } catch (MalformedObjectNameException e) {
1731             e.printStackTrace();
1732             throw new RuntimeException(e);
1733         } catch (NullPointerException e) {
1734             e.printStackTrace();
1735             throw new RuntimeException(e);
1736         }
1737 
1738     }
1739 
1740     public static void main(String[] args) {
1741         ClusterControllerBean b = new ClusterControllerBean();
1742         b.init();
1743     }
1744 }