package org.archive.crawler.framework;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.AttributeNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.ReflectionException;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.admin.CrawlJob;
import org.archive.crawler.admin.StatisticsTracker;
import org.archive.crawler.datamodel.Checkpoint;
import org.archive.crawler.datamodel.CrawlOrder;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.ServerCache;
import org.archive.crawler.event.CrawlStatusListener;
import org.archive.crawler.event.CrawlURIDispositionListener;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.framework.exceptions.InitializationException;
import org.archive.crawler.io.LocalErrorFormatter;
import org.archive.crawler.io.RuntimeErrorFormatter;
import org.archive.crawler.io.StatisticsLogFormatter;
import org.archive.crawler.io.UriErrorFormatter;
import org.archive.crawler.io.UriProcessingFormatter;
import org.archive.crawler.settings.MapType;
import org.archive.crawler.settings.SettingsHandler;
import org.archive.crawler.util.CheckpointUtils;
import org.archive.io.GenerationFileHandler;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;
import org.archive.util.CachedBdbMap;
import org.archive.util.FileUtils;
import org.archive.util.ObjectIdentityBdbCache;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.Reporter;
import org.archive.util.bdbje.EnhancedEnvironment;
import org.xbill.DNS.DClass;
import org.xbill.DNS.Lookup;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DbInternal;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.utilint.DbLsn;

/**
 * CrawlController collects all the classes which cooperate to
 * perform a crawl and provides a high-level interface to the
 * running crawl.
 *
 * As the "global context" for a crawl, subcomponents will
 * often reach each other through the CrawlController.
 *
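 * <p>A minimal lifecycle sketch (hedged: this assumes an already-built,
 * fully configured SettingsHandler, and omits the CrawlJob/web-UI
 * machinery that normally drives a controller):
 * <pre>
 * CrawlController controller = new CrawlController();
 * controller.initialize(settingsHandler); // wires order, frontier, logs
 * controller.requestCrawlStart();         // spins up toe threads
 * // ... later, from an operator action ...
 * controller.requestCrawlPause();
 * controller.requestCrawlResume();
 * controller.requestCrawlStop();
 * </pre>
 *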
 * @author Gordon Mohr
 */
public class CrawlController implements Serializable, Reporter {

    private static final long serialVersionUID =
        ArchiveUtils.classnameBasedUID(CrawlController.class, 1);

    /**
     * Messages from the CrawlController.
     *
     * They appear on the console.
     */
    private final static Logger LOGGER =
        Logger.getLogger(CrawlController.class.getName());

    /** abbreviation label for config files in manifest */
    public static final char MANIFEST_CONFIG_FILE = 'C';
    /** abbreviation label for report files in manifest */
    public static final char MANIFEST_REPORT_FILE = 'R';
    /** abbreviation label for log files in manifest */
    public static final char MANIFEST_LOG_FILE = 'L';

    public static final String LOGNAME_PROGRESS_STATISTICS =
        "progress-statistics";
    public static final String LOGNAME_URI_ERRORS = "uri-errors";
    public static final String LOGNAME_RUNTIME_ERRORS = "runtime-errors";
    public static final String LOGNAME_LOCAL_ERRORS = "local-errors";
    public static final String LOGNAME_CRAWL = "crawl";

    private transient CrawlOrder order;
    private transient CrawlScope scope;
    private transient ProcessorChainList processorChains;

    private transient Frontier frontier;

    private transient AtomicInteger loopingToes;
    private transient ToePool toePool;

    private transient ServerCache serverCache;

    private transient SettingsHandler settingsHandler;

    private volatile transient boolean singleThreadMode = false;
    private transient ReentrantLock singleThreadLock = null;

    private transient LinkedList<char[]> reserveMemory;
    private static final int RESERVE_BLOCKS = 1;
    // Intended as 6*2^20 (six "mega" chars); written out because '^' is
    // XOR in Java, so the literal expression 6*2^20 evaluates to 24.
    private static final int RESERVE_BLOCK_SIZE = 6 * 1024 * 1024;

    /**
     * Crawl exit status.
     */
    private transient String sExit;

    public static final Object NASCENT = "NASCENT".intern();
    public static final Object RUNNING = "RUNNING".intern();
    public static final Object PAUSED = "PAUSED".intern();
    public static final Object PAUSING = "PAUSING".intern();
    public static final Object CHECKPOINTING = "CHECKPOINTING".intern();
    public static final Object STOPPING = "STOPPING".intern();
    public static final Object FINISHED = "FINISHED".intern();
    public static final Object STARTED = "STARTED".intern();
    public static final Object PREPARING = "PREPARING".intern();

    transient private Object state = NASCENT;

    private transient File disk;
    private transient File logsDisk;

    /**
     * For temp files representing state of the crawler (e.g. queues).
     */
    private transient File stateDisk;

    /**
     * For discardable temp files (e.g. fetch buffers).
     */
    private transient File scratchDisk;

    /**
     * Directory that holds checkpoints.
     */
    private transient File checkpointsDisk;

    /**
     * Checkpointer.
     * Knows whether a checkpoint is in progress and what the checkpoint's
     * name is. Also runs checkpoints.
     */
    private Checkpointer checkpointer;

    /**
     * Set to the checkpoint we're recovering from, if in checkpoint-recover
     * mode. Set up by {@link #getCheckpointRecover()}.
     */
    private transient Checkpoint checkpointRecover = null;

    private long maxBytes;
    private long maxDocument;
    private long maxTime;

    /**
     * A manifest of all files used/created during this crawl. Written to
     * file at the end of the crawl (the absolute last thing done).
     */
    private StringBuffer manifest;

    /**
     * Record of fileHandlers established for loggers,
     * assisting file rotation.
     */
    transient private Map<Logger,FileHandler> fileHandlers;

    /** suffix to use on active logs */
    public static final String CURRENT_LOG_SUFFIX = ".log";

    /**
     * Crawl progress logger.
     *
     * Contains no exceptions; logs the summary result of each URI's
     * processing.
     */
    public transient Logger uriProcessing;

    /**
     * This logger contains unexpected runtime errors.
     *
     * Would contain errors trying to set up a job or failures inside
     * processors that they are not prepared to recover from.
     */
    public transient Logger runtimeErrors;

    /**
     * This logger is for job-scoped logging, specifically errors which
     * happen and are handled within a particular processor.
     *
     * Examples would be socket timeouts, exceptions thrown by extractors,
     * etc.
     */
    public transient Logger localErrors;

    /**
     * Special log for URI format problems, wherever they may occur.
     */
    public transient Logger uriErrors;

    /**
     * Statistics tracker writes here at regular intervals.
     */
    private transient Logger progressStats;

    /**
     * Logger to hold job summary report.
     *
     * Large state reports made at infrequent intervals (e.g. job ending)
     * go here.
     */
    public transient Logger reports;

    protected StatisticsTracking statistics = null;

    /**
     * List of crawl status listeners.
     *
     * All iterations need to synchronize on this object if they're to avoid
     * concurrent modification exceptions.
     * See {@link java.util.Collections#synchronizedList(List)}.
     */
    private transient List<CrawlStatusListener> registeredCrawlStatusListeners =
        Collections.synchronizedList(new ArrayList<CrawlStatusListener>());

    private transient CrawlURIDispositionListener
        registeredCrawlURIDispositionListener;

    protected transient ArrayList<CrawlURIDispositionListener>
        registeredCrawlURIDispositionListeners;

    /** Shared bdb Environment for Frontier subcomponents */
    private transient EnhancedEnvironment bdbEnvironment = null;

    /**
     * Keep a list of all BigMap instances made -- shouldn't be many -- so
     * that we can checkpoint them.
     */
    private transient Map<String,ObjectIdentityCache<?,?>> bigmaps = null;

    /**
     * Default constructor.
     */
    public CrawlController() {
        super();
    }

    /**
     * Starting from nothing, set up CrawlController and associated
     * classes to be ready for a first crawl.
     *
     * @param sH Settings handler.
     * @throws InitializationException
     */
    public void initialize(SettingsHandler sH)
    throws InitializationException {
        sendCrawlStateChangeEvent(PREPARING, CrawlJob.STATUS_PREPARING);

        this.singleThreadLock = new ReentrantLock();
        this.settingsHandler = sH;
        installThreadContextSettingsHandler();
        this.order = settingsHandler.getOrder();
        this.order.setController(this);
        this.bigmaps = new Hashtable<String,ObjectIdentityCache<?,?>>();
        sExit = "";
        this.manifest = new StringBuffer();
        String onFailMessage = "";
        try {
            onFailMessage = "You must set the User-Agent and From HTTP" +
                " header values to acceptable strings. \n" +
                " User-Agent: [software-name](+[info-url])[misc]\n" +
                " From: [email-address]\n";
            order.checkUserAgentAndFrom();

            onFailMessage = "Unable to setup disk";
            if (disk == null) {
                setupDisk();
            }

            onFailMessage = "Unable to create log file(s)";
            setupLogs();

            onFailMessage = "Unable to test/run checkpoint recover";
            this.checkpointRecover = getCheckpointRecover();
            if (this.checkpointRecover == null) {
                this.checkpointer =
                    new Checkpointer(this, this.checkpointsDisk);
            } else {
                setupCheckpointRecover();
            }

            onFailMessage = "Unable to setup bdb environment.";
            setupBdb();

            onFailMessage = "Unable to setup statistics";
            setupStatTracking();

            onFailMessage = "Unable to setup crawl modules";
            setupCrawlModules();
        } catch (Exception e) {
            String tmp = "On crawl: "
                + settingsHandler.getSettingsObject(null).getName() + " " +
                onFailMessage;
            LOGGER.log(Level.SEVERE, tmp, e);
            throw new InitializationException(tmp, e);
        }

        // Cap the dnsjava lookup cache so resolved records don't
        // accumulate in memory.
        Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);

        loopingToes = new AtomicInteger(0);
        setupToePool();
        setThresholds();

        reserveMemory = new LinkedList<char[]>();
        // Start at 0 so RESERVE_BLOCKS blocks are actually allocated
        // (starting at 1 would allocate one block too few).
        for (int i = 0; i < RESERVE_BLOCKS; i++) {
            reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
        }
    }

    /**
     * Utility method to install this crawl's SettingsHandler into the
     * 'global' (for this thread) holder, so that any subsequent
     * deserialization operations in this thread can find it.
     */
    public void installThreadContextSettingsHandler() {
        SettingsHandler.setThreadContextSettingsHandler(settingsHandler);
    }

    /**
     * Does setup of checkpoint recovery.
     * Copies bdb log files into the state dir.
     * @throws IOException
     */
    protected void setupCheckpointRecover()
    throws IOException {
        long started = System.currentTimeMillis();
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Starting recovery setup -- copying into place " +
                "bdbje log files -- for checkpoint named " +
                this.checkpointRecover.getDisplayName());
        }

        this.checkpointer.recover(this);
        this.progressStats.info("CHECKPOINT RECOVER " +
            this.checkpointRecover.getDisplayName());

        File bdbSubDir = CheckpointUtils.
            getBdbSubDirectory(this.checkpointRecover.getDirectory());
        List<IOException> errs = new ArrayList<IOException>();
        FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(),
            getStateDisk(), true, false, errs);
        for (IOException ioe : errs) {
            LOGGER.log(Level.SEVERE, "Problem copying checkpoint files: " +
                "checkpoint may be corrupt", ioe);
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Finished recovery setup for checkpoint named " +
                this.checkpointRecover.getDisplayName() + " in " +
                (System.currentTimeMillis() - started) + "ms.");
        }
    }

    protected boolean getCheckpointCopyBdbjeLogs() {
        return ((Boolean)this.order.getUncheckedAttribute(null,
            CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue();
    }

    private void setupBdb()
    throws FatalConfigurationException, AttributeNotFoundException {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        int bdbCachePercent = ((Integer)this.order.
            getAttribute(null, CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue();
        if (bdbCachePercent > 0) {
            // Operator set a nonzero value; otherwise accept the bdbje
            // default.
            envConfig.setCachePercent(bdbCachePercent);
        }
        envConfig.setSharedCache(true);
        envConfig.setLockTimeout(5000000); // in microseconds: 5 seconds
        if (LOGGER.isLoggable(Level.FINEST)) {
            envConfig.setConfigParam("java.util.logging.level", "SEVERE");
            envConfig.setConfigParam("java.util.logging.level.evictor",
                "SEVERE");
            envConfig.setConfigParam("java.util.logging.ConsoleHandler.on",
                "true");
        }

        if (!getCheckpointCopyBdbjeLogs()) {
            // If we are not copying bdbje log files on checkpoint, tell the
            // je cleaner not to delete obsolete log files, so a recovery
            // can later be assembled from the accumulated logs.
            envConfig.setConfigParam("je.cleaner.expunge", "false");
        }

        try {
            this.bdbEnvironment =
                new EnhancedEnvironment(getStateDisk(), envConfig);
            if (LOGGER.isLoggable(Level.FINE)) {
                // Log the real, possibly-defaulted configuration.
                envConfig = bdbEnvironment.getConfig();
                LOGGER.fine("BdbConfiguration: Cache percentage " +
                    envConfig.getCachePercent() +
                    ", cache size " + envConfig.getCacheSize());
            }
        } catch (DatabaseException e) {
            e.printStackTrace();
            throw new FatalConfigurationException(e.getMessage());
        }
    }

    /**
     * @return the shared EnhancedEnvironment
     */
    public EnhancedEnvironment getBdbEnvironment() {
        return this.bdbEnvironment;
    }

    /**
     * @deprecated use EnhancedEnvironment's getClassCatalog() instead
     */
    @Deprecated
    public StoredClassCatalog getClassCatalog() {
        return this.bdbEnvironment.getClassCatalog();
    }

    /**
     * Register for CrawlStatus events.
     *
     * @param cl a class implementing the CrawlStatusListener interface
     *
     * @see CrawlStatusListener
     */
    public void addCrawlStatusListener(CrawlStatusListener cl) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.registeredCrawlStatusListeners.add(cl);
        }
    }

    /**
     * Register for CrawlURIDisposition events.
     *
     * @param cl a class implementing the CrawlURIDispositionListener
     * interface
     *
     * @see CrawlURIDispositionListener
     */
    public void addCrawlURIDispositionListener(
            CrawlURIDispositionListener cl) {
        registeredCrawlURIDispositionListener = null;
        if (registeredCrawlURIDispositionListeners == null) {
            // First (and, for now, only) listener: also keep it in the
            // single-listener field as a fast path.
            registeredCrawlURIDispositionListener = cl;
            registeredCrawlURIDispositionListeners =
                new ArrayList<CrawlURIDispositionListener>(1);
        }
        registeredCrawlURIDispositionListeners.add(cl);
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURISuccessful event that will be broadcast to all listeners
     * that have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event
     * notification.
     *
     * @see CrawlURIDispositionListener#crawledURISuccessful(CrawlURI)
     */
    public void fireCrawledURISuccessfulEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Single-listener fast path.
            registeredCrawlURIDispositionListener.crawledURISuccessful(curi);
        } else if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            // Go through the list.
            for (CrawlURIDispositionListener l:
                    registeredCrawlURIDispositionListeners) {
                l.crawledURISuccessful(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURINeedRetry event that will be broadcast to all listeners
     * that have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event
     * notification.
     *
     * @see CrawlURIDispositionListener#crawledURINeedRetry(CrawlURI)
     */
    public void fireCrawledURINeedRetryEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Single-listener fast path.
            registeredCrawlURIDispositionListener.crawledURINeedRetry(curi);
            return;
        }

        // Go through the list.
        if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            for (CrawlURIDispositionListener l:
                    registeredCrawlURIDispositionListeners) {
                l.crawledURINeedRetry(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIDisregard event that will be broadcast to all listeners
     * that have registered with the CrawlController.
     *
     * @param curi -
     *            The CrawlURI that will be sent with the event notification.
     *
     * @see CrawlURIDispositionListener#crawledURIDisregard(CrawlURI)
     */
    public void fireCrawledURIDisregardEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Single-listener fast path.
            registeredCrawlURIDispositionListener.crawledURIDisregard(curi);
        } else if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            // Go through the list.
            for (CrawlURIDispositionListener l:
                    registeredCrawlURIDispositionListeners) {
                l.crawledURIDisregard(curi);
            }
        }
    }

    /**
     * Allows an external class to raise a CrawlURIDisposition
     * crawledURIFailure event that will be broadcast to all listeners that
     * have registered with the CrawlController.
     *
     * @param curi - The CrawlURI that will be sent with the event
     * notification.
     *
     * @see CrawlURIDispositionListener#crawledURIFailure(CrawlURI)
     */
    public void fireCrawledURIFailureEvent(CrawlURI curi) {
        if (registeredCrawlURIDispositionListener != null) {
            // Single-listener fast path.
            registeredCrawlURIDispositionListener.crawledURIFailure(curi);
        } else if (registeredCrawlURIDispositionListeners != null
                && registeredCrawlURIDispositionListeners.size() > 0) {
            // Go through the list.
            for (CrawlURIDispositionListener l:
                    registeredCrawlURIDispositionListeners) {
                l.crawledURIFailure(curi);
            }
        }
    }

    private void setupCrawlModules() throws FatalConfigurationException,
            AttributeNotFoundException, MBeanException, ReflectionException {
        if (scope == null) {
            scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME);
            scope.initialize(this);
        }
        try {
            this.serverCache = new ServerCache(this);
        } catch (Exception e) {
            throw new FatalConfigurationException("Unable to" +
                " initialize frontier (Failed setup of ServerCache) " + e);
        }

        if (this.frontier == null) {
            this.frontier = (Frontier)order.getAttribute(Frontier.ATTR_NAME);
            try {
                frontier.initialize(this);
                frontier.pause(); // Paused until the crawl is begun.
                // Run frontier recovery only when not doing a checkpoint
                // recovery (a directory recover-path means checkpoint
                // recovery; see getCheckpointRecover(CrawlOrder)).
                if (!isCheckpointRecover()) {
                    runFrontierRecover((String)order.
                        getAttribute(CrawlOrder.ATTR_RECOVER_PATH));
                }
            } catch (IOException e) {
                throw new FatalConfigurationException(
                    "unable to initialize frontier: " + e);
            }
        }

        if (processorChains == null) {
            processorChains = new ProcessorChainList(order);
        }
    }

    protected void runFrontierRecover(String recoverPath)
    throws AttributeNotFoundException, MBeanException,
            ReflectionException, FatalConfigurationException {
        if (recoverPath == null || recoverPath.length() <= 0) {
            return;
        }
        File f = new File(recoverPath);
        if (!f.exists()) {
            LOGGER.severe("Recover file does not exist " +
                f.getAbsolutePath());
            return;
        }
        if (!f.isFile()) {
            // A recover log must be a regular file, not a directory.
            return;
        }
        boolean retainFailures = ((Boolean)order.
            getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES)).
                booleanValue();
        try {
            frontier.importRecoverLog(f.getAbsolutePath(), retainFailures);
        } catch (IOException e) {
            e.printStackTrace();
            throw (FatalConfigurationException) new
                FatalConfigurationException("Recover.log " + recoverPath +
                    " problem: " + e).initCause(e);
        }
    }

    private void setupDisk() throws AttributeNotFoundException {
        String diskPath
            = (String) order.getAttribute(null, CrawlOrder.ATTR_DISK_PATH);
        this.disk = getSettingsHandler().
            getPathRelativeToWorkingDirectory(diskPath);
        this.disk.mkdirs();
        this.logsDisk = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        this.checkpointsDisk =
            getSettingsDir(CrawlOrder.ATTR_CHECKPOINTS_PATH);
        this.stateDisk = getSettingsDir(CrawlOrder.ATTR_STATE_PATH);
        this.scratchDisk = getSettingsDir(CrawlOrder.ATTR_SCRATCH_PATH);
    }

    /**
     * @return The logging directory or null if problem reading the settings.
     */
    public File getLogsDir() {
        File f = null;
        try {
            f = getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
        } catch (AttributeNotFoundException e) {
            LOGGER.severe("Failed get of logs directory: " + e.getMessage());
        }
        return f;
    }

    /**
     * Return the full path to the directory named by <code>key</code>
     * in settings.
     * If the directory does not exist, it and all intermediary dirs
     * will be created.
     * @param key Key to use interrogating settings.
     * @return Full path to directory named by <code>key</code>.
     * @throws AttributeNotFoundException
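     *
     * <p>For example (illustrative values, not from any real order file):
     * a relative setting such as <code>logs</code> resolves against the
     * crawl's 'disk' directory, yielding something like
     * <code>/crawls/myjob/logs</code>, while an absolute setting such as
     * <code>/var/heritrix/logs</code> is used as-is.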
     */
    public File getSettingsDir(String key)
    throws AttributeNotFoundException {
        String path = (String)order.getAttribute(null, key);
        File f = new File(path);
        if (!f.isAbsolute()) {
            f = new File(disk.getPath(), path);
        }
        if (!f.exists()) {
            f.mkdirs();
        }
        return f;
    }

    /**
     * Setup the statistics tracker.
     * The statistics object must be created before modules can use it.
     * Do it here now so that when modules retrieve the object from the
     * controller during initialization (which some do), it's in place.
     * @throws InvalidAttributeValueException
     * @throws FatalConfigurationException
     */
    private void setupStatTracking()
    throws InvalidAttributeValueException, FatalConfigurationException {
        MapType loggers = order.getLoggers();
        final String cstName = "crawl-statistics";
        if (loggers.isEmpty(null)) {
            if (!isCheckpointRecover() && this.statistics == null) {
                this.statistics = new StatisticsTracker(cstName);
            }
            loggers.addElement(null, (StatisticsTracker)this.statistics);
        }

        if (isCheckpointRecover()) {
            restoreStatisticsTracker(loggers, cstName);
        }

        for (Iterator it = loggers.iterator(null); it.hasNext();) {
            StatisticsTracking tracker = (StatisticsTracking)it.next();
            tracker.initialize(this);
            if (this.statistics == null) {
                this.statistics = tracker;
            }
        }
    }

    protected void restoreStatisticsTracker(MapType loggers,
            String replaceName)
    throws FatalConfigurationException {
        try {
            // Swap in the deserialized checkpoint tracker for the configured
            // one of the same name.
            loggers.removeElement(loggers.globalSettings(), replaceName);
            loggers.addElement(loggers.globalSettings(),
                (StatisticsTracker)this.statistics);
        } catch (Exception e) {
            throw convertToFatalConfigurationException(e);
        }
    }

    protected FatalConfigurationException
            convertToFatalConfigurationException(Exception e) {
        FatalConfigurationException fce =
            new FatalConfigurationException("Converted exception: " +
                e.getMessage());
        fce.setStackTrace(e.getStackTrace());
        return fce;
    }

    private void setupLogs() throws IOException {
        String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
        uriProcessing = Logger.getLogger(LOGNAME_CRAWL + "." + logsPath);
        runtimeErrors = Logger.getLogger(LOGNAME_RUNTIME_ERRORS + "." +
            logsPath);
        localErrors = Logger.getLogger(LOGNAME_LOCAL_ERRORS + "." + logsPath);
        uriErrors = Logger.getLogger(LOGNAME_URI_ERRORS + "." + logsPath);
        progressStats = Logger.getLogger(LOGNAME_PROGRESS_STATISTICS + "." +
            logsPath);

        this.fileHandlers = new HashMap<Logger,FileHandler>();

        setupLogFile(uriProcessing,
            logsPath + LOGNAME_CRAWL + CURRENT_LOG_SUFFIX,
            new UriProcessingFormatter(), true);

        setupLogFile(runtimeErrors,
            logsPath + LOGNAME_RUNTIME_ERRORS + CURRENT_LOG_SUFFIX,
            new RuntimeErrorFormatter(), true);

        setupLogFile(localErrors,
            logsPath + LOGNAME_LOCAL_ERRORS + CURRENT_LOG_SUFFIX,
            new LocalErrorFormatter(), true);

        setupLogFile(uriErrors,
            logsPath + LOGNAME_URI_ERRORS + CURRENT_LOG_SUFFIX,
            new UriErrorFormatter(), true);

        setupLogFile(progressStats,
            logsPath + LOGNAME_PROGRESS_STATISTICS + CURRENT_LOG_SUFFIX,
            new StatisticsLogFormatter(), true);
    }

    private void setupLogFile(Logger logger, String filename, Formatter f,
            boolean shouldManifest) throws IOException, SecurityException {
        GenerationFileHandler fh = new GenerationFileHandler(filename, true,
            shouldManifest);
        fh.setFormatter(f);
        logger.addHandler(fh);
        addToManifest(filename, MANIFEST_LOG_FILE, shouldManifest);
        logger.setUseParentHandlers(false);
        this.fileHandlers.put(logger, fh);
    }

    protected void rotateLogFiles(String generationSuffix)
    throws IOException {
        if (this.state != PAUSED && this.state != CHECKPOINTING) {
            throw new IllegalStateException("Pause crawl before requesting " +
                "log rotation.");
        }
        for (Map.Entry<Logger,FileHandler> entry: fileHandlers.entrySet()) {
            Logger l = entry.getKey();
            GenerationFileHandler gfh =
                (GenerationFileHandler)entry.getValue();
            GenerationFileHandler newGfh =
                gfh.rotate(generationSuffix, CURRENT_LOG_SUFFIX);
            if (gfh.shouldManifest()) {
                addToManifest((String) newGfh.getFilenameSeries().get(1),
                    MANIFEST_LOG_FILE, newGfh.shouldManifest());
            }
            l.removeHandler(gfh);
            l.addHandler(newGfh);
            entry.setValue(newGfh);
        }
    }

    /**
     * Close all log files and remove handlers from loggers.
     */
    public void closeLogFiles() {
        for (Map.Entry<Logger,FileHandler> entry: fileHandlers.entrySet()) {
            GenerationFileHandler gfh =
                (GenerationFileHandler)entry.getValue();
            gfh.close();
            entry.getKey().removeHandler(gfh);
        }
    }

    /**
     * Sets the values for max bytes, docs and time based on crawl order.
     */
    private void setThresholds() {
        try {
            maxBytes = ((Long) order.
                getAttribute(CrawlOrder.ATTR_MAX_BYTES_DOWNLOAD)).
                    longValue();
        } catch (Exception e) {
            maxBytes = 0;
        }
        try {
            maxDocument = ((Long) order.
                getAttribute(CrawlOrder.ATTR_MAX_DOCUMENT_DOWNLOAD)).
                    longValue();
        } catch (Exception e) {
            maxDocument = 0;
        }
        try {
            maxTime = ((Long) order.
                getAttribute(CrawlOrder.ATTR_MAX_TIME_SEC)).longValue();
        } catch (Exception e) {
            maxTime = 0;
        }
    }

    /**
     * @return Object this controller is using to track crawl statistics
     */
    public StatisticsTracking getStatistics() {
        return statistics == null ?
            new StatisticsTracker("crawl-statistics") : this.statistics;
    }

    /**
     * Send crawl change event to all listeners.
     * @param newState State change we're to tell listeners about.
     * @param message Message on state change.
     * @see #sendCheckpointEvent(File) for special case event sending
     * telling listeners to checkpoint.
     */
    protected void sendCrawlStateChangeEvent(Object newState,
            String message) {
        synchronized (this.registeredCrawlStatusListeners) {
            this.state = newState;
            for (CrawlStatusListener l: this.registeredCrawlStatusListeners) {
                if (newState.equals(PAUSED)) {
                    l.crawlPaused(message);
                } else if (newState.equals(RUNNING)) {
                    l.crawlResuming(message);
                } else if (newState.equals(PAUSING)) {
                    l.crawlPausing(message);
                } else if (newState.equals(STARTED)) {
                    l.crawlStarted(message);
                } else if (newState.equals(STOPPING)) {
                    l.crawlEnding(message);
                } else if (newState.equals(FINISHED)) {
                    l.crawlEnded(message);
                } else if (newState.equals(PREPARING)) {
                    l.crawlResuming(message);
                } else {
                    throw new RuntimeException("Unknown state: " + newState);
                }
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + newState + " to " + l);
                }
            }
            LOGGER.fine("Sent " + newState);
        }
    }

    /**
     * Send the checkpoint event.
     * Has its own method apart from
     * {@link #sendCrawlStateChangeEvent(Object, String)} because
     * checkpointing throws an Exception (didn't want to have to wrap all of
     * the sendCrawlStateChangeEvent calls in try/catches).
     * @param checkpointDir Where to write checkpoint state to.
     * @throws Exception
     */
    protected void sendCheckpointEvent(File checkpointDir) throws Exception {
        synchronized (this.registeredCrawlStatusListeners) {
            if (this.state != PAUSED) {
                throw new IllegalStateException("Crawler must be completely " +
                    "paused before checkpointing can start");
            }
            this.state = CHECKPOINTING;
            for (CrawlStatusListener l: this.registeredCrawlStatusListeners) {
                l.crawlCheckpoint(checkpointDir);
                if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Sent " + CHECKPOINTING + " to " + l);
                }
            }
            LOGGER.fine("Sent " + CHECKPOINTING);
        }
    }

    /**
     * Operator requested that the crawl begin.
     */
    public void requestCrawlStart() {
        runProcessorInitialTasks();

        sendCrawlStateChangeEvent(STARTED, CrawlJob.STATUS_PENDING);
        String jobState;
        state = RUNNING;
        jobState = CrawlJob.STATUS_RUNNING;
        sendCrawlStateChangeEvent(this.state, jobState);

        // Assume the crawl will end abnormally; a clean stop will overwrite
        // this exit status with a better one.
        this.sExit = CrawlJob.STATUS_FINISHED_ABNORMAL;

        Thread statLogger = new Thread(statistics);
        statLogger.setName("StatLogger");
        statLogger.start();

        frontier.start();
    }

    /**
     * Called when the last toethread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");

        runProcessorFinalTasks();

        frontier.finalTasks();

        sendCrawlStateChangeEvent(FINISHED, this.sExit);
        synchronized (this.registeredCrawlStatusListeners) {
            // Remove all listeners now we're done with them.
            this.registeredCrawlStatusListeners.
                removeAll(this.registeredCrawlStatusListeners);
            this.registeredCrawlStatusListeners = null;
        }

        closeLogFiles();

        // Release references so the finished crawl can be garbage-collected.
        this.fileHandlers = null;
        this.uriErrors = null;
        this.uriProcessing = null;
        this.localErrors = null;
        this.runtimeErrors = null;
        this.progressStats = null;
        this.reports = null;
        this.manifest = null;

        this.statistics = null;
        this.frontier = null;
        this.disk = null;
        this.scratchDisk = null;
        this.order = null;
        this.scope = null;
        this.reserveMemory = null;
        this.processorChains = null;
        if (this.serverCache != null) {
            this.serverCache.cleanup();
            this.serverCache = null;
        }
        if (this.checkpointer != null) {
            this.checkpointer.cleanup();
            this.checkpointer = null;
        }
        if (this.bdbEnvironment != null) {
            try {
                this.bdbEnvironment.sync();
                this.bdbEnvironment.close();
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
            this.bdbEnvironment = null;
        }
        this.bigmaps = null;
        if (this.settingsHandler != null) {
            this.settingsHandler.cleanup();
        }
        this.settingsHandler = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
        }
        this.toePool = null;
        LOGGER.fine("Finished crawl.");
    }

    synchronized void completePause() {
        // Wake any threads waiting on this object (e.g. a checkpointing
        // thread awaiting a complete pause).
        notifyAll();
        sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED);
    }

    private boolean shouldContinueCrawling() {
        if (frontier.isEmpty()) {
            this.sExit = CrawlJob.STATUS_FINISHED;
            return false;
        }

        if (maxBytes > 0 && statistics.totalBytesCrawled() >= maxBytes) {
            // Hit the max byte download limit.
            sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT;
            return false;
        } else if (maxDocument > 0
                && frontier.succeededFetchCount() >= maxDocument) {
            // Hit the max document download limit.
            this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT;
            return false;
        } else if (maxTime > 0 &&
                statistics.crawlDuration() >= maxTime * 1000) {
            // Hit the max crawl time limit (maxTime is in seconds).
            this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT;
            return false;
        }
        return state == RUNNING;
    }

    /**
     * Request a checkpoint.
     * Sets a checkpointing thread running.
     * @throws IllegalStateException Thrown if crawl is not in paused state
     * (crawl must first be paused before checkpointing).
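     *
     * <p>A sketch of the expected operator sequence (illustrative; the web
     * UI normally drives this):
     * <pre>
     * controller.requestCrawlPause();
     * // ... wait until controller.isPaused() ...
     * controller.requestCrawlCheckpoint();
     * </pre>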
     */
    public synchronized void requestCrawlCheckpoint()
    throws IllegalStateException {
        if (this.checkpointer == null) {
            return;
        }
        if (this.checkpointer.isCheckpointing()) {
            throw new IllegalStateException("Checkpoint already running.");
        }
        this.checkpointer.checkpoint();
    }

    /**
     * @return True if checkpointing.
     */
    public boolean isCheckpointing() {
        return this.state == CHECKPOINTING;
    }

    /**
     * Run checkpointing.
     * CrawlController takes care of managing the checkpointing/serializing
     * of bdb, the StatisticsTracker, and the CheckpointContext. Other
     * modules that want to revive themselves on checkpoint recovery need to
     * save state during their
     * {@link CrawlStatusListener#crawlCheckpoint(File)} invocation and then,
     * in their #initialize if a module, or in their #initialTask if a
     * processor, check with the CrawlController whether it's a checkpoint
     * recovery. If it is, read in their old state from the pointed-to
     * checkpoint directory.
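     *
     * <p>A hedged sketch of what such a module might do (saveStateTo and
     * restoreStateFrom are hypothetical names, not real Heritrix API):
     * <pre>
     * public void crawlCheckpoint(File checkpointDir) throws Exception {
     *     saveStateTo(checkpointDir); // called during checkpoint
     * }
     * public void initialize(CrawlController c) {
     *     if (c.isCheckpointRecover()) {
     *         restoreStateFrom(c.getCheckpointRecover().getDirectory());
     *     }
     * }
     * </pre>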
     * <p>Default access only to be called by Checkpointer.
     * @throws Exception
     */
    void checkpoint()
    throws Exception {
        // Tell registered listeners to checkpoint.
        sendCheckpointEvent(this.checkpointer.
            getCheckpointInProgressDirectory());

        // Rotate off crawler logs.
        LOGGER.fine("Rotating log files.");
        rotateLogFiles(CURRENT_LOG_SUFFIX + "." +
            this.checkpointer.getNextCheckpointName());

        // Sync and serialize the registered BigMaps.
        LOGGER.fine("BigMaps.");
        checkpointBigMaps(this.checkpointer.
            getCheckpointInProgressDirectory());

        // Checkpoint the bdb environment and process its log files.
        LOGGER.fine("Bdb environment.");
        checkpointBdb(this.checkpointer.getCheckpointInProgressDirectory());

        // Copy off the settings.
        LOGGER.fine("Copying settings.");
        copySettings(this.checkpointer.getCheckpointInProgressDirectory());

        // Serialize this CrawlController.
        CheckpointUtils.writeObjectToFile(this,
            this.checkpointer.getCheckpointInProgressDirectory());
    }

    /**
     * Copy off the settings.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws IOException
     */
    protected void copySettings(final File checkpointDir) throws IOException {
        final List files = this.settingsHandler.getListOfAllFiles();
        boolean copiedSettingsDir = false;
        final File settingsDir = new File(this.disk, "settings");
        for (final Iterator i = files.iterator(); i.hasNext();) {
            File f = new File((String)i.next());
            if (f.getAbsolutePath().startsWith(
                    settingsDir.getAbsolutePath())) {
                if (copiedSettingsDir) {
                    // Already copied the settings directory wholesale;
                    // skip individual files under it.
                    continue;
                }
                copiedSettingsDir = true;
                FileUtils.copyFiles(settingsDir,
                    new File(checkpointDir, settingsDir.getName()));
                continue;
            }
            FileUtils.copyFiles(f, f.isDirectory()? checkpointDir:
                new File(checkpointDir, f.getName()));
        }
    }

    /**
     * Checkpoint bdb.
     * I used to do a call to log cleaning as suggested in the je-2.0
     * javadoc, but it takes way too much time (20 minutes for a crawl of
     * 1 million items). Assume the cleaner is keeping up. Below was the log
     * cleaning loop:
     * <pre>int totalCleaned = 0;
     * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0;
     *         totalCleaned += cleaned) {
     *     LOGGER.fine("Cleaned " + cleaned + " log files.");
     * }
     * </pre>
     * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint
     * are effectively the same thing, only sync is not configurable. He
     * suggests doing one or the other:
     * <p>MS: Reading code, Environment.sync() is a checkpoint. Looks like
     * I don't need to call a checkpoint after calling a sync?
     * <p>MH: Right, they're almost the same thing -- just do one or the
     * other, not both. With the new API, you'll need to do a checkpoint not
     * a sync, because the sync() method has no config parameter. Don't
     * worry -- it's fine to do a checkpoint even though you're not using it.
     * @param checkpointDir Directory to write checkpoint to.
     * @throws DatabaseException
     * @throws IOException
     * @throws RuntimeException Thrown if failed setup of new bdb environment.
     */
    protected void checkpointBdb(File checkpointDir)
    throws DatabaseException, IOException, RuntimeException {
        EnvironmentConfig envConfig = this.bdbEnvironment.getConfig();
        final List<String> bkgrdThreads = Arrays.asList(
            "je.env.runCheckpointer", "je.env.runCleaner",
            "je.env.runINCompressor");
        try {
            // Disable bdbje background threads for the duration of the
            // checkpoint.
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false");
            // Run a forced (full) checkpoint.
            CheckpointConfig chkptConfig = new CheckpointConfig();
            chkptConfig.setForce(true);
            chkptConfig.setMinimizeRecoveryTime(true);
            this.bdbEnvironment.checkpoint(chkptConfig);
            LOGGER.fine("Finished bdb checkpoint.");

            // Flip the log file so this checkpoint closes out the current
            // set of je logs.
            EnvironmentImpl envImpl =
                DbInternal.getEnvironmentImpl(this.bdbEnvironment);
            long firstFileInNextSet =
                DbLsn.getFileNumber(envImpl.forceLogFileFlip());

            // Copy (or at least manifest) the je log files comprising this
            // checkpoint.
            final String lastBdbCheckpointLog =
                getBdbLogFileName(firstFileInNextSet - 1);
            processBdbLogs(checkpointDir, lastBdbCheckpointLog);
            LOGGER.fine("Finished processing bdb log files.");
        } finally {
            // Restore bdbje background threads.
            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true");
        }
    }

    protected void processBdbLogs(final File checkpointDir,
            final String lastBdbCheckpointLog) throws IOException {
        File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir);
        if (!bdbDir.exists()) {
            bdbDir.mkdir();
        }
        PrintWriter pw = new PrintWriter(new FileOutputStream(new File(
            checkpointDir, "bdbje-logs-manifest.txt")));
        try {
            // Don't copy any log files beyond the last bdbje checkpoint log
            // (bdbje can keep writing logs after the checkpoint).
            boolean pastLastLogFile = false;
            Set<String> srcFilenames = null;
            final boolean copyFiles = getCheckpointCopyBdbjeLogs();
            do {
                FilenameFilter filter = CheckpointUtils.getJeLogsFilter();
                srcFilenames =
                    new HashSet<String>(Arrays.asList(
                        getStateDisk().list(filter)));
                // File.list can return null; guard before wrapping.
                String[] tgtNames = bdbDir.list(filter);
                if (tgtNames != null && tgtNames.length > 0) {
                    srcFilenames.removeAll(Arrays.asList(tgtNames));
                }
                if (srcFilenames.size() > 0) {
                    // Sort the list.
                    srcFilenames = new TreeSet<String>(srcFilenames);
                    int count = 0;
                    for (final Iterator i = srcFilenames.iterator();
                            i.hasNext() && !pastLastLogFile;) {
                        String name = (String) i.next();
                        if (copyFiles) {
                            FileUtils.copyFiles(new File(getStateDisk(), name),
                                new File(bdbDir, name));
                        }
                        pw.println(name);
                        if (name.equals(lastBdbCheckpointLog)) {
                            // We're done; copied all the log files we wanted.
                            pastLastLogFile = true;
                        }
                        count++;
                    }
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.fine("Copied " + count);
                    }
                }
            } while (!pastLastLogFile && srcFilenames != null &&
                srcFilenames.size() > 0);
        } finally {
            pw.close();
        }
    }

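    /**
     * Build a je log file name from a numeric index: the index in hex,
     * zero-padded to eight characters, plus the ".jdb" suffix. For example,
     * an index of 31 yields "0000001f.jdb".
     */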
    protected String getBdbLogFileName(final long index) {
        String lastBdbLogFileHex = Long.toHexString(index);
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) {
            buffer.append('0');
        }
        buffer.append(lastBdbLogFileHex);
        buffer.append(".jdb");
        return buffer.toString();
    }

    protected void setBdbjeBkgrdThreads(final EnvironmentConfig config,
            final List threads, final String setting) {
        for (final Iterator i = threads.iterator(); i.hasNext();) {
            config.setConfigParam((String)i.next(), setting);
        }
    }

    /**
     * Get recover checkpoint.
     * Returns null if we're NOT in recover mode.
     * Looks at ATTR_RECOVER_PATH and, if it's a directory, assumes
     * checkpoint recover. If in checkpoint mode, returns a Checkpoint
     * instance if the checkpoint was VALID (else null).
     * @return Checkpoint instance if we're in recover checkpoint
     * mode and the pointed-to checkpoint was valid.
     * @see #isCheckpointRecover()
     */
    public synchronized Checkpoint getCheckpointRecover() {
        if (this.checkpointRecover != null) {
            return this.checkpointRecover;
        }
        return getCheckpointRecover(this.order);
    }

    public static Checkpoint getCheckpointRecover(final CrawlOrder order) {
        String path = (String)order.getUncheckedAttribute(null,
            CrawlOrder.ATTR_RECOVER_PATH);
        if (path == null || path.length() <= 0) {
            return null;
        }
        File rp = new File(path);
        // If the path is a directory, it's a checkpoint recovery.
        Checkpoint result = null;
        if (rp.exists() && rp.isDirectory()) {
            Checkpoint cp = new Checkpoint(rp);
            if (cp.isValid()) {
                result = cp;
            }
        }
        return result;
    }

    public static boolean isCheckpointRecover(final CrawlOrder order) {
        return getCheckpointRecover(order) != null;
    }

    /**
     * @return True if we're in checkpoint recover mode. Call
     * {@link #getCheckpointRecover()} to get at the Checkpoint instance
     * that has info on the checkpoint directory being recovered from.
     */
    public boolean isCheckpointRecover() {
        return this.checkpointRecover != null;
    }

    /**
     * Operator requested that the crawl stop.
     */
    public synchronized void requestCrawlStop() {
        requestCrawlStop(CrawlJob.STATUS_ABORTED);
    }

    /**
     * Operator requested that the crawl stop.
     * @param message
     */
    public synchronized void requestCrawlStop(String message) {
        if (state == STOPPING || state == FINISHED) {
            return;
        }
        if (message == null) {
            throw new IllegalArgumentException("Message cannot be null.");
        }
        this.sExit = message;
        beginCrawlStop();
    }

    /**
     * Start the process of stopping the crawl.
     */
    public void beginCrawlStop() {
        LOGGER.fine("Started.");
        sendCrawlStateChangeEvent(STOPPING, this.sExit);
        if (this.frontier != null) {
            this.frontier.terminate();
            this.frontier.unpause();
        }
        LOGGER.fine("Finished.");
    }

    /**
     * Stop the crawl temporarily.
     */
    public synchronized void requestCrawlPause() {
        if (state == PAUSING || state == PAUSED) {
            // Already pausing or paused; nothing to do.
            return;
        }
        sExit = CrawlJob.STATUS_WAITING_FOR_PAUSE;
        frontier.pause();
        sendCrawlStateChangeEvent(PAUSING, this.sExit);
        if (toePool.getActiveToeCount() == 0) {
            // If all toes are already paused, complete the pause at once.
            completePause();
        }
    }

    /**
     * Tell whether the controller is paused.
     * @return true if paused
     */
    public boolean isPaused() {
        return state == PAUSED;
    }

    public boolean isPausing() {
        return state == PAUSING;
    }

    public boolean isRunning() {
        return state == RUNNING;
    }

    /**
     * Resume crawl from paused state.
     */
    public synchronized void requestCrawlResume() {
        if (state != PAUSING && state != PAUSED && state != CHECKPOINTING) {
            // Can't resume if not paused, pausing, or checkpointing.
            return;
        }
        multiThreadMode();
        frontier.unpause();
        LOGGER.fine("Crawl resumed.");
        sendCrawlStateChangeEvent(RUNNING, CrawlJob.STATUS_RUNNING);
    }

    /**
     * @return Active toe thread count.
     */
    public int getActiveToeCount() {
        if (toePool == null) {
            return 0;
        }
        return toePool.getActiveToeCount();
    }

    private void setupToePool() {
        toePool = new ToePool(this);
        toePool.setSize(order.getMaxToes());
    }

    /**
     * @return The order file instance.
     */
    public CrawlOrder getOrder() {
        return order;
    }

    /**
     * @return The server cache instance.
     */
    public ServerCache getServerCache() {
        return serverCache;
    }

    /**
     * @param o
     */
    public void setOrder(CrawlOrder o) {
        order = o;
    }

    /**
     * @return The frontier.
     */
    public Frontier getFrontier() {
        return frontier;
    }

    /**
     * @return This crawl scope.
     */
    public CrawlScope getScope() {
        return scope;
    }

    /** Get the list of processor chains.
     *
     * @return the list of processor chains.
     */
    public ProcessorChainList getProcessorChainList() {
        return processorChains;
    }

    /** Get the first processor chain.
     *
     * @return the first processor chain.
     */
    public ProcessorChain getFirstProcessorChain() {
        return processorChains.getFirstChain();
    }

    /** Get the postprocessor chain.
     *
     * @return the postprocessor chain.
     */
    public ProcessorChain getPostprocessorChain() {
        return processorChains.getLastChain();
    }

    /**
     * Get the 'working' directory of the current crawl.
     * @return the 'working' directory of the current crawl.
     */
    public File getDisk() {
        return disk;
    }

    /**
     * @return Scratch disk location.
     */
    public File getScratchDisk() {
        return scratchDisk;
    }

    /**
     * @return State disk location.
     */
    public File getStateDisk() {
        return stateDisk;
    }

    /**
     * @return The number of ToeThreads
     *
     * @see ToePool#getToeCount()
     */
    public int getToeCount() {
        return this.toePool == null? 0: this.toePool.getToeCount();
    }

    /**
     * @return The ToePool
     */
    public ToePool getToePool() {
        return toePool;
    }

    /**
     * @return toepool one-line report
     */
    public String oneLineReportThreads() {
        return toePool.singleLineReport();
    }

    /**
     * While many settings will update automatically when the
     * SettingsHandler is modified, some settings need to be explicitly
     * changed to reflect new settings. This includes the number of toe
     * threads and the seeds.
     */
    public void kickUpdate() {
        installThreadContextSettingsHandler();

        toePool.setSize(order.getMaxToes());

        this.scope.kickUpdate();
        this.frontier.kickUpdate();
        this.processorChains.kickUpdate();

        // Thresholds come directly from the order; refresh them too.
        setThresholds();
    }

    /**
     * @return The settings handler.
     */
    public SettingsHandler getSettingsHandler() {
        return settingsHandler;
    }

    /**
     * This method iterates through processor chains to run processors'
     * initial tasks.
     */
    private void runProcessorInitialTasks() {
        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                ((Processor) ip.next()).initialTasks();
            }
        }
    }

    /**
     * This method iterates through processor chains to run processors' final
     * tasks.
     */
    private void runProcessorFinalTasks() {
        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                ((Processor) ip.next()).finalTasks();
            }
        }
    }

    /**
     * Kills a thread. For details see
     * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
     * ToePool.killThread(int, boolean)}.
     * @param threadNumber Thread to kill.
     * @param replace Should thread be replaced.
     * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
     */
    public void killThread(int threadNumber, boolean replace) {
        toePool.killThread(threadNumber, replace);
    }

    /**
     * Add a file to the manifest of files used/generated by the current
     * crawl.
     *
     * TODO: It's possible for a file to be added twice if reports are
     * force-generated mid-crawl. Fix.
     *
     * @param file The filename (with absolute path) of the file to add
     * @param type The type of the file
     * @param bundle Should the file be included in a typical bundling of
     * crawler files.
     *
     * @see #MANIFEST_CONFIG_FILE
     * @see #MANIFEST_LOG_FILE
     * @see #MANIFEST_REPORT_FILE
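     *
     * <p>Each manifest line is the type character, then '+' or '-' for the
     * bundle flag, a space, and the filename; e.g. (path illustrative):
     * <pre>
     * L+ /crawls/myjob/logs/crawl.log
     * </pre>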
     */
    public void addToManifest(String file, char type, boolean bundle) {
        manifest.append(type + (bundle? "+": "-") + " " + file + "\n");
    }

    /**
     * Evaluate if the crawl should stop because it is finished.
     */
    public void checkFinish() {
        if (atFinish()) {
            beginCrawlStop();
        }
    }

    /**
     * Evaluate if the crawl should stop because it is finished,
     * without actually stopping the crawl.
     *
     * @return true if crawl is at a finish-possible state
     */
    public boolean atFinish() {
        return state == RUNNING && !shouldContinueCrawling();
    }

    private void readObject(ObjectInputStream stream)
    throws IOException, ClassNotFoundException {
        stream.defaultReadObject();
        // Ensure a fresh (empty) listener list on deserialization.
        this.registeredCrawlStatusListeners =
            Collections.synchronizedList(new ArrayList<CrawlStatusListener>());
        // Ensure we start deserialized instances in multi-thread mode.
        singleThreadMode = false;
    }

    /**
     * Go to single thread mode, where only one ToeThread may
     * proceed at a time. Also acquires the single lock, so
     * no further threads will proceed past an
     * acquireContinuePermission. Caller must be sure to release the
     * lock to allow other threads to proceed one at a time.
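     *
     * <p>A hedged usage sketch (a pause/diagnosis scenario is the assumed
     * caller; this class does not prescribe one):
     * <pre>
     * controller.singleThreadMode();
     * try {
     *     // let exactly one ToeThread at a time past
     *     // acquireContinuePermission()
     * } finally {
     *     controller.multiThreadMode(); // fully releases the lock
     * }
     * </pre>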
     */
    public void singleThreadMode() {
        this.singleThreadLock.lock();
        singleThreadMode = true;
    }

    /**
     * Go back to regular multithreaded mode, where all
     * ToeThreads may proceed at once.
     */
    public void multiThreadMode() {
        this.singleThreadLock.lock();
        singleThreadMode = false;
        while (this.singleThreadLock.isHeldByCurrentThread()) {
            this.singleThreadLock.unlock();
        }
    }

    /**
     * Proceed only if allowed, giving CrawlController a chance
     * to enforce single-thread mode.
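     *
     * <p>Sketch of the expected ToeThread discipline (assumed calling
     * pattern, mirrored by {@link #releaseContinuePermission()}):
     * <pre>
     * controller.acquireContinuePermission();
     * try {
     *     // process one CrawlURI
     * } finally {
     *     controller.releaseContinuePermission();
     * }
     * </pre>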
     */
    public void acquireContinuePermission() {
        if (singleThreadMode) {
            this.singleThreadLock.lock();
            if (!singleThreadMode) {
                // If mode changed while we waited, release fully and proceed.
                while (this.singleThreadLock.isHeldByCurrentThread()) {
                    this.singleThreadLock.unlock();
                }
            }
        }
    }

    /**
     * Relinquish continue permission at end of processing (allowing
     * another thread to proceed if in single-thread mode).
     */
    public void releaseContinuePermission() {
        if (singleThreadMode) {
            while (this.singleThreadLock.isHeldByCurrentThread()) {
                this.singleThreadLock.unlock();
            }
        }
    }

    public void freeReserveMemory() {
        if (!reserveMemory.isEmpty()) {
            reserveMemory.removeLast();
            System.gc();
        }
    }

    /**
     * Note that a ToeThread reached paused condition, possibly
     * completing the crawl-pause.
     */
    public synchronized void toePaused() {
        releaseContinuePermission();
        if (state == PAUSING && toePool.getActiveToeCount() == 0) {
            completePause();
        }
    }

    /**
     * Note that a ToeThread ended, possibly completing the crawl-stop.
     */
    public synchronized void toeEnded() {
        if (state == STOPPING && loopingToes.get() == 0) {
            completeStop();
        }
    }

    /**
     * Add order file contents to manifest.
     * Write configuration files and any files managed by CrawlController to
     * it. Files managed by other classes, excluding the settings framework,
     * are responsible for adding their files to the manifest themselves,
     * by calling addToManifest.
     * Call before writing out reports.
     */
    public void addOrderToManifest() {
        for (Iterator it = getSettingsHandler().getListOfAllFiles().iterator();
                it.hasNext();) {
            addToManifest((String)it.next(),
                CrawlController.MANIFEST_CONFIG_FILE, true);
        }
    }

    /**
     * Log a URIException from deep inside other components to the crawl's
     * shared log.
     *
     * @param e URIException encountered
     * @param u CrawlURI where problem occurred
     * @param l String which could not be interpreted as URI without exception
     */
    public void logUriError(URIException e, UURI u, CharSequence l) {
        if (e.getReasonCode() == UURIFactory.IGNORED_SCHEME) {
            // Don't log ignored-scheme problems at all.
            return;
        }
        Object[] array = {u, l};
        uriErrors.log(Level.INFO, e.getMessage(), array);
    }

    //
    // Reporter implementation
    //

    public final static String PROCESSORS_REPORT = "processors";
    public final static String MANIFEST_REPORT = "manifest";
    protected final static String[] REPORTS =
        {PROCESSORS_REPORT, MANIFEST_REPORT};

    public String[] getReports() {
        return REPORTS;
    }

    public void reportTo(PrintWriter writer) {
        reportTo(null, writer);
    }

    public String singleLineReport() {
        return ArchiveUtils.singleLineReport(this);
    }

    public void reportTo(String name, PrintWriter writer) {
        if (PROCESSORS_REPORT.equals(name)) {
            reportProcessorsTo(writer);
            return;
        } else if (MANIFEST_REPORT.equals(name)) {
            reportManifestTo(writer);
            return;
        } else if (name != null) {
            writer.println("requested report unknown: " + name);
        }
        singleLineReportTo(writer);
    }

    /**
     * @param writer Where to write report to.
     */
    protected void reportManifestTo(PrintWriter writer) {
        writer.print(manifest.toString());
    }

    /**
     * Compiles and returns a human readable report on the active processors.
     * @param writer Where to write to.
     * @see org.archive.crawler.framework.Processor#report()
     */
    protected void reportProcessorsTo(PrintWriter writer) {
        writer.print(
            "Processors report - "
                + ArchiveUtils.get12DigitDate()
                + "\n");
        writer.print(" Job being crawled: " + getOrder().getCrawlOrderName()
            + "\n");

        writer.print(" Number of Processors: " +
            processorChains.processorCount() + "\n");
        writer.print(" NOTE: Some processors may not return a report!\n\n");

        for (Iterator ic = processorChains.iterator(); ic.hasNext();) {
            for (Iterator ip = ((ProcessorChain) ic.next()).iterator();
                    ip.hasNext();) {
                writer.print(((Processor) ip.next()).report());
            }
        }
    }

    public void singleLineReportTo(PrintWriter writer) {
        writer.write("[Crawl Controller]\n");
    }

    public String singleLineLegend() {
        return "nothingYet";
    }

    /** controls which alternate ObjectIdentityCache implementation to use */
    private static boolean USE_OIBC = true;

    /**
     * Call this method to get an instance of the crawler BigMap
     * implementation. A "BigMap" is a Map that knows how to manage
     * ever-growing sets of key/value pairs. If we're in a checkpoint
     * recovery, this method will manage reinstantiation of checkpointed
     * bigmaps.
     * @param dbName Name to give any associated database. Also used
     * as part of name serializing out bigmap. Needs to be unique to a crawl.
     * @param valueClass Class of values we'll be using.
     * @return Map that knows how to carry large sets of key/value pairs or
     * if none available, returns instance of HashMap.
     * @throws Exception
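     *
     * <p>Illustrative call (the database name and value class are
     * placeholders, not prescribed by this API):
     * <pre>
     * ObjectIdentityCache&lt;String,SomeValue&gt; cache =
     *     controller.getBigMap("some-unique-db-name", SomeValue.class);
     * </pre>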
     */
    public <V> ObjectIdentityCache<String,V> getBigMap(final String dbName,
            final Class<? super V> valueClass)
    throws Exception {
        if (USE_OIBC) {
            return getOIBC(dbName, valueClass);
        } else {
            return getCBM(dbName, valueClass);
        }
    }

    /**
     * Implement 'big map' with ObjectIdentityBdbCache.
     *
     * @param dbName Name to give any associated database. Also used
     * as part of name serializing out bigmap. Needs to be unique to a crawl.
     * @param valueClass Class of values we'll be using.
     * @return Map that knows how to carry large sets of key/value pairs or
     * if none available, returns instance of HashMap.
     * @throws Exception
     */
    protected <V> ObjectIdentityBdbCache<V> getOIBC(final String dbName,
            final Class<? super V> valueClass)
    throws Exception {
        ObjectIdentityBdbCache<V> result = new ObjectIdentityBdbCache<V>();
        if (isCheckpointRecover()) {
            File baseDir = getCheckpointRecover().getDirectory();
            @SuppressWarnings("unchecked")
            ObjectIdentityBdbCache<V> temp = CheckpointUtils.
                readObjectFromFile(result.getClass(), dbName, baseDir);
            result = temp;
        }
        result.initialize(getBdbEnvironment(), dbName, valueClass,
            getBdbEnvironment().getClassCatalog());
        // Save a reference to every big map made so that we can checkpoint.
        this.bigmaps.put(dbName, result);
        return result;
    }

    /**
     * Implement 'big map' with CachedBdbMap.
     *
     * @param dbName Name to give any associated database. Also used
     * as part of name serializing out bigmap. Needs to be unique to a crawl.
     * @param valueClass Class of values we'll be using.
     * @return Map that knows how to carry large sets of key/value pairs or
     * if none available, returns instance of HashMap.
     * @throws Exception
     * @deprecated
     */
    @Deprecated
    protected <V> CachedBdbMap<String,V> getCBM(final String dbName,
            final Class<? super V> valueClass)
    throws Exception {
        CachedBdbMap<String,V> result = new CachedBdbMap<String,V>(dbName);
        if (isCheckpointRecover()) {
            File baseDir = getCheckpointRecover().getDirectory();
            @SuppressWarnings("unchecked")
            CachedBdbMap<String,V> temp = CheckpointUtils.
                readObjectFromFile(result.getClass(), dbName, baseDir);
            result = temp;
        }
        result.initialize(getBdbEnvironment(), valueClass,
            getBdbEnvironment().getClassCatalog());
        // Save a reference to every big map made so that we can checkpoint.
        this.bigmaps.put(dbName, result);
        return result;
    }

    protected void checkpointBigMaps(final File cpDir)
    throws Exception {
        for (Map.Entry<String,ObjectIdentityCache<?,?>> entry:
                this.bigmaps.entrySet()) {
            ObjectIdentityCache<?,?> obj = entry.getValue();
            // Sync the cache's backing store out to disk, then serialize
            // the cache object itself into the checkpoint directory.
            obj.sync();
            CheckpointUtils.writeObjectToFile(obj, entry.getKey(), cpDir);
        }
    }

    /**
     * Called whenever a progress statistics logging event occurs.
     * @param e Progress statistics event.
     */
    public void progressStatisticsEvent(final EventObject e) {
        // Default is to do nothing.
    }

    /**
     * Log to the progress statistics log.
     * @param msg Message to write the progress statistics log.
     */
    public void logProgressStatistics(final String msg) {
        this.progressStats.info(msg);
    }

    /**
     * @return CrawlController state.
     */
    public Object getState() {
        return this.state;
    }

    public File getCheckpointsDisk() {
        return this.checkpointsDisk;
    }

    public AtomicInteger getLoopingToes() {
        return loopingToes;
    }
}