/* Copyright (C) 2009 Internet Archive
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * CrawlController.java
 * Created on May 14, 2003
 *
 * $Id: CrawlController.java 6979 2010-11-03 00:20:51Z gojomo $
 */
package org.archive.crawler.framework;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.ReflectionException;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.admin.CrawlJob;
import org.archive.crawler.admin.StatisticsTracker;
import org.archive.crawler.datamodel.Checkpoint;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.ServerCache;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.event.CrawlURIDispositionListener;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.framework.exceptions.InitializationException;
import org.archive.crawler.io.LocalErrorFormatter;
import org.archive.crawler.io.RuntimeErrorFormatter;
import org.archive.crawler.io.StatisticsLogFormatter;
import org.archive.crawler.io.UriErrorFormatter;
import org.archive.crawler.io.UriProcessingFormatter;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SettingsHandler;
import org.archive.crawler.util.CheckpointUtils;
import org.archive.io.GenerationFileHandler;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.CachedBdbMap;
import org.archive.util.FileUtils;
import org.archive.util.ObjectIdentityBdbCache;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.Reporter;
import org.archive.util.bdbje.EnhancedEnvironment;
import org.xbill.DNS.DClass;
import org.xbill.DNS.Lookup;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DbInternal;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.utilint.DbLsn;

/**
 * CrawlController collects all the classes which cooperate to
 * perform a crawl and provides a high-level interface to the
 * running crawl.
 *
 * As the "global context" for a crawl, subcomponents will
 * often reach each other through the CrawlController.
 *
 * @author Gordon Mohr
 */
public class CrawlController implements Serializable, Reporter {
    // be robust against trivial implementation changes
    private static final long serialVersionUID =
        ArchiveUtils.classnameBasedUID(CrawlController.class,1);

    /**
     * Messages from the CrawlController.
     *
     * They appear on the console.
     */
    private final static Logger LOGGER =
        Logger.getLogger(CrawlController.class.getName());

    // manifest support
    /** abbreviation label for config files in manifest */
    public static final char MANIFEST_CONFIG_FILE = 'C';
    /** abbreviation label for report files in manifest */
    public static final char MANIFEST_REPORT_FILE = 'R';
    /** abbreviation label for log files in manifest */
    public static final char MANIFEST_LOG_FILE = 'L';

    // key log names
    public static final String LOGNAME_PROGRESS_STATISTICS =
        "progress-statistics";
    public static final String LOGNAME_URI_ERRORS = "uri-errors";
    public static final String LOGNAME_RUNTIME_ERRORS = "runtime-errors";
    public static final String LOGNAME_LOCAL_ERRORS = "local-errors";
    public static final String LOGNAME_CRAWL = "crawl";

    // key subcomponents which define and implement a crawl in progress
    private transient CrawlOrder order;
    private transient CrawlScope scope;
    private transient ProcessorChainList processorChains;

    private transient Frontier frontier;

    private transient AtomicInteger loopingToes;
    private transient ToePool toePool;

    private transient ServerCache serverCache;

    // This gets passed into the initialize method.
    private transient SettingsHandler settingsHandler;


    // Used to enable/disable single-threaded operation after OOM
    private volatile transient boolean singleThreadMode = false;
    private transient ReentrantLock singleThreadLock = null;

    // emergency reserve of memory to allow some progress/reporting after OOM
    private transient LinkedList<char[]> reserveMemory;
    private static final int RESERVE_BLOCKS = 1;
    private static final int RESERVE_BLOCK_SIZE = 6 * (1 << 20); // 6MB

    // crawl state: as requested or actual

    /**
     * Crawl exit status.
     */
    private transient String sExit;

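    // The state values are interned Strings typed as Object so that the
    // identity comparisons used elsewhere in this class (e.g. state != PAUSED)
    // stay valid across serialization round-trips.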
    public static final Object NASCENT = "NASCENT".intern();
    public static final Object RUNNING = "RUNNING".intern();
    public static final Object PAUSED = "PAUSED".intern();
    public static final Object PAUSING = "PAUSING".intern();
    public static final Object CHECKPOINTING = "CHECKPOINTING".intern();
    public static final Object STOPPING = "STOPPING".intern();
    public static final Object FINISHED = "FINISHED".intern();
    public static final Object STARTED = "STARTED".intern();
    public static final Object PREPARING = "PREPARING".intern();

    private transient Object state = NASCENT;

    // disk paths
    private transient File disk;        // overall disk path
    private transient File logsDisk;    // for log files

    /**
     * For temp files representing state of crawler (eg queues)
     */
    private transient File stateDisk;

    /**
     * For discardable temp files (eg fetch buffers).
     */
    private transient File scratchDisk;

    /**
     * Directory that holds checkpoints.
     */
    private transient File checkpointsDisk;

    /**
     * Checkpointer.
     * Knows if a checkpoint is in progress and what the name of the
     * checkpoint is.  Also runs checkpoints.
     */
    private Checkpointer checkpointer;

    /**
     * Gets set to the checkpoint we're recovering from if in
     * checkpoint-recover mode.  Gets set up by {@link #getCheckpointRecover()}.
     */
    private transient Checkpoint checkpointRecover = null;

    // crawl limits
    private long maxBytes;
    private long maxDocument;
    private long maxTime;

    /**
     * A manifest of all files used/created during this crawl. Written to file
     * at the end of the crawl (the absolute last thing done).
     */
    private StringBuffer manifest;

    /**
     * Record of fileHandlers established for loggers,
     * assisting file rotation.
     */
    private transient Map<Logger,FileHandler> fileHandlers;

    /** suffix to use on active logs */
    public static final String CURRENT_LOG_SUFFIX = ".log";

    /**
     * Crawl progress logger.
     *
     * No exceptions.  Logs the summary result of each URI's processing.
     */
    public transient Logger uriProcessing;

    /**
     * This logger contains unexpected runtime errors.
     *
     * Would contain errors trying to set up a job or failures inside
     * processors that they are not prepared to recover from.
     */
    public transient Logger runtimeErrors;

    /**
     * This logger is for job-scoped logging, specifically errors which
     * happen and are handled within a particular processor.
     *
     * Examples would be socket timeouts, exceptions thrown by extractors, etc.
     */
    public transient Logger localErrors;

    /**
     * Special log for URI format problems, wherever they may occur.
     */
    public transient Logger uriErrors;

    /**
     * Statistics tracker writes here at regular intervals.
     */
    private transient Logger progressStats;

    /**
     * Logger to hold job summary report.
     *
     * Large state reports made at infrequent intervals (e.g. job ending) go
     * here.
     */
    public transient Logger reports;

    protected StatisticsTracking statistics = null;

    /**
     * List of crawl status listeners.
     *
     * All iterations need to synchronize on this object if they're to avoid
     * concurrent modification exceptions.
     * See {@link java.util.Collections#synchronizedList(List)}.
     */
    private transient List<CrawlStatusListener> registeredCrawlStatusListeners =
        Collections.synchronizedList(new ArrayList<CrawlStatusListener>());

    // Since there is a high probability that there will only ever be one
    // CrawlURIDispositionListener, we use this field while there is only one:
    private transient CrawlURIDispositionListener
        registeredCrawlURIDispositionListener;

    // And then switch to the list once there is more than one.
    protected transient ArrayList<CrawlURIDispositionListener>
        registeredCrawlURIDispositionListeners;

    /** Shared bdb Environment for Frontier subcomponents */
    // TODO: investigate using multiple environments to split disk accesses
    // across separate physical disks
    private transient EnhancedEnvironment bdbEnvironment = null;

    /**
     * Keep a list of all BigMap instances made -- shouldn't be many -- so that
     * we can checkpoint them.
     */
    private transient Map<String,ObjectIdentityCache<?,?>> bigmaps = null;

    /**
     * Default constructor
     */
    public CrawlController() {
        super();
        // Defer most setup to initialize methods
    }

    /**
     * Starting from nothing, set up CrawlController and associated
     * classes to be ready for a first crawl.
     *
     * @param sH Settings handler.
     * @throws InitializationException
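     *
     * <p>A minimal lifecycle sketch, assuming an order file on disk and the
     * usual concrete handler, XMLSettingsHandler (the path here is
     * illustrative only):
     * <pre>
     * SettingsHandler handler =
     *     new XMLSettingsHandler(new File("jobs/myjob/order.xml"));
     * handler.initialize();
     * CrawlController controller = new CrawlController();
     * controller.initialize(handler);
     * controller.requestCrawlStart();
     * </pre>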
     */
    public void initialize(SettingsHandler sH)
    throws InitializationException {
        sendCrawlStateChangeEvent(PREPARING, CrawlJob.STATUS_PREPARING);

        this.singleThreadLock = new ReentrantLock();
        this.settingsHandler = sH;
        installThreadContextSettingsHandler();
        this.order = settingsHandler.getOrder();
        this.order.setController(this);
        this.bigmaps = new Hashtable<String,ObjectIdentityCache<?,?>>();
        sExit = "";
        this.manifest = new StringBuffer();
        String onFailMessage = "";
        try {
            onFailMessage = "You must set the User-Agent and From HTTP" +
            " header values to acceptable strings. \n" +
            " User-Agent: [software-name](+[info-url])[misc]\n" +
            " From: [email-address]\n";
            order.checkUserAgentAndFrom();

            onFailMessage = "Unable to setup disk";
            if (disk == null) {
                setupDisk();
            }

            onFailMessage = "Unable to create log file(s)";
            setupLogs();

            // Figure out if we're to do a checkpoint restore. If so, get the
            // checkpointRecover instance and then put into place the old bdb
            // log files. If any of the log files already exist in the target
            // state directory, WE DO NOT OVERWRITE (makes for faster
            // recovery). CrawlController checkpoint-recovery code manages
            // restoration of the old StatisticsTracker, any BigMaps used by
            // the crawler, and the moving of bdb log files into place only.
            // Other objects interested in recovery need to ask
            // CrawlController#isCheckpointRecover to figure out if we're in
            // recovery and then take appropriate recovery action. (These
            // objects can call CrawlController#getCheckpointRecover to get
            // the directory that might hold files/objects dropped while
            // checkpointing.)  Such objects will need to use a technique
            // other than object serialization for restoring settings, because
            // they'll already have been constructed by the time each object
            // is asked whether it is to recover itself. See
            // ARCWriterProcessor for an example.
            onFailMessage = "Unable to test/run checkpoint recover";
            this.checkpointRecover = getCheckpointRecover();
            if (this.checkpointRecover == null) {
                this.checkpointer =
                    new Checkpointer(this, this.checkpointsDisk);
            } else {
                setupCheckpointRecover();
            }

            onFailMessage = "Unable to setup bdb environment.";
            setupBdb();

            onFailMessage = "Unable to setup statistics";
            setupStatTracking();

            onFailMessage = "Unable to setup crawl modules";
            setupCrawlModules();
        } catch (Exception e) {
            String tmp = "On crawl: "
                + settingsHandler.getSettingsObject(null).getName() + " " +
                onFailMessage;
            LOGGER.log(Level.SEVERE, tmp, e);
            throw new InitializationException(tmp, e);
        }

        // Force creation of the DNS cache now -- avoids CacheCleaner in the
        // toe-threads group. Also cap its size at 1 (we never want a cached
        // value; 0 is non-operative).
        Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);
        //dns.getRecords("localhost", Type.A, DClass.IN);

        loopingToes = new AtomicInteger(0);
        setupToePool();
        setThresholds();

        reserveMemory = new LinkedList<char[]>();
        for (int i = 0; i < RESERVE_BLOCKS; i++) {
            reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
        }
    }

    /**
     * Utility method to install this crawl's SettingsHandler into the
     * 'global' (for this thread) holder, so that any subsequent
     * deserialization operations in this thread can find it.
     */
    public void installThreadContextSettingsHandler() {
        SettingsHandler.setThreadContextSettingsHandler(settingsHandler);
    }

    /**
     * Does setup of checkpoint recover.
     * Copies bdb log files into the state dir.
     * @throws IOException
     */
    protected void setupCheckpointRecover()
    throws IOException {
        long started = System.currentTimeMillis();
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Starting recovery setup -- copying into place " +
                "bdbje log files -- for checkpoint named " +
                this.checkpointRecover.getDisplayName());
        }
        // Mark in the context that we're in a recovery.
        this.checkpointer.recover(this);
        this.progressStats.info("CHECKPOINT RECOVER " +
            this.checkpointRecover.getDisplayName());
        // Copy the bdb log files to the state dir so we don't damage the
        // old checkpoint.  If there are thousands of log files, this can take
        // tens of minutes (1000 logs take ~5 minutes to copy in java,
        // dependent upon hardware).  If a log file already exists over in the
        // target state directory, we do not overwrite -- we assume the log
        // file in the target is the same as the one we'd copy from the
        // checkpoint dir.
        File bdbSubDir = CheckpointUtils.
            getBdbSubDirectory(this.checkpointRecover.getDirectory());
        List<IOException> errs = new ArrayList<IOException>();
        FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(),
            getStateDisk(), true, false, errs);
        for (IOException ioe : errs) {
            LOGGER.log(Level.SEVERE, "Problem copying checkpoint files: "
                    + "checkpoint may be corrupt", ioe);
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Finished recovery setup for checkpoint named " +
                this.checkpointRecover.getDisplayName() + " in " +
                (System.currentTimeMillis() - started) + "ms.");
        }
    }

    protected boolean getCheckpointCopyBdbjeLogs() {
        return ((Boolean)this.order.getUncheckedAttribute(null,
            CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue();
    }

    private void setupBdb()
    throws FatalConfigurationException, AttributeNotFoundException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        int bdbCachePercent = ((Integer)this.order.
            getAttribute(null, CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue();
        if (bdbCachePercent > 0) {
            // Operator has expressed a preference; override BDB default or
            // je.properties value
            envConfig.setCachePercent(bdbCachePercent);
        }
        envConfig.setSharedCache(true);
        envConfig.setLockTimeout(5000000); // 5 seconds (bdbje takes microseconds)
        if (LOGGER.isLoggable(Level.FINEST)) {
            envConfig.setConfigParam("java.util.logging.level", "SEVERE");
            envConfig.setConfigParam("java.util.logging.level.evictor",
                "SEVERE");
            envConfig.setConfigParam("java.util.logging.ConsoleHandler.on",
                "true");
        }

        if (!getCheckpointCopyBdbjeLogs()) {
            // If we are not copying files on checkpoint, then set bdbje to
            // not remove its log files, so that it's possible to later
            // assemble (manually) all files needed to run a recovery using a
            // mix of current bdbje logs and those bdbje has marked for
            // deletion.
            envConfig.setConfigParam("je.cleaner.expunge", "false");
        }

        try {
            this.bdbEnvironment = new EnhancedEnvironment(getStateDisk(), envConfig);
            if (LOGGER.isLoggable(Level.FINE)) {
                // Write out the bdb configuration.
                envConfig = bdbEnvironment.getConfig();
                LOGGER.fine("BdbConfiguration: Cache percentage " +
                    envConfig.getCachePercent() +
                    ", cache size " + envConfig.getCacheSize());
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
            throw new FatalConfigurationException(e.getMessage());
        }
    }

    /**
     * @return the shared EnhancedEnvironment
     */
    public EnhancedEnvironment getBdbEnvironment() {
        return this.bdbEnvironment;
    }

    /**
     * @deprecated use EnhancedEnvironment's getClassCatalog() instead
     */
    public StoredClassCatalog getClassCatalog() {
        return this.bdbEnvironment.getClassCatalog();
    }

    /**
     * Register for CrawlStatus events.
     *
     * @param cl a class implementing the CrawlStatusListener interface
     *
     * @see CrawlStatusListener
     */
    public void addCrawlStatusListener(CrawlStatusListener cl) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.registeredCrawlStatusListeners.add(cl);
        }
    }

    /**
     * Register for CrawlURIDisposition events.
     *
     * @param cl a class implementing the CrawlURIDispositionListener interface
     *
     * @see CrawlURIDispositionListener
     */
    public void addCrawlURIDispositionListener(CrawlURIDispositionListener cl) {
        registeredCrawlURIDispositionListener = null;
        if (registeredCrawlURIDispositionListeners == null) {
            // First listener: record it in the single-listener field, which
            // is only consulted while it remains the only one.
            registeredCrawlURIDispositionListener = cl;
            // We expect the list to stay very small.
            registeredCrawlURIDispositionListeners
                = new ArrayList<CrawlURIDispositionListener>(1);
        }
        registeredCrawlURIDispositionListeners.add(cl);
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURISuccessful event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURISuccessful(CrawlURI)
     */
    public void fireCrawledURISuccessfulEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURISuccessful(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener)it.next())
                        .crawledURISuccessful(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURINeedRetry event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURINeedRetry(CrawlURI)
     */
    public void fireCrawledURINeedRetryEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURINeedRetry(curi);
            return;
        }

        // Go through the list.
        if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            for (Iterator i = registeredCrawlURIDispositionListeners.iterator();
                    i.hasNext();) {
                ((CrawlURIDispositionListener)i.next()).crawledURINeedRetry(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIDisregard event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi -
     *            The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIDisregard(CrawlURI)
     */
    public void fireCrawledURIDisregardEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIDisregard(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener)it.next())
                        .crawledURIDisregard(curi);
                }
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition crawledURIFailure event
     * that will be broadcast to all listeners that have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIFailure(CrawlURI)
     */
    public void fireCrawledURIFailureEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Then we'll just use that.
            registeredCrawlURIDispositionListener.crawledURIFailure(curi);
        } else {
            // Go through the list.
            if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
                Iterator it = registeredCrawlURIDispositionListeners.iterator();
                while (it.hasNext()) {
                    ((CrawlURIDispositionListener)it.next())
                        .crawledURIFailure(curi);
                }
            }
        }
    }

    private void setupCrawlModules() throws FatalConfigurationException,
             AttributeNotFoundException, MBeanException, ReflectionException {
        if (scope == null) {
            scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME);
            scope.initialize(this);
        }
        try {
            this.serverCache = new ServerCache(this);
        } catch (Exception e) {
            throw new FatalConfigurationException("Unable to" +
               " initialize frontier (Failed setup of ServerCache) " + e);
        }

        if (this.frontier == null) {
            this.frontier = (Frontier)order.getAttribute(Frontier.ATTR_NAME);
            try {
                frontier.initialize(this);
                frontier.pause(); // Pause until begun
                // Run recovery if recoverPath points to a file. (If it points
                // to a directory, it's a checkpoint recovery.)
                // TODO: make recover path relative to job root dir.
                if (!isCheckpointRecover()) {
                    runFrontierRecover((String)order.
                        getAttribute(CrawlOrder.ATTR_RECOVER_PATH));
                }
            } catch (IOException e) {
                throw new FatalConfigurationException(
                    "unable to initialize frontier: " + e);
            }
        }

        // Setup processors
        if (processorChains == null) {
            processorChains = new ProcessorChainList(order);
        }
    }

    protected void runFrontierRecover(String recoverPath)
            throws AttributeNotFoundException, MBeanException,
            ReflectionException, FatalConfigurationException {
        if (recoverPath == null || recoverPath.length() <= 0) {
            return;
        }
        File f = new File(recoverPath);
        if (!f.exists()) {
            LOGGER.severe("Recover file does not exist " + f.getAbsolutePath());
            return;
        }
        if (!f.isFile()) {
            // It's a directory if we're supposed to be doing a checkpoint
            // recovery.
            return;
        }
        boolean retainFailures = ((Boolean)order.
          getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES)).booleanValue();
        try {
            frontier.importRecoverLog(f.getAbsolutePath(), retainFailures);
        } catch (IOException e) {
            e.printStackTrace();
            throw (FatalConfigurationException) new FatalConfigurationException(
                "Recover.log " + recoverPath + " problem: " + e).initCause(e);
        }
    }

    private void setupDisk() throws AttributeNotFoundException {
        String diskPath
            = (String) order.getAttribute(null, CrawlOrder.ATTR_DISK_PATH);
        this.disk = getSettingsHandler().
            getPathRelativeToWorkingDirectory(diskPath);
        this.disk.mkdirs();
        this.logsDisk = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        this.checkpointsDisk = getSettingsDir(CrawlOrder.ATTR_CHECKPOINTS_PATH);
        this.stateDisk = getSettingsDir(CrawlOrder.ATTR_STATE_PATH);
        this.scratchDisk = getSettingsDir(CrawlOrder.ATTR_SCRATCH_PATH);
    }

    /**
     * @return The logging directory, or null if there's a problem reading
     * the settings.
     */
    public File getLogsDir() {
        File f = null;
        try {
            f = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        } catch (AttributeNotFoundException e) {
            LOGGER.severe("Failed get of logs directory: " + e.getMessage());
        }
        return f;
    }

    /**
     * Return the full path to the directory named by <code>key</code>
     * in settings.
     * If the directory does not exist, it and all intermediary dirs
     * will be created.
     * @param key Key to look up in settings.
     * @return Full path to directory named by <code>key</code>.
     * @throws AttributeNotFoundException
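     *
     * <p>For example, to fetch the state directory (hypothetical
     * <code>controller</code> variable; the key constant is the same one
     * setupDisk uses):
     * <pre>
     * File stateDir = controller.getSettingsDir(CrawlOrder.ATTR_STATE_PATH);
     * </pre>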
     */
    public File getSettingsDir(String key)
    throws AttributeNotFoundException {
        String path = (String)order.getAttribute(null, key);
        File f = new File(path);
        if (!f.isAbsolute()) {
            f = new File(disk.getPath(), path);
        }
        if (!f.exists()) {
            f.mkdirs();
        }
        return f;
    }

    /**
     * Setup the statistics tracker.
     * The statistics object must be created before modules can use it.
     * Do it here now so that when modules retrieve the object from the
     * controller during initialization (which some do), it's in place.
     * @throws InvalidAttributeValueException
     * @throws FatalConfigurationException
     */
    private void setupStatTracking()
    throws InvalidAttributeValueException, FatalConfigurationException {
        MapType loggers = order.getLoggers();
        final String cstName = "crawl-statistics";
        if (loggers.isEmpty(null)) {
            if (!isCheckpointRecover() && this.statistics == null) {
                this.statistics = new StatisticsTracker(cstName);
            }
            loggers.addElement(null, (StatisticsTracker)this.statistics);
        }

        if (isCheckpointRecover()) {
            restoreStatisticsTracker(loggers, cstName);
        }

        for (Iterator it = loggers.iterator(null); it.hasNext();) {
            StatisticsTracking tracker = (StatisticsTracking)it.next();
            tracker.initialize(this);
            if (this.statistics == null) {
                this.statistics = tracker;
            }
        }
    }

    protected void restoreStatisticsTracker(MapType loggers,
        String replaceName)
    throws FatalConfigurationException {
        try {
            // Add the deserialized StatisticsTracker to the settings system.
            loggers.removeElement(loggers.globalSettings(), replaceName);
            loggers.addElement(loggers.globalSettings(),
                (StatisticsTracker)this.statistics);
        } catch (Exception e) {
            throw convertToFatalConfigurationException(e);
        }
    }

    protected FatalConfigurationException
            convertToFatalConfigurationException(Exception e) {
        FatalConfigurationException fce =
            new FatalConfigurationException("Converted exception: " +
               e.getMessage());
        fce.setStackTrace(e.getStackTrace());
        return fce;
    }

    private void setupLogs() throws IOException {
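        // Each logger name embeds the absolute logs path, so every crawl job
        // gets its own Logger instances rather than sharing JVM-global ones.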
        String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
        uriProcessing = Logger.getLogger(LOGNAME_CRAWL + "." + logsPath);
        runtimeErrors = Logger.getLogger(LOGNAME_RUNTIME_ERRORS + "." +
            logsPath);
        localErrors = Logger.getLogger(LOGNAME_LOCAL_ERRORS + "." + logsPath);
        uriErrors = Logger.getLogger(LOGNAME_URI_ERRORS + "." + logsPath);
        progressStats = Logger.getLogger(LOGNAME_PROGRESS_STATISTICS + "." +
            logsPath);

        this.fileHandlers = new HashMap<Logger,FileHandler>();

        setupLogFile(uriProcessing,
            logsPath + LOGNAME_CRAWL + CURRENT_LOG_SUFFIX,
            new UriProcessingFormatter(), true);

        setupLogFile(runtimeErrors,
            logsPath + LOGNAME_RUNTIME_ERRORS + CURRENT_LOG_SUFFIX,
            new RuntimeErrorFormatter(), true);

        setupLogFile(localErrors,
            logsPath + LOGNAME_LOCAL_ERRORS + CURRENT_LOG_SUFFIX,
            new LocalErrorFormatter(), true);

        setupLogFile(uriErrors,
            logsPath + LOGNAME_URI_ERRORS + CURRENT_LOG_SUFFIX,
            new UriErrorFormatter(), true);

        setupLogFile(progressStats,
            logsPath + LOGNAME_PROGRESS_STATISTICS + CURRENT_LOG_SUFFIX,
            new StatisticsLogFormatter(), true);
    }

    private void setupLogFile(Logger logger, String filename, Formatter f,
            boolean shouldManifest) throws IOException, SecurityException {
        GenerationFileHandler fh = new GenerationFileHandler(filename, true,
            shouldManifest);
        fh.setFormatter(f);
        logger.addHandler(fh);
        addToManifest(filename, MANIFEST_LOG_FILE, shouldManifest);
        logger.setUseParentHandlers(false);
        this.fileHandlers.put(logger, fh);
    }

    protected void rotateLogFiles(String generationSuffix)
    throws IOException {
        if (this.state != PAUSED && this.state != CHECKPOINTING) {
            throw new IllegalStateException("Pause crawl before requesting " +
                "log rotation.");
        }
        for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) {
            Logger l = (Logger)i.next();
            GenerationFileHandler gfh =
                (GenerationFileHandler)fileHandlers.get(l);
            GenerationFileHandler newGfh =
                gfh.rotate(generationSuffix, CURRENT_LOG_SUFFIX);
            if (gfh.shouldManifest()) {
                addToManifest((String) newGfh.getFilenameSeries().get(1),
                    MANIFEST_LOG_FILE, newGfh.shouldManifest());
            }
            l.removeHandler(gfh);
            l.addHandler(newGfh);
            fileHandlers.put(l, newGfh);
        }
    }

    /**
     * Close all log files and remove handlers from loggers.
     */
    public void closeLogFiles() {
        for (Iterator i = fileHandlers.keySet().iterator(); i.hasNext();) {
            Logger l = (Logger)i.next();
            GenerationFileHandler gfh =
                (GenerationFileHandler)fileHandlers.get(l);
            gfh.close();
            l.removeHandler(gfh);
        }
    }

    /**
     * Sets the values for max bytes, docs and time based on the crawl order;
     * a limit that cannot be read is left at 0, meaning no limit.
     */
    private void setThresholds() {
        try {
            maxBytes =
                ((Long) order.getAttribute(CrawlOrder.ATTR_MAX_BYTES_DOWNLOAD))
                    .longValue();
        } catch (Exception e) {
            maxBytes = 0;
        }
        try {
            maxDocument =
                ((Long) order
                    .getAttribute(CrawlOrder.ATTR_MAX_DOCUMENT_DOWNLOAD))
                    .longValue();
        } catch (Exception e) {
            maxDocument = 0;
        }
        try {
            maxTime =
                ((Long) order.getAttribute(CrawlOrder.ATTR_MAX_TIME_SEC))
                    .longValue();
        } catch (Exception e) {
            maxTime = 0;
        }
    }

    /**
     * @return Object this controller is using to track crawl statistics;
     * a fresh, empty StatisticsTracker if none has been set.
     */
    public StatisticsTracking getStatistics() {
        return statistics == null ?
            new StatisticsTracker("crawl-statistics") : this.statistics;
    }

    /**
     * Send a crawl state change event to all listeners.
     * @param newState State change we're to tell listeners about.
     * @param message Message on state change.
     * @see #sendCheckpointEvent(File) for the special-case event telling
     * listeners to checkpoint.
     */
    protected void sendCrawlStateChangeEvent(Object newState, String message) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.state = newState;
            for (Iterator i = this.registeredCrawlStatusListeners.iterator();
                    i.hasNext();) {
                CrawlStatusListener l = (CrawlStatusListener)i.next();
                if (newState.equals(PAUSED)) {
                    l.crawlPaused(message);
                } else if (newState.equals(RUNNING)) {
                    l.crawlResuming(message);
                } else if (newState.equals(PAUSING)) {
                    l.crawlPausing(message);
                } else if (newState.equals(STARTED)) {
                    l.crawlStarted(message);
                } else if (newState.equals(STOPPING)) {
                    l.crawlEnding(message);
                } else if (newState.equals(FINISHED)) {
                    l.crawlEnded(message);
                } else if (newState.equals(PREPARING)) {
                    l.crawlResuming(message);
                } else {
                    throw new RuntimeException("Unknown state: " + newState);
                }
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + newState + " to " + l);
                }
            }
            LOGGER.fine("Sent " + newState);
        }
    }

    /**
     * Send the checkpoint event.
     * Has its own method apart from
     * {@link #sendCrawlStateChangeEvent(Object, String)} because checkpointing
     * throws an Exception (didn't want to have to wrap all of the
     * sendCrawlStateChangeEvent calls in try/catches).
     * @param checkpointDir Where to write checkpoint state to.
     * @throws Exception
     */
    protected void sendCheckpointEvent(File checkpointDir) throws Exception {
        synchronized (this.registeredCrawlStatusListeners) {
            if (this.state != PAUSED) {
                throw new IllegalStateException("Crawler must be completely " +
                    "paused before checkpointing can start");
            }
            this.state = CHECKPOINTING;
            for (Iterator i = this.registeredCrawlStatusListeners.iterator();
                    i.hasNext();) {
                CrawlStatusListener l = (CrawlStatusListener)i.next();
                l.crawlCheckpoint(checkpointDir);
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + CHECKPOINTING + " to " + l);
                }
            }
            LOGGER.fine("Sent " + CHECKPOINTING);
        }
    }

    /**
     * Operator requested that the crawl begin.
     */
    public void requestCrawlStart() {
        runProcessorInitialTasks();

        sendCrawlStateChangeEvent(STARTED, CrawlJob.STATUS_PENDING);
        String jobState;
        state = RUNNING;
        jobState = CrawlJob.STATUS_RUNNING;
        sendCrawlStateChangeEvent(this.state, jobState);

        // A proper exit will change this value.
        this.sExit = CrawlJob.STATUS_FINISHED_ABNORMAL;

        Thread statLogger = new Thread(statistics);
        statLogger.setName("StatLogger");
        statLogger.start();

        frontier.start();
    }

    /**
     * Called when the last toe thread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");
        // Run processors' final tasks
        runProcessorFinalTasks();
        // Run frontier finalTasks (before crawl can be considered 'finished')
        frontier.finalTasks();
        // Ok, now we are ready to exit.
        sendCrawlStateChangeEvent(FINISHED, this.sExit);
        synchronized (this.registeredCrawlStatusListeners) {
            // Remove all listeners now that we're done with them.
            this.registeredCrawlStatusListeners.
                removeAll(this.registeredCrawlStatusListeners);
            this.registeredCrawlStatusListeners = null;
        }

        closeLogFiles();

        // Release references to logger file handler instances.
        this.fileHandlers = null;
        this.uriErrors = null;
        this.uriProcessing = null;
        this.localErrors = null;
        this.runtimeErrors = null;
        this.progressStats = null;
        this.reports = null;
        this.manifest = null;

        // Do cleanup.
        this.statistics = null;
        this.frontier = null;
        this.disk = null;
        this.scratchDisk = null;
        this.order = null;
        this.scope = null;
        this.reserveMemory = null;
        this.processorChains = null;
        if (this.serverCache != null) {
            this.serverCache.cleanup();
            this.serverCache = null;
        }
        if (this.checkpointer != null) {
            this.checkpointer.cleanup();
            this.checkpointer = null;
        }
        if (this.bdbEnvironment != null) {
            try {
                this.bdbEnvironment.sync();
                this.bdbEnvironment.close();
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
            this.bdbEnvironment = null;
        }
        this.bigmaps = null;
        if (this.settingsHandler != null) {
            this.settingsHandler.cleanup();
        }
        this.settingsHandler = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
            // I played with launching a thread here to do cleanup of the
            // ToePool ThreadGroup (making sure the cleanup thread was not
            // in the ToePool ThreadGroup).  Did this because ToePools seemed
            // to be sticking around holding references to CrawlController at
            // least.  Need to spend more time looking to see whether this is
            // still the case even after adding the above toePool#cleanup call.
        }
        this.toePool = null;
        LOGGER.fine("Finished crawl.");
    }

    synchronized void completePause() {
        // Send a notifyAll. At least the checkpointing thread may be waiting
        // on a complete pause.
        notifyAll();
        sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED);
    }

    private boolean shouldContinueCrawling() {
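        // Check the frontier first: an empty frontier means normal
        // completion and takes precedence over any byte/document/time limit.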
        if (frontier.isEmpty()) {
            this.sExit = CrawlJob.STATUS_FINISHED;
            return false;
        }

        if (maxBytes > 0 && statistics.totalBytesCrawled() >= maxBytes) {
            // Hit the max byte download limit!
            sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT;
            return false;
        } else if (maxDocument > 0
                && frontier.succeededFetchCount() >= maxDocument) {
            // Hit the max document download limit!
            this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT;
            return false;
        } else if (maxTime > 0 &&
                statistics.crawlDuration() >= maxTime * 1000) {
            // Hit the max crawl time limit!
            this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT;
            return false;
        }
        return state == RUNNING;
    }

    /**
     * Request a checkpoint.
     * Sets a checkpointing thread running.
     * @throws IllegalStateException Thrown if the crawl is not in the paused
     * state (the crawl must first be paused before checkpointing).
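     *
     * <p>A sketch of the expected call sequence (requestCrawlPause is the
     * controller's pause-request method, which falls outside this excerpt,
     * so its name is an assumption; waiting for the PAUSED state is the
     * caller's responsibility):
     * <pre>
     * controller.requestCrawlPause();
     * // ...wait until listeners report crawlPaused()...
     * controller.requestCrawlCheckpoint();
     * </pre>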
     */
    public synchronized void requestCrawlCheckpoint()
    throws IllegalStateException {
        if (this.checkpointer == null) {
            return;
        }
        if (this.checkpointer.isCheckpointing()) {
            throw new IllegalStateException("Checkpoint already running.");
        }
        this.checkpointer.checkpoint();
    }

    /**
     * @return True if checkpointing.
     */
    public boolean isCheckpointing() {
        return this.state == CHECKPOINTING;
    }

    /**
     * Run checkpointing.
     * CrawlController takes care of managing the checkpointing/serializing
     * of bdb, the StatisticsTracker, and the CheckpointContext.  Other
     * modules that want to revive themselves on checkpoint recovery need to
     * save state during their {@link CrawlStatusListener#crawlCheckpoint(File)}
     * invocation, and then in their #initialize (if a module) or #initialTask
     * (if a processor), check with the CrawlController whether this is a
     * checkpoint recovery. If it is, read their old state from the pointed-to
     * checkpoint directory.
     * <p>Default access: only to be called by Checkpointer.
     * @throws Exception
     */
    void checkpoint()
    throws Exception {
        // Tell registered listeners to checkpoint.
        sendCheckpointEvent(this.checkpointer.
            getCheckpointInProgressDirectory());

        // Rotate off crawler logs.
        LOGGER.fine("Rotating log files.");
        rotateLogFiles(CURRENT_LOG_SUFFIX + "." +
            this.checkpointer.getNextCheckpointName());

        // Sync the BigMap contents to bdb, if they're bdb bigmaps.
        LOGGER.fine("BigMaps.");
        checkpointBigMaps(this.checkpointer.getCheckpointInProgressDirectory());

        // Note, on deserialization, the super CrawlType#parent
        // needs to be restored. Parent is '/crawl-order/loggers'.
        // The settings handler for this module also needs to be
        // restored. Both of these fields are private in the
        // super class. Adding the restored ST to the crawl order should take
        // care of this.

        // Checkpoint bdb environment.
        LOGGER.fine("Bdb environment.");
        checkpointBdb(this.checkpointer.getCheckpointInProgressDirectory());

        // Make copies of the order, seeds, and settings.
        LOGGER.fine("Copying settings.");
        copySettings(this.checkpointer.getCheckpointInProgressDirectory());

        // Checkpoint this CrawlController itself.
        CheckpointUtils.writeObjectToFile(this,
            this.checkpointer.getCheckpointInProgressDirectory());
    }

    /**
     * Copy off the settings.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws IOException
     */
    protected void copySettings(final File checkpointDir) throws IOException {
        final List files = this.settingsHandler.getListOfAllFiles();
        boolean copiedSettingsDir = false;
        final File settingsDir = new File(this.disk, "settings");
        for (final Iterator i = files.iterator(); i.hasNext();) {
            File f = new File((String)i.next());
            if (f.getAbsolutePath().startsWith(settingsDir.getAbsolutePath())) {
                if (copiedSettingsDir) {
                    // Skip.  We've already copied this member of the
                    // settings directory.
                    continue;
                }
                // Copy the 'settings' dir all in one lump, not a file at a time.
                copiedSettingsDir = true;
                FileUtils.copyFiles(settingsDir,
                    new File(checkpointDir, settingsDir.getName()));
                continue;
            }
            FileUtils.copyFiles(f, f.isDirectory() ? checkpointDir :
                new File(checkpointDir, f.getName()));
        }
    }

    /**
     * Checkpoint bdb.
     * I used to do a call to log cleaning as suggested in the je-2.0 javadoc,
     * but it takes way too much time (20 minutes for a crawl of 1 million
     * items). Assume the cleaner is keeping up. Below was the log cleaning
     * loop:
     * <pre>int totalCleaned = 0;
     * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0;
     *  totalCleaned += cleaned) {
     *      LOGGER.fine("Cleaned " + cleaned + " log files.");
     * }
     * </pre>
     * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint
     * are effectively the same thing, only sync is not configurable.  He
     * suggests doing one or the other:
     * <p>MS: Reading code, Environment.sync() is a checkpoint.  Looks like
     * I don't need to call a checkpoint after calling a sync?
     * <p>MH: Right, they're almost the same thing -- just do one or the other,
     * not both.  With the new API, you'll need to do a checkpoint not a
     * sync, because the sync() method has no config parameter.  Don't worry
     * -- it's fine to do a checkpoint even though you're not using.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws DatabaseException
     * @throws IOException
     * @throws RuntimeException Thrown if failed setup of new bdb environment.
     */
    protected void checkpointBdb(File checkpointDir)
    throws DatabaseException, IOException, RuntimeException {
        EnvironmentConfig envConfig = this.bdbEnvironment.getConfig();
        final List bkgrdThreads = Arrays.asList(new String []
            {"je.env.runCheckpointer", "je.env.runCleaner",
                "je.env.runINCompressor"});
        try {
            // Disable background threads
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false");
            // Do a force checkpoint.  That's what a sync does (i.e. doSync).
            CheckpointConfig chkptConfig = new CheckpointConfig();
            chkptConfig.setForce(true);

            // Mark Hayes of sleepycat says:
            // "The default for this property is false, which gives the current
            // behavior (allow deltas).  If this property is true, deltas are
            // prohibited -- full versions of internal nodes are always logged
            // during the checkpoint. When a full version of an internal node
            // is logged during a checkpoint, recovery does not need to process
            // it at all.  It is only fetched if needed by the application,
            // during normal DB operations after recovery. When a delta of an
            // internal node is logged during a checkpoint, recovery must
            // process it by fetching the full version of the node from earlier
            // in the log, and then applying the delta to it.  This can be
            // pretty slow, since it is potentially a large amount of
            // random I/O."
            chkptConfig.setMinimizeRecoveryTime(true);
            this.bdbEnvironment.checkpoint(chkptConfig);
            LOGGER.fine("Finished bdb checkpoint.");

            // From the sleepycat folks: A trick for flipping db logs.
            EnvironmentImpl envImpl =
                DbInternal.getEnvironmentImpl(this.bdbEnvironment);
            long firstFileInNextSet =
                DbLsn.getFileNumber(envImpl.forceLogFileFlip());
            // So the last file in the checkpoint is firstFileInNextSet - 1.
            // Write a manifest of all log files into the bdb directory.
            final String lastBdbCheckpointLog =
                getBdbLogFileName(firstFileInNextSet - 1);
            processBdbLogs(checkpointDir, lastBdbCheckpointLog);
            LOGGER.fine("Finished processing bdb log files.");
        } finally {
            // Restore background threads.
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true");
        }
    }
1305     
1306     protected void processBdbLogs(final File checkpointDir,
1307             final String lastBdbCheckpointLog) throws IOException {
1308         File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir);
1309         if (!bdbDir.exists()) {
1310             bdbDir.mkdir();
1311         }
1312         PrintWriter pw = new PrintWriter(new FileOutputStream(new File(
1313              checkpointDir, "bdbje-logs-manifest.txt")));
1314         try {
1315             // Don't copy any beyond the last bdb log file (bdbje can keep
1316             // writing logs after checkpoint).
1317             boolean pastLastLogFile = false;
1318             Set<String> srcFilenames = null;
1319             final boolean copyFiles = getCheckpointCopyBdbjeLogs();
1320             do {
1321                 FilenameFilter filter = CheckpointUtils.getJeLogsFilter();
1322                 srcFilenames =
1323                     new HashSet<String>(Arrays.asList(
1324                             getStateDisk().list(filter)));
1325                 String [] tgtList = bdbDir.list(filter);
1326                 if (tgtList != null && tgtList.length > 0) {
1327                     srcFilenames.removeAll(Arrays.asList(tgtList));
1328                 }
1329                 if (srcFilenames.size() > 0) {
1330                     // Sort files.
1331                     srcFilenames = new TreeSet<String>(srcFilenames);
1332                     int count = 0;
1333                     for (final Iterator<String> i = srcFilenames.iterator();
1334                             i.hasNext() && !pastLastLogFile;) {
1335                         String name = i.next();
1336                         if (copyFiles) {
1337                             FileUtils.copyFiles(new File(getStateDisk(), name),
1338                                 new File(bdbDir, name));
1339                         }
1340                         pw.println(name);
1341                         if (name.equals(lastBdbCheckpointLog)) {
1342                             // We're done.
1343                             pastLastLogFile = true;
1344                         }
1345                         count++;
1346                     }
1347                     if (LOGGER.isLoggable(Level.FINE)) {
1348                         LOGGER.fine("Processed " + count + " bdbje log files.");
1349                     }
1350                 }
1351             } while (!pastLastLogFile && srcFilenames != null &&
1352                 srcFilenames.size() > 0);
1353         } finally {
1354             pw.close();
1355         }
1356     }
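
         // The bdbje-logs-manifest.txt written above ends up as a plain,
         // newline-separated list of je log names, for example (illustrative
         // values):
         //
         //   00000000.jdb
         //   00000001.jdb
         //   00000002.jdb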
1357  
1358     protected String getBdbLogFileName(final long index) {
1359         String lastBdbLogFileHex = Long.toHexString(index);
1360         StringBuffer buffer = new StringBuffer();
1361         for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) {
1362             buffer.append('0');
1363         }
1364         buffer.append(lastBdbLogFileHex);
1365         buffer.append(".jdb");
1366         return buffer.toString();
1367     }
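
         // An equivalent one-liner using String.format, shown only for
         // clarity (not what the method above uses); it yields the same
         // zero-padded hex names bdbje gives its log files:
         //
         //   String name = String.format("%08x.jdb", index);
         //   // e.g. index 255 -> "000000ff.jdb"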
1368     
1369     protected void setBdbjeBkgrdThreads(final EnvironmentConfig config,
1370             final List<String> threads, final String setting) {
1371         for (final String threadName: threads) {
1372             config.setConfigParam(threadName, setting);
1373         }
1374     }
1375     
1376     /***
1377      * Get recover checkpoint.
1378      * Returns null if we're NOT in recover mode.
1379      * Looks at ATTR_RECOVER_PATH and, if it's a directory, assumes checkpoint
1380      * recover. In checkpoint-recover mode, returns a Checkpoint instance if the
1381      * checkpoint was VALID (else null).
1382      * @return Checkpoint instance if we're in recover checkpoint
1383      * mode and the pointed-to checkpoint was valid.
1384      * @see #isCheckpointRecover()
1385      */
1386     public synchronized Checkpoint getCheckpointRecover() {
1387         if (this.checkpointRecover != null) {
1388             return this.checkpointRecover;
1389         }
1390         return getCheckpointRecover(this.order);
1391     }
1392     
1393     public static Checkpoint getCheckpointRecover(final CrawlOrder order) {
1394         String path = (String)order.getUncheckedAttribute(null,
1395             CrawlOrder.ATTR_RECOVER_PATH);
1396         if (path == null || path.length() <= 0) {
1397             return null;
1398         }
1399         File rp = new File(path);
1400         // Assume that if the path is a directory, it's a checkpoint recover.
1401         Checkpoint result = null;
1402         if (rp.exists() && rp.isDirectory()) {
1403             Checkpoint cp = new Checkpoint(rp);
1404             if (cp.isValid()) {
1405                 // if valid, set as result.
1406                 result = cp;
1407             }
1408         }
1409         return result;
1410     }
1411     
1412     public static boolean isCheckpointRecover(final CrawlOrder order) {
1413         return getCheckpointRecover(order) != null;
1414     }
1415     
1416     /***
1417      * @return True if we're in checkpoint recover mode. Call
1418      * {@link #getCheckpointRecover()} to get the Checkpoint instance
1419      * that has info on the checkpoint directory being recovered from.
1420      */
1421     public boolean isCheckpointRecover() {
1422         return this.checkpointRecover != null;
1423     }
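
         // A short usage sketch of the recover-mode check (variable names
         // hypothetical):
         //
         //   Checkpoint cp = CrawlController.getCheckpointRecover(order);
         //   if (cp != null) {
         //       // Recover mode: resume from cp.getDirectory().
         //   }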
1424 
1425     /***
1426      * Operator requested that the crawl stop.
1427      */
1428     public synchronized void requestCrawlStop() {
1429         requestCrawlStop(CrawlJob.STATUS_ABORTED);
1430     }
1431     
1432     /***
1433      * Operator requested that the crawl stop.
1434      * @param message Exit-status message to record; must not be null.
1435      */
1436     public synchronized void requestCrawlStop(String message) {
1437         if (state == STOPPING || state == FINISHED) {
1438             return;
1439         }
1440         if (message == null) {
1441             throw new IllegalArgumentException("Message cannot be null.");
1442         }
1443         this.sExit = message;
1444         beginCrawlStop();
1445     }
1446 
1447     /***
1448      * Start the process of stopping the crawl. 
1449      */
1450     public void beginCrawlStop() {
1451         LOGGER.fine("Started.");
1452         sendCrawlStateChangeEvent(STOPPING, this.sExit);
1453         if (this.frontier != null) {
1454             this.frontier.terminate();
1455             this.frontier.unpause();
1456         }
1457         LOGGER.fine("Finished."); 
1458     }
1459     
1460     /***
1461      * Stop the crawl temporarily.
1462      */
1463     public synchronized void requestCrawlPause() {
1464         if (state == PAUSING || state == PAUSED) {
1465             // Already about to pause
1466             return;
1467         }
1468         sExit = CrawlJob.STATUS_WAITING_FOR_PAUSE;
1469         frontier.pause();
1470         sendCrawlStateChangeEvent(PAUSING, this.sExit);
1471         if (toePool.getActiveToeCount() == 0) {
1472             // if all threads already held, complete pause now
1473             // (no chance to trigger off later held thread)
1474             completePause();
1475         }
1476     }
1477 
1478     /***
1479      * Tell if the controller is paused
1480      * @return true if paused
1481      */
1482     public boolean isPaused() {
1483         return state == PAUSED;
1484     }
1485     
1486     public boolean isPausing() {
1487         return state == PAUSING;
1488     }
1489     
1490     public boolean isRunning() {
1491         return state == RUNNING;
1492     }
1493 
1494     /***
1495      * Resume crawl from paused state
1496      */
1497     public synchronized void requestCrawlResume() {
1498         if (state != PAUSING && state != PAUSED && state != CHECKPOINTING) {
1499             // Can only resume from the pausing, paused or
1500             // checkpointing states.
1501             return;
1502         }
1503         multiThreadMode();
1504         frontier.unpause();
1505         LOGGER.fine("Crawl resumed.");
1506         sendCrawlStateChangeEvent(RUNNING, CrawlJob.STATUS_RUNNING);
1507     }
1508 
1509     /***
1510      * @return Active toe thread count.
1511      */
1512     public int getActiveToeCount() {
1513         if (toePool == null) {
1514             return 0;
1515         }
1516         return toePool.getActiveToeCount();
1517     }
1518 
1519     private void setupToePool() {
1520         toePool = new ToePool(this);
1521         // TODO: make # of toes self-optimizing
1522         toePool.setSize(order.getMaxToes());
1523     }
1524 
1525     /***
1526      * @return The order file instance.
1527      */
1528     public CrawlOrder getOrder() {
1529         return order;
1530     }
1531 
1532     /***
1533      * @return The server cache instance.
1534      */
1535     public ServerCache getServerCache() {
1536         return serverCache;
1537     }
1538 
1539     /***
1540      * @param o Crawl order to set.
1541      */
1542     public void setOrder(CrawlOrder o) {
1543         order = o;
1544     }
1545 
1546 
1547     /***
1548      * @return The frontier.
1549      */
1550     public Frontier getFrontier() {
1551         return frontier;
1552     }
1553 
1554     /***
1555      * @return This crawl scope.
1556      */
1557     public CrawlScope getScope() {
1558         return scope;
1559     }
1560 
1561     /*** Get the list of processor chains.
1562      *
1563      * @return the list of processor chains.
1564      */
1565     public ProcessorChainList getProcessorChainList() {
1566         return processorChains;
1567     }
1568 
1569     /*** Get the first processor chain.
1570      *
1571      * @return the first processor chain.
1572      */
1573     public ProcessorChain getFirstProcessorChain() {
1574         return processorChains.getFirstChain();
1575     }
1576 
1577     /*** Get the postprocessor chain.
1578      *
1579      * @return the postprocessor chain.
1580      */
1581     public ProcessorChain getPostprocessorChain() {
1582         return processorChains.getLastChain();
1583     }
1584 
1585     /***
1586      * Get the 'working' directory of the current crawl.
1587      * @return the 'working' directory of the current crawl.
1588      */
1589     public File getDisk() {
1590         return disk;
1591     }
1592 
1593     /***
1594      * @return Scratch disk location.
1595      */
1596     public File getScratchDisk() {
1597         return scratchDisk;
1598     }
1599 
1600     /***
1601      * @return State disk location.
1602      */
1603     public File getStateDisk() {
1604         return stateDisk;
1605     }
1606 
1607     /***
1608      * @return The number of ToeThreads
1609      *
1610      * @see ToePool#getToeCount()
1611      */
1612     public int getToeCount() {
1613         return this.toePool == null? 0: this.toePool.getToeCount();
1614     }
1615 
1616     /***
1617      * @return The ToePool
1618      */
1619     public ToePool getToePool() {
1620         return toePool;
1621     }
1622     
1623     /***
1624      * @return toepool one-line report
1625      */
1626     public String oneLineReportThreads() {
1628         return toePool.singleLineReport();
1629     }
1630 
1631     /***
1632      * While many settings will update automatically when the SettingsHandler is
1633      * modified, some settings need to be explicitly changed to reflect new
1634      * settings. This includes the number of toe threads and the seeds.
1635      */
1636     public void kickUpdate() {
1637         
1638         installThreadContextSettingsHandler();
1639  
1640         toePool.setSize(order.getMaxToes());
1641         
1642         this.scope.kickUpdate();
1643         this.frontier.kickUpdate();
1644         this.processorChains.kickUpdate();
1645         
1646         // TODO: continue to generalize this, so that any major 
1647         // component can get a kick when it may need to refresh its data
1648 
1649         setThresholds();
1650     }
1651 
1652     /***
1653      * @return The settings handler.
1654      */
1655     public SettingsHandler getSettingsHandler() {
1656         return settingsHandler;
1657     }
1658 
1659     /***
1660      * This method iterates through processor chains to run processors' initial
1661      * tasks.
1662      *
1663      */
1664     private void runProcessorInitialTasks(){
1665         for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) {
1666             for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
1667                     ip.hasNext(); ) {
1668                 ((Processor) ip.next()).initialTasks();
1669             }
1670         }
1671     }
1672 
1673     /***
1674      * This method iterates through processor chains to run processors' final
1675      * tasks.
1676      *
1677      */
1678     private void runProcessorFinalTasks(){
1679         for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) {
1680             for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
1681                     ip.hasNext(); ) {
1682                 ((Processor) ip.next()).finalTasks();
1683             }
1684         }
1685     }
1686 
1687     /***
1688      * Kills a thread. For details see
1689      * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
1690      * ToePool.killThread(int, boolean)}.
1691      * @param threadNumber Thread to kill.
1692      * @param replace Should thread be replaced.
1693      * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
1694      */
1695     public void killThread(int threadNumber, boolean replace){
1696         toePool.killThread(threadNumber, replace);
1697     }
1698 
1699     /***
1700      * Add a file to the manifest of files used/generated by the current
1701      * crawl.
1702      * 
1703      * TODO: It's possible for a file to be added twice if reports are
1704      * force-generated mid-crawl.  Fix.
1705      *
1706      * @param file The filename (with absolute path) of the file to add
1707      * @param type The type of the file
1708      * @param bundle Should the file be included in a typical bundling of
1709      *           crawler files.
1710      *
1711      * @see #MANIFEST_CONFIG_FILE
1712      * @see #MANIFEST_LOG_FILE
1713      * @see #MANIFEST_REPORT_FILE
1714      */
1715     public void addToManifest(String file, char type, boolean bundle) {
1716         manifest.append(type + (bundle? "+": "-") + " " + file + "\n");
1717     }
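
         // For illustration, assuming MANIFEST_LOG_FILE is the char 'L', a
         // call like addToManifest("/crawl/logs/crawl.log", MANIFEST_LOG_FILE,
         // true) appends the line
         //
         //   L+ /crawl/logs/crawl.log
         //
         // where the leading char is the file type and '+'/'-' records whether
         // the file belongs in a typical bundling of crawler files.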
1718 
1719     /***
1720      * Evaluate if the crawl should stop because it is finished.
1721      */
1722     public void checkFinish() {
1723         if(atFinish()) {
1724             beginCrawlStop();
1725         }
1726     }
1727 
1728     /***
1729      * Evaluate if the crawl should stop because it is finished,
1730      * without actually stopping the crawl.
1731      * 
1732      * @return true if crawl is at a finish-possible state
1733      */
1734     public boolean atFinish() {
1735         return state == RUNNING && !shouldContinueCrawling();
1736     }
1737     
1738     private void readObject(ObjectInputStream stream)
1739     throws IOException, ClassNotFoundException {
1740         stream.defaultReadObject();
1741         // Setup status listeners
1742         this.registeredCrawlStatusListeners =
1743             Collections.synchronizedList(new ArrayList<CrawlStatusListener>());
1744         // Ensure no holdover singleThreadMode
1745         singleThreadMode = false; 
1746     }
1747 
1748     /***
1749      * Go to single-thread mode, where only one ToeThread may
1750      * proceed at a time. Also acquires the single lock, so 
1751      * no further threads will proceed past an 
1752      * acquireContinuePermission. Caller must be sure to release the
1753      * lock to allow other threads to proceed one at a time. 
1754      */
1755     public void singleThreadMode() {
1756         this.singleThreadLock.lock();
1757         singleThreadMode = true; 
1758     }
1759 
1760     /***
1761      * Go back to regular multi-thread mode, where all
1762      * ToeThreads may proceed at once
1763      */
1764     public void multiThreadMode() {
1765         this.singleThreadLock.lock();
1766         singleThreadMode = false; 
1767         while(this.singleThreadLock.isHeldByCurrentThread()) {
1768             this.singleThreadLock.unlock();
1769         }
1770     }
1771     
1772     /***
1773      * Proceed only if allowed, giving CrawlController a chance
1774      * to enforce single-thread mode.
1775      */
1776     public void acquireContinuePermission() {
1777         if (singleThreadMode) {
1778             this.singleThreadLock.lock();
1779             if(!singleThreadMode) {
1780                 // If changed while waiting, ignore
1781                 while(this.singleThreadLock.isHeldByCurrentThread()) {
1782                     this.singleThreadLock.unlock();
1783                 }
1784             }
1785         } // else, permission is automatic
1786     }
1787 
1788     /***
1789      * Relinquish continue permission at end of processing (allowing
1790      * another thread to proceed if in single-thread mode). 
1791      */
1792     public void releaseContinuePermission() {
1793         if (singleThreadMode) {
1794             while(this.singleThreadLock.isHeldByCurrentThread()) {
1795                 this.singleThreadLock.unlock();
1796             }
1797         } // else do nothing; 
1798     }
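
         /*
          * A sketch of the intended acquire/release pattern from a worker
          * thread's point of view (the loop body and processNextUri() are
          * hypothetical; the real workers are the ToeThreads managed by
          * ToePool):
          *
          *   while (crawling) {
          *       controller.acquireContinuePermission(); // gates on single-thread mode
          *       try {
          *           processNextUri();
          *       } finally {
          *           controller.releaseContinuePermission();
          *       }
          *   }
          */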
1799     
1800     public void freeReserveMemory() {
1801         if(!reserveMemory.isEmpty()) {
1802             reserveMemory.removeLast();
1803             System.gc();
1804         }
1805     }
1806 
1807     /***
1808      * Note that a ToeThread reached the paused condition, possibly
1809      * completing the crawl-pause. 
1810      */
1811     public synchronized void toePaused() {
1812         releaseContinuePermission();
1813         if (state ==  PAUSING && toePool.getActiveToeCount() == 0) {
1814             completePause();
1815         }
1816     }
1817     
1818     /***
1819      * Note that a ToeThread ended, possibly completing the crawl-stop. 
1820      */
1821     public synchronized void toeEnded() {
1822         if (state == STOPPING && loopingToes.get() == 0) {
1823             completeStop();
1824         }
1825     }
1826 
1827     /***
1828      * Add order file contents to the manifest.
1829      * Writes configuration files and any files managed by CrawlController to
1830      * it. Classes that manage other files, outside of the settings framework,
1831      * are responsible for adding their files to the manifest themselves
1832      * by calling addToManifest.
1833      * Call before writing out reports.
1834      */
1835     public void addOrderToManifest() {
1836         for (Iterator it = getSettingsHandler().getListOfAllFiles().iterator();
1837                 it.hasNext();) {
1838             addToManifest((String)it.next(),
1839                 CrawlController.MANIFEST_CONFIG_FILE, true);
1840         }
1841     }
1842     
1843     /***
1844      * Log a URIException from deep inside other components to the crawl's
1845      * shared log. 
1846      * 
1847      * @param e URIException encountered
1848      * @param u CrawlURI where problem occurred
1849      * @param l String which could not be interpreted as a URI without exception
1850      */
1851     public void logUriError(URIException e, UURI u, CharSequence l) {
1852         if (e.getReasonCode() == UURIFactory.IGNORED_SCHEME) {
1853             // don't log those that are intentionally ignored
1854             return; 
1855         }
1856         Object[] array = {u, l};
1857         uriErrors.log(Level.INFO, e.getMessage(), array);
1858     }
1859     
1860     // 
1861     // Reporter
1862     //
1863     public final static String PROCESSORS_REPORT = "processors";
1864     public final static String MANIFEST_REPORT = "manifest";
1865     protected final static String[] REPORTS = {PROCESSORS_REPORT, MANIFEST_REPORT};
1866     
1867     /* (non-Javadoc)
1868      * @see org.archive.util.Reporter#getReports()
1869      */
1870     public String[] getReports() {
1871         return REPORTS;
1872     }
1873 
1874     /* (non-Javadoc)
1875      * @see org.archive.util.Reporter#reportTo(java.io.Writer)
1876      */
1877     public void reportTo(PrintWriter writer) {
1878         reportTo(null,writer);
1879     }
1880 
1881     public String singleLineReport() {
1882         return ArchiveUtils.singleLineReport(this);
1883     }
1884 
1885     public void reportTo(String name, PrintWriter writer) {
1886         if(PROCESSORS_REPORT.equals(name)) {
1887             reportProcessorsTo(writer);
1888             return;
1889         } else if (MANIFEST_REPORT.equals(name)) {
1890             reportManifestTo(writer);
1891             return;
1892         } else if (name!=null) {
1893             writer.println("requested report unknown: "+name);
1894         }
1895         singleLineReportTo(writer);
1896     }
1897 
1898     /***
1899      * @param writer Where to write report to.
1900      */
1901     protected void reportManifestTo(PrintWriter writer) {
1902         writer.print(manifest.toString());
1903     }
1904 
1905     /***
1906      * Compiles a human-readable report on the active processors and writes it out.
1907      * @param writer Where to write to.
1908      * @see org.archive.crawler.framework.Processor#report()
1909      */
1910     protected void reportProcessorsTo(PrintWriter writer) {
1911         writer.print(
1912             "Processors report - "
1913                 + ArchiveUtils.get12DigitDate()
1914                 + "\n");
1915         writer.print("  Job being crawled:    " + getOrder().getCrawlOrderName()
1916                 + "\n");
1917 
1918         writer.print("  Number of Processors: " +
1919             processorChains.processorCount() + "\n");
1920         writer.print("  NOTE: Some processors may not return a report!\n\n");
1921 
1922         for (Iterator ic = processorChains.iterator(); ic.hasNext(); ) {
1923             for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
1924                     ip.hasNext(); ) {
1925                 writer.print(((Processor) ip.next()).report());
1926             }
1927         }
1928     }
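
         // The report written above has this general shape (values are
         // illustrative only):
         //
         //   Processors report - 201011030000
         //     Job being crawled:    example-job
         //     Number of Processors: 12
         //     NOTE: Some processors may not return a report!
         //
         //   [each processor's own report() output follows]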
1929 
1930     public void singleLineReportTo(PrintWriter writer) {
1931         // TODO: improve to be a summary of crawl state
1932         writer.write("[Crawl Controller]\n");
1933     }
1934 
1935     public String singleLineLegend() {
1936         // TODO improve
1937         return "nothingYet";
1938     }
1939     
1940     
1941     /*** controls which alternate ObjectIdentityCache implementation to use */
1942     private static boolean USE_OIBC = true;
1943 
1944     /***
1945      * Call this method to get an instance of the crawler BigMap implementation.
1946      * A "BigMap" is a Map that knows how to manage ever-growing sets of
1947      * key/value pairs. If we're in a checkpoint recovery, this method will
1948      * manage reinstantiation of checkpointed bigmaps.
1949      * @param dbName Name to give any associated database.  Also used
1950      * as part of the name when serializing out the bigmap.  Needs to be
1951      * unique to a crawl.
1952      * @param valueClass Class of values we'll be using.
1953      * @return Map that knows how to carry large sets of key/value pairs or,
1954      * if none is available, an instance of HashMap.
1955      * @throws Exception
1956      */
1957     public <V> ObjectIdentityCache<String,V> getBigMap(final String dbName, 
1958             final Class<? super V> valueClass)
1959     throws Exception {
1960         if(USE_OIBC) {
1961             return getOIBC(dbName, valueClass);
1962         } else {
1963             return getCBM(dbName, valueClass);
1964         }
1965     }
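
         // A hedged usage sketch ('CrawlServer' is an example value class from
         // this codebase; the variable names are hypothetical):
         //
         //   ObjectIdentityCache<String,CrawlServer> servers =
         //       controller.getBigMap("serversdb", CrawlServer.class);
         //
         // Every map obtained this way is registered in this.bigmaps so that
         // checkpointBigMaps(...) can sync() it at checkpoint time.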
1966     
1967     /***
1968      * Implement 'big map' with ObjectIdentityBdbCache.
1969      * 
1970      * @param dbName Name to give any associated database.  Also used
1971      * as part of the name when serializing out the bigmap.  Needs to be
1972      * unique to a crawl.
1973      * @param valueClass Class of values we'll be using.
1974      * @return Map that knows how to carry large sets of key/value pairs or,
1975      * if none is available, an instance of HashMap.
1976      * @throws Exception
1977      */
1978     protected <V> ObjectIdentityBdbCache<V> getOIBC(final String dbName,
1979             final Class<? super V> valueClass)
1980     throws Exception {
1981         ObjectIdentityBdbCache<V> result = new ObjectIdentityBdbCache<V>();
1982         if (isCheckpointRecover()) {
1983             File baseDir = getCheckpointRecover().getDirectory();
1984             @SuppressWarnings("unchecked")
1985             ObjectIdentityBdbCache<V> temp = CheckpointUtils.
1986                 readObjectFromFile(result.getClass(), dbName, baseDir);
1987             result = temp;
1988         }
1989         result.initialize(getBdbEnvironment(), dbName, valueClass,
1990                 getBdbEnvironment().getClassCatalog());
1991         // Save a reference to each big map made so we can manage their
1992         // checkpointing.
1993         this.bigmaps.put(dbName, result);
1994         return result;
1995     }
1996     
1997     /***
1998      * Implement 'big map' with CachedBdbMap.
1999      * 
2000      * @param dbName Name to give any associated database.  Also used
2001      * as part of the name when serializing out the bigmap.  Needs to be
2002      * unique to a crawl.
2003      * @param valueClass Class of values we'll be using.
2004      * @return Map that knows how to carry large sets of key/value pairs or,
2005      * if none is available, an instance of HashMap.
2006      * @throws Exception
2007      * @deprecated
2008      */
2009     protected <V> CachedBdbMap<String,V> getCBM(final String dbName,
2010             final Class<? super V> valueClass)
2011     throws Exception {
2012         CachedBdbMap<String,V> result = new CachedBdbMap<String,V>(dbName);
2013         if (isCheckpointRecover()) {
2014             File baseDir = getCheckpointRecover().getDirectory();
2015             @SuppressWarnings("unchecked")
2016             CachedBdbMap<String,V> temp = CheckpointUtils.
2017                 readObjectFromFile(result.getClass(), dbName, baseDir);
2018             result = temp;
2019         }
2020         result.initialize(getBdbEnvironment(), valueClass,
2021                 getBdbEnvironment().getClassCatalog());
2022         // Save a reference to each big map made so we can manage their
2023         // checkpointing.
2024         this.bigmaps.put(dbName, result);
2025         return result;
2026     }
2027     
2028     protected void checkpointBigMaps(final File cpDir)
2029     throws Exception {
2030         for (final Iterator i = this.bigmaps.keySet().iterator(); i.hasNext();) {
2031             Object key = i.next();
2032             ObjectIdentityCache obj = this.bigmaps.get(key);
2033             // TODO: I tried adding sync to custom serialization of BigMap
2034             // implementation but data member counts of the BigMap
2035             // implementation were not being persisted properly.  Look at
2036             // why.  For now, do the sync in advance of serialization.
2037             obj.sync();
2038             CheckpointUtils.writeObjectToFile(obj, (String)key, cpDir);
2039         }
2040     }
2041 
2042     /***
2043      * Called on each progress statistics logging event.
2044      * @param e Progress statistics event.
2045      */
2046     public void progressStatisticsEvent(final EventObject e) {
2047         // Default is to do nothing.  Subclass if you want to catch this event.
2048         // Later, if there is demand, add publisher/listener support.  Currently hacked
2049         // in so the subclass in CrawlJob added to support JMX can send
2050         // notifications of progressStatistics change.
2051     }
2052     
2053     /***
2054      * Log to the progress statistics log.
2055      * @param msg Message to write to the progress statistics log.
2056      */
2057     public void logProgressStatistics(final String msg) {
2058         this.progressStats.info(msg);
2059     }
2060 
2061     /***
2062      * @return CrawlController state.
2063      */
2064     public Object getState() {
2065         return this.state;
2066     }
2067 
2068     public File getCheckpointsDisk() {
2069         return this.checkpointsDisk;
2070     }
2071     
2072     public AtomicInteger getLoopingToes() {
2073         return loopingToes;
2074     }
2075 }