View Javadoc

1   /* $Id: ClusterControllerClientImpl.java 4166 2006-02-03 01:44:31Z dbernstein $
2    *
3    * Created on Dec 12, 2005
4    *
5    * Copyright (C) 2005 Internet Archive.
6    *  
7    * This file is part of the Heritrix Cluster Controller (crawler.archive.org).
8    *  
9    * HCC is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   * 
14   * Heritrix is distributed in the hope that it will be useful, 
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   * 
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  
24  package org.archive.hcc.client;
25  
26  import java.io.IOException;
27  import java.net.InetSocketAddress;
28  import java.security.InvalidParameterException;
29  import java.util.Collection;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.logging.Level;
37  import java.util.logging.Logger;
38  
39  import javax.management.InstanceNotFoundException;
40  import javax.management.MBeanException;
41  import javax.management.MBeanServer;
42  import javax.management.MBeanServerConnection;
43  import javax.management.MBeanServerFactory;
44  import javax.management.MalformedObjectNameException;
45  import javax.management.Notification;
46  import javax.management.ObjectName;
47  import javax.management.ReflectionException;
48  import javax.management.openmbean.CompositeData;
49  import javax.management.openmbean.SimpleType;
50  import javax.naming.InsufficientResourcesException;
51  import javax.swing.event.EventListenerList;
52  
53  import org.archive.hcc.util.ClusterControllerNotification;
54  import org.archive.hcc.util.NotificationDelegatableBase;
55  import org.archive.hcc.util.NotificationDelegator;
56  import org.archive.hcc.util.jmx.MBeanServerConnectionFactory;
57  import org.archive.util.JmxUtils;
58  
59  /***
60   * As the workhorse of the cluster controller client, this class is responsible 
61   * for connecting to the local or remote <code>ClusterControllerBean</code> via
62   * its <code>DynamicMBean</code> interface. It hides all the details of
63   * connecting to the remote MBean, ie invocations and notifications. 
64   * @author Daniel Bernstein (dbernstein@archive.org)
65   */
66  class ClusterControllerClientImpl implements ClusterControllerClient{
67  
    /** Class-wide logger, named after this class. */
    private static final Logger log = Logger.getLogger(
            ClusterControllerClientImpl.class.getName());

    /** MBean server connection used for every query and invocation; assigned in init(). */
    private MBeanServerConnection connection;

    /** Object name of the cluster controller bean, resolved by the name query in init(). */
    private ObjectName name;

    /** Delegator registered on the connection as notification listener for the bean. */
    private NotificationDelegator notificationDelegator;

    /** Holds the crawler-lifecycle and crawl-job listeners registered on this client. */
    private EventListenerList listenerList;
78  
79      /***
80       * Constructs a client running on a remote machine.
81       * 
82       * @param address
83       * @throws InstanceNotFoundException
84       * @throws IOException
85       */
86      ClusterControllerClientImpl(InetSocketAddress address)
87              throws InstanceNotFoundException,
88              IOException {
89          init(MBeanServerConnectionFactory.createConnection(address));
90      }
91  
92      private void init(MBeanServerConnection connection)
93              throws InstanceNotFoundException {
94          try {
95              this.connection = connection;
96              ObjectName query = new ObjectName(
97                      "org.archive.hcc:type=ClusterControllerBean,*");
98              Set<ObjectName> names = connection.queryNames(query, null);
99              if (names.size() < 1) {
100                 throw new InstanceNotFoundException(
101                         "no mbean found matching query:" + query);
102             }
103             this.name = names.iterator().next();
104             this.notificationDelegator = createNotificationDelegator();
105             this.connection.addNotificationListener(
106                     this.name,
107                     this.notificationDelegator,
108                     null,
109                     new Object());
110             this.listenerList = new EventListenerList();
111         } catch (Exception e) {
112             e.printStackTrace();
113             throw new RuntimeException(e);
114         }
115     }
116 
117     /***
118      * Creates a local instance of the ClusterControllerBean and attaches to it.
119      */
120     ClusterControllerClientImpl() {
121         try {
122             init(createMBeanServer());
123         } catch (InstanceNotFoundException e) {
124             throw new RuntimeException(e);
125         }
126     }
127 
128     public void addCrawlerLifecycleListener(CrawlerLifecycleListener l) {
129         this.listenerList.add(CrawlerLifecycleListener.class, l);
130     }
131 
132     public void removeCrawlerLifecycleListener(CrawlerLifecycleListener l) {
133         this.listenerList.remove(CrawlerLifecycleListener.class, l);
134     }
135 
136     public void addCrawlJobListener(CurrentCrawlJobListener l) {
137         this.listenerList.add(CurrentCrawlJobListener.class, l);
138     }
139 
140     public void removeCrawlJobListener(CurrentCrawlJobListener l) {
141         this.listenerList.remove(CurrentCrawlJobListener.class, l);
142     }
143 
144     private NotificationDelegator createNotificationDelegator() {
145         NotificationDelegator d = new NotificationDelegator();
146         d.addDelegatable(new NotificationDelegatableBase() {
147             protected boolean delegate(Notification n, Object handbac) {
148                 if (n
149                         .getType()
150                         .equals(
151                                 ClusterControllerNotification.
152                                     CRAWL_SERVICE_CREATED_NOTIFICATION.getKey())) {
153                     
154                     if (log.isLoggable(Level.FINE)) {
155                         log.fine("crawler service created: " + n.getUserData());
156                     }
157                     handleCrawlServiceCreated((ObjectName) n.getUserData());
158                     return true;
159                 }
160                 return false;
161             }
162         });
163 
164         d.addDelegatable(new NotificationDelegatableBase() {
165             protected boolean delegate(Notification n, Object handbac) {
166                 if (n
167                         .getType()
168                         .equals(
169                                 ClusterControllerNotification.
170                                     CRAWL_SERVICE_DESTROYED_NOTIFICATION.getKey())) {
171                    
172                     if (log.isLoggable(Level.INFO)) {
173                         log.info("crawler service destroyed: "
174                                 + n.getUserData());
175                     }
176 
177                     handleCrawlServiceDestroyed((ObjectName) n.getUserData());
178                     return true;
179                 }
180                 return false;
181             }
182         });
183 
184         d.addDelegatable(new NotificationDelegatableBase() {
185             protected boolean delegate(Notification n, Object handbac) {
186                 if (n.getType().equals(
187                     ClusterControllerNotification.
188                         CRAWL_SERVICE_JOB_STARTED_NOTIFICATION.getKey())){
189                     handleCrawlServiceJobStarted((ObjectName) n.getUserData());
190                     return true;
191                 }
192                 return false;
193             }
194         });
195 
196         d.addDelegatable(new NotificationDelegatableBase() {
197             protected boolean delegate(Notification n, Object handbac) {
198                 if (n.getType().equals(
199                     ClusterControllerNotification.
200                         CRAWL_SERVICE_JOB_COMPLETED_NOTIFICATION.getKey())){
201                     handleCrawlServiceJobCompleted((ObjectName)n.getUserData());
202                     return true;
203                 }
204                 return false;
205             }
206         });
207 
208         d.addDelegatable(new NotificationDelegatableBase() {
209             protected boolean delegate(Notification n, Object handbac) {
210                 if (n.getType().equals("progressStatistics")) {
211                     ObjectName source = (ObjectName) n.getSource();
212                     if (source.getKeyProperty(JmxUtils.TYPE).equals(
213                             JmxUtils.JOB)) {
214                         Map statistics = toMap(n.getUserData());
215                         handleCrawlServiceJobAttributesChanged(
216                                 source,
217                                 statistics,
218                                 statistics);
219                         return true;
220                     }
221                 }
222                 return false;
223             }
224         });
225 
226         d.addDelegatable(new NotificationDelegatableBase() {
227             protected boolean delegate(Notification n, Object handbac) {
228                 if (n.getType().equals("crawlResuming")) {
229                     ObjectName source = (ObjectName) n.getSource();
230                     try {
231                         fireCrawlJobResumed(new CurrentCrawlJobImpl(
232                                 source,
233                                 findCrawlJobParentInternal(
234                                         JmxUtils.getUid(source),
235                                         JmxUtils.extractAddress(source)),
236                                 connection));
237                     } catch (ClusterException e) {
238                         e.printStackTrace();
239                     }
240                     return true;
241                 }
242                 return false;
243             }
244         });
245 
246         d.addDelegatable(new NotificationDelegatableBase() {
247             protected boolean delegate(Notification n, Object handbac) {
248                 if (n.getType().equals("crawlPaused")) {
249                     ObjectName source = (ObjectName) n.getSource();
250                     try {
251                         fireCrawlJobPaused(new CurrentCrawlJobImpl(
252                                 source,
253                                 findCrawlJobParentInternal(
254                                         JmxUtils.getUid(source),
255                                         JmxUtils.extractAddress(source)),
256                                 connection));
257                         return true;
258                     } catch (ClusterException e) {
259                         e.printStackTrace();
260                     }
261                 }
262                 return false;
263             }
264         });
265 
266         
267         d.addDelegatable(new NotificationDelegatableBase() {
268             protected boolean delegate(Notification n, Object handbac) {
269                 if (n.getType().equals("crawlEnding")) {
270                     ObjectName source = (ObjectName) n.getSource();
271                     try {
272                         fireCrawlJobStopping(createCurrentCrawlJob(source, connection));
273                         return true;
274                     } catch (ClusterException e) {
275                         e.printStackTrace();
276                     }
277                 }
278                 return false;
279             }
280         });
281         return d;
282     }
283     
284     private CurrentCrawlJobImpl createCurrentCrawlJob(
285                                             ObjectName job, 
286                                             MBeanServerConnection connection) 
287                                             throws ClusterException {
288         return new CurrentCrawlJobImpl(
289                 job,
290                 findCrawlJobParentInternal(
291                         JmxUtils.getUid(job),
292                         org.archive.hcc.util.JmxUtils.extractRemoteAddress(job)),
293                         connection);
294     }
295 
296     private static Map toMap(Object object) {
297         if (object instanceof Map) {
298             return (Map) object;
299         } else if (object instanceof CompositeData) {
300             CompositeData cd = (CompositeData) object;
301             String[] keys = new String[] { "deepestUri", "downloadedUriCount",
302                     "queuedUriCount", "docsPerSecond", "freeMemory",
303                     "downloadFailures", "totalKBPerSec", "totalMemory",
304                     "currentKBPerSec", "currentDocsPerSecond", "busyThreads",
305                     "averageDepth", "discoveredUriCount", "congestionRatio","totalProcessedBytes" };
306             Map map = new HashMap();
307             
308             Object[] values = cd.getAll(keys);
309             for (int i = 0; i < keys.length; i++) {
310                 map.put(keys[i], values[i]);
311             }
312 
313             return map;
314         } else {
315             throw new InvalidParameterException("unable to convert to map: "
316                     + object.getClass());
317         }
318     }
319 
320     private void handleCrawlServiceJobAttributesChanged(
321             ObjectName source,
322             Map oldValue,
323             Map newValue) {
324         try {
325             CurrentCrawlJobImpl cj = createCurrentCrawlJob(
326                     source,this.connection);
327             CurrentCrawlJobListener[] listener = this.listenerList
328                     .getListeners(CurrentCrawlJobListener.class);
329             for (int i = 0; i < listener.length; i++) {
330                 listener[i].statisticsChanged(cj, newValue);
331             }
332         } catch (ClusterException e) {
333             // TODO Auto-generated catch block
334             e.printStackTrace();
335         }
336     }
337 
338     private void handleCrawlServiceCreated(ObjectName crawlerName) {
339             Crawler c = new CrawlerImpl(crawlerName, this.connection);
340             fireCrawlerCreated(c);
341     }
342 
343     private void handleCrawlServiceDestroyed(ObjectName crawlerName) {
344         Crawler c = new CrawlerImpl(crawlerName, this.connection);
345         fireCrawlerDestroyed(c);
346     }
347 
348     private void handleCrawlServiceJobStarted(ObjectName crawlJob) {
349         try {
350             CurrentCrawlJob job = createCurrentCrawlJob(
351                                     crawlJob,this.connection);
352             fireCrawlJobStarted(job);
353         } catch (ClusterException e) {
354             // TODO Auto-generated catch block
355             e.printStackTrace();
356         }
357     }
358     
359     
360 
361     private void handleCrawlServiceJobCompleted(ObjectName crawlJob) {
362         try {
363             CurrentCrawlJob job = createCurrentCrawlJob(
364                     crawlJob,this.connection);
365             fireCrawlJobCompleted(job);
366 
367         } catch (ClusterException e) {
368             e.printStackTrace();
369         }
370     }
371 
372     private void fireCrawlerCreated(Crawler crawler) {
373         CrawlerLifecycleListener[] listener = this.listenerList
374                 .getListeners(CrawlerLifecycleListener.class);
375         for (int i = 0; i < listener.length; i++) {
376             listener[i].crawlerCreated(crawler);
377         }
378     }
379 
380     private void fireCrawlerDestroyed(Crawler crawler) {
381         CrawlerLifecycleListener[] listener = this.listenerList
382                 .getListeners(CrawlerLifecycleListener.class);
383         for (int i = 0; i < listener.length; i++) {
384             listener[i].crawlerDestroyed(crawler);
385         }
386     }
387 
388     private void fireCrawlJobStarted(CurrentCrawlJob job) {
389         CurrentCrawlJobListener[] listener = this.listenerList
390                 .getListeners(CurrentCrawlJobListener.class);
391         for (int i = 0; i < listener.length; i++) {
392             listener[i].crawlJobStarted(job);
393         }
394     }
395 
396     private void fireCrawlJobPaused(CurrentCrawlJob job) {
397         CurrentCrawlJobListener[] listener = this.listenerList
398                 .getListeners(CurrentCrawlJobListener.class);
399         for (int i = 0; i < listener.length; i++) {
400             listener[i].crawlJobPaused(job);
401         }
402     }
403     
404     private void fireCrawlJobStopping(CurrentCrawlJob job) {
405         CurrentCrawlJobListener[] listener = this.listenerList
406                 .getListeners(CurrentCrawlJobListener.class);
407         for (int i = 0; i < listener.length; i++) {
408             listener[i].crawlJobStopping(job);
409         }
410     }
411 
412     private void fireCrawlJobResumed(CurrentCrawlJob job) {
413         CurrentCrawlJobListener[] listener = this.listenerList
414                 .getListeners(CurrentCrawlJobListener.class);
415         for (int i = 0; i < listener.length; i++) {
416             listener[i].crawlJobResumed(job);
417         }
418     }
419 
420     private void fireCrawlJobCompleted(CurrentCrawlJob job) {
421         CurrentCrawlJobListener[] listener = this.listenerList
422                 .getListeners(CurrentCrawlJobListener.class);
423         CompletedCrawlJobImpl cj = 
424             new CompletedCrawlJobImpl(
425                     job.getUid(), 
426                     job.getJobName(),
427                     (CrawlerImpl)job.getMother(), 
428                     this.connection);
429 
430         for (int i = 0; i < listener.length; i++) {
431             listener[i].crawlJobCompleted(cj);
432         }
433     }
434     
435 
436     public void destroyAllCrawlers() throws ClusterException{
437         try {
438             ObjectName parent = (ObjectName) this.connection.invoke(
439                     this.name,
440                     "destroyAllCrawlers",
441                     new Object[] {},
442                     new String[] {});
443 
444            
445         } catch (Exception e) {
446             e.printStackTrace();
447             throw new ClusterException(e);
448         }
449     }
450 
451     public boolean pauseAllJobs() throws ClusterException {
452     	try {
453             return (Boolean) this.connection.invoke(
454                     this.name,
455                     "pauseAllJobs",
456                     new Object[] {},
457                     new String[] {});
458 
459            
460         } catch (Exception e) {
461             e.printStackTrace();
462             throw new ClusterException(e);
463         }
464     }
465     
466     public boolean resumeAllPausedJobs() throws ClusterException {
467     	try {
468             return (Boolean) this.connection.invoke(
469                     this.name,
470                     "resumeAllPausedJobs",
471                     new Object[] {},
472                     new String[] {});
473 
474            
475         } catch (Exception e) {
476             e.printStackTrace();
477             throw new ClusterException(e);
478         }
479     }
480     
481     public Crawler findCrawlJobParent(String uid, InetSocketAddress address) 
482         throws ClusterException {
483         return findCrawlJobParentInternal(uid, address);
484     }
485     
486 
487     public CrawlerImpl findCrawlJobParentInternal(String uid, InetSocketAddress address)
488             throws ClusterException {
489         try {
490             ObjectName parent = (ObjectName) this.connection.invoke(
491                     this.name,
492                     "findCrawlServiceJobParent",
493                     new Object[] { uid, address.getHostName(),
494                             new Integer(address.getPort()) },
495                     new String[] { "java.lang.String", "java.lang.String",
496                             "java.lang.Integer" });
497 
498             if (parent != null) {
499                 return new CrawlerImpl(parent, this.connection);
500             }
501 
502             return null;
503         } catch (Exception e) {
504             e.printStackTrace();
505             throw new ClusterException(e);
506         }
507 
508     }
509 
510     private MBeanServer createMBeanServer() {
511         MBeanServer result = null;
512         List servers = MBeanServerFactory.findMBeanServer(null);
513         if (servers == null) {
514             return result;
515         }
516         for (Iterator i = servers.iterator(); i.hasNext();) {
517             MBeanServer server = (MBeanServer) i.next();
518             if (server == null) {
519                 continue;
520             }
521             result = server;
522             break;
523         }
524         return result;
525     }
526 
527     public Crawler createCrawler() throws 
528             InsufficientCrawlingResourcesException, ClusterException {
529         try {
530             ObjectName crawler = (ObjectName) this.connection.invoke(
531                     this.name,
532                     "createCrawler",
533                     new Object[0],
534                     new String[0]);
535             return new CrawlerImpl(crawler, this.connection);
536         }catch (MBeanException e) {
537             if("insufficent crawler resources".equals(e.getMessage())){
538                 throw new InsufficientCrawlingResourcesException(e);
539             }
540             e.printStackTrace();
541             throw new ClusterException(e);
542         } 
543         
544         catch (Exception e) {
545             e.printStackTrace();
546             throw new ClusterException(e);
547         }
548     }
549 
550     public Collection<Crawler> listCrawlers() throws ClusterException {
551         try {
552             ObjectName[] crawlers = (ObjectName[]) this.connection.invoke(
553                     this.name,
554                     "listCrawlers",
555                     new Object[0],
556                     new String[0]);
557             Collection<Crawler> crawlerList = new LinkedList<Crawler>();
558             for (int i = 0; i < crawlers.length; i++) {
559                 crawlerList.add(new CrawlerImpl(crawlers[i], this.connection));
560             }
561 
562             return crawlerList;
563         } catch (Exception e) {
564             e.printStackTrace();
565             throw new ClusterException(e);
566         }
567 
568     }
569 
570     public void destroy() {
571         try {
572             this.connection.invoke(
573                     this.name,
574                     "destroy",
575                     new Object[0],
576                     new String[0]);
577         } catch (InstanceNotFoundException e) {
578             // TODO Auto-generated catch block
579             e.printStackTrace();
580         } catch (MBeanException e) {
581             // TODO Auto-generated catch block
582             e.printStackTrace();
583         } catch (ReflectionException e) {
584             // TODO Auto-generated catch block
585             e.printStackTrace();
586         } catch (IOException e) {
587             // TODO Auto-generated catch block
588             e.printStackTrace();
589         }
590     }
591     
592     /* (non-Javadoc)
593      * @see org.archive.hcc.client.ClusterControllerClient#getCurrentCrawlJob(org.archive.hcc.client.Crawler)
594      */
595     public CurrentCrawlJob getCurrentCrawlJob(Crawler crawler) throws ClusterException {
596         try {
597             ObjectName currentCrawlJob = (ObjectName) this.connection.invoke(
598                     this.name,
599                     "getCurrentCrawlJob",
600                     new Object[]{crawler.getName()},
601                     new String[]{SimpleType.OBJECTNAME.getClassName()});
602             
603             if(currentCrawlJob == null){
604                 return null;
605             }
606 
607             return new CurrentCrawlJobImpl(currentCrawlJob, (CrawlerImpl)crawler, this.connection);
608         }catch (Exception e) {
609             e.printStackTrace();
610             throw new ClusterException(e);
611         }
612     }
613     
614     /***
615      * Returns the maximum number of instances allowed for this container.
616      * If the container does not exist, -1 is returned.
617      * @param hostname
618      * @param port
619      * @return
620      */
621     public int getMaxInstances(String hostname, int port) throws ClusterException{
622         try {
623             Integer maxInstances = (Integer) this.connection.invoke(
624                     this.name,
625                     "getMaxInstances",
626                     new Object[]{hostname, new Integer(port)},
627                     new String[]{SimpleType.STRING.getClassName(), 
628                     				SimpleType.INTEGER.getClassName()});
629             return maxInstances;
630            
631         }catch (Exception e) {
632             e.printStackTrace();
633             throw new ClusterException(e);
634         }
635     }
636     
637     /***
638      * Sets the maximum number of instances that may run on a 
639      * specified container defined by a host and port.
640      * @param hostname
641      * @param port
642      * @param maxInstances
643      */
644     public void setMaxInstances(String hostname, int port, int maxInstances) 
645     	throws ClusterException{
646         try {
647             this.connection.invoke(
648                     this.name,
649                     "setMaxInstances",
650                     new Object[]{
651                     			hostname, 
652                     			new Integer(port),
653                     			new Integer(maxInstances)},
654                     new String[]{
655                     			SimpleType.STRING.getClassName(),
656                     			SimpleType.INTEGER.getClassName(),
657                     			SimpleType.INTEGER.getClassName()});
658             
659            
660         }catch (Exception e) {
661             e.printStackTrace();
662             throw new ClusterException(e);
663         }
664     }
665 }