package org.archive.crawler.admin;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Comparator;
import java.util.Date;
import java.util.EventObject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.collections.Closure;
import org.apache.commons.httpclient.HttpStatus;
import org.archive.crawler.datamodel.CrawlHost;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.datamodel.FetchStatusCodes;
import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
import org.archive.crawler.event.CrawlURIDispositionListener;
import org.archive.crawler.framework.AbstractTracker;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.exceptions.FatalConfigurationException;
import org.archive.crawler.util.CrawledBytesHistotable;
import org.archive.net.UURI;
import org.archive.util.ArchiveUtils;
import org.archive.util.MimetypeUtils;
import org.archive.util.ObjectIdentityCache;
import org.archive.util.PaddingStringBuffer;
import org.archive.util.Supplier;
/**
 * This is an implementation of the AbstractTracker. It is designed to work
 * with the WUI as well as to perform various logging activity.
 * <p>
 * At the end of each snapshot a line is written to the
 * 'progress-statistics.log' file.
 * <p>
 * The header of that file is as follows:
 * <pre> [timestamp] [discovered] [queued] [downloaded] [doc/s(avg)] [KB/s(avg)] [dl-failures] [busy-thread] [mem-use-KB]</pre>
 * First there is a <b>timestamp</b>, accurate down to 1 second.
 * <p>
 * <b>discovered</b>, <b>queued</b>, <b>downloaded</b> and <b>dl-failures</b>
 * are (respectively) the discovered URI count, pending URI count, successfully
 * fetched count and failed fetch count from the frontier at the time of the
 * snapshot.
 * <p>
 * <b>KB/s(avg)</b> is the bandwidth usage. We use the total bytes downloaded
 * to calculate average bandwidth usage (KB/sec). Since we also note the value
 * each time a snapshot is made, we can calculate the average bandwidth usage
 * during the last snapshot period to gain a "current" rate. The first number
 * is the current rate and the average is in parentheses.
 * <p>
 * <b>doc/s(avg)</b> works the same way as KB/s(avg) except it shows the
 * number of documents (URIs) rather than KB downloaded.
 * <p>
 * <b>busy-thread</b> is the total number of ToeThreads that are not available
 * (and thus presumably busy processing a URI). This information is extracted
 * from the crawl controller.
 * <p>
 * Finally, <b>mem-use-KB</b> is extracted from the runtime environment
 * (<code>Runtime.getRuntime().totalMemory()</code>).
 * <p>
 * In addition to the data collected for the above logs, various other data
 * is gathered and stored by this tracker.
 * <ul>
 * <li> Successfully downloaded documents per fetch status code
 * <li> Successfully downloaded documents per document mime type
 * <li> Amount of data per mime type
 * <li> Successfully downloaded documents per host
 * <li> Amount of data per host
 * <li> Disposition of all seeds (this is written to 'reports.log' at end of
 *      crawl)
 * <li> Successfully downloaded documents per host per source
 * </ul>
 *
 * @author Parker Thompson
 * @author Kristinn Sigurdsson
 *
 * @see org.archive.crawler.framework.StatisticsTracking
 * @see org.archive.crawler.framework.AbstractTracker
 */
public class StatisticsTracker extends AbstractTracker
implements CrawlURIDispositionListener, Serializable {
    private static final long serialVersionUID = 8004878315916392305L;

    /**
     * Messages from the StatisticsTracker.
     */
    private final static Logger logger =
        Logger.getLogger(StatisticsTracker.class.getName());

    protected long lastPagesFetchedCount = 0;
    protected long lastProcessedBytesCount = 0;

    protected long discoveredUriCount = 0;
    protected long queuedUriCount = 0;
    protected long finishedUriCount = 0;

    protected long downloadedUriCount = 0;
    protected long downloadFailures = 0;
    protected long downloadDisregards = 0;
    protected double docsPerSecond = 0;
    protected double currentDocsPerSecond = 0;
    protected int currentKBPerSec = 0;
    protected long totalKBPerSec = 0;
    protected int busyThreads = 0;
    protected long totalProcessedBytes = 0;
    protected float congestionRatio = 0;
    protected long deepestUri;
    protected long averageDepth;

    /** tally sizes novel, verified (same hash), vouched (not-modified) */
    protected CrawledBytesHistotable crawledBytes = new CrawledBytesHistotable();

    protected long notModifiedUriCount = 0;
    protected long dupByHashUriCount = 0;
    protected long novelUriCount = 0;

    /** Keep track of the file types we see (mime type -> count) */
    protected ConcurrentMap<String,AtomicLong> mimeTypeDistribution
     = new ConcurrentHashMap<String,AtomicLong>();
    protected ConcurrentMap<String,AtomicLong> mimeTypeBytes
     = new ConcurrentHashMap<String,AtomicLong>();

    /** Keep track of fetch status codes */
    protected ConcurrentMap<String,AtomicLong> statusCodeDistribution
     = new ConcurrentHashMap<String,AtomicLong>();

    /** reusable Supplier for initial zero AtomicLong instances */
    private static final Supplier<AtomicLong> ATOMIC_ZERO_SUPPLIER =
        new Supplier<AtomicLong>() {
            public AtomicLong get() {
                return new AtomicLong(0);
            }};
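    // Note: sharing one Supplier instance across all the counter caches is
    // safe; the supplier is stateless and returns a fresh AtomicLong(0) on
    // each get() call.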

    /** Keep track of hosts.
     *
     * <p>They're transient because they are usually bigmaps that get
     * reconstituted on recovery from a checkpoint.
     */
    protected transient ObjectIdentityCache<String,AtomicLong> hostsDistribution = null;
    protected transient ObjectIdentityCache<String,AtomicLong> hostsBytes = null;
    protected transient ObjectIdentityCache<String,AtomicLong> hostsLastFinished = null;

    /** Keep track of URL counts per host per seed */
    protected transient
     ObjectIdentityCache<String,ConcurrentMap<String,AtomicLong>> sourceHostDistribution = null;

    /**
     * Record of seeds' latest actions.
     */
    protected transient ObjectIdentityCache<String,SeedRecord> processedSeedsRecords;

    private int seedsCrawled;
    private int seedsNotCrawled;

    private String sExitMessage = "Before crawl end";

    public StatisticsTracker(String name) {
        super(name, "A statistics tracker that is integrated into " +
            "the web UI and that creates the progress-statistics log.");
    }

    public void initialize(CrawlController c)
    throws FatalConfigurationException {
        super.initialize(c);
        try {
            this.sourceHostDistribution = c.getBigMap("sourceHostDistribution",
                ConcurrentMap.class);
            this.hostsDistribution = c.getBigMap("hostsDistribution",
                AtomicLong.class);
            this.hostsBytes = c.getBigMap("hostsBytes", AtomicLong.class);
            this.hostsLastFinished = c.getBigMap("hostsLastFinished",
                AtomicLong.class);
            this.processedSeedsRecords = c.getBigMap("processedSeedsRecords",
                SeedRecord.class);
        } catch (Exception e) {
            throw new FatalConfigurationException("Failed setup of" +
                " StatisticsTracker: " + e);
        }
        controller.addCrawlURIDispositionListener(this);
    }

    protected void finalCleanup() {
        super.finalCleanup();
        if (this.hostsBytes != null) {
            this.hostsBytes.close();
            this.hostsBytes = null;
        }
        if (this.hostsDistribution != null) {
            this.hostsDistribution.close();
            this.hostsDistribution = null;
        }
        if (this.hostsLastFinished != null) {
            this.hostsLastFinished.close();
            this.hostsLastFinished = null;
        }
        if (this.processedSeedsRecords != null) {
            this.processedSeedsRecords.close();
            this.processedSeedsRecords = null;
        }
    }

    protected synchronized void progressStatisticsEvent(final EventObject e) {
        discoveredUriCount = discoveredUriCount();
        downloadedUriCount = successfullyFetchedCount();
        finishedUriCount = finishedUriCount();
        queuedUriCount = queuedUriCount();
        downloadFailures = failedFetchAttempts();
        downloadDisregards = disregardedFetchAttempts();
        totalProcessedBytes = totalBytesCrawled();
        congestionRatio = congestionRatio();
        deepestUri = deepestUri();
        averageDepth = averageDepth();

        if (finishedUriCount() == 0) {
            docsPerSecond = 0;
            totalKBPerSec = 0;
        } else if (getCrawlerTotalElapsedTime() < 1000) {
            return; // Not enough time has elapsed for a meaningful rate.
        } else {
            docsPerSecond = (double) downloadedUriCount /
                (double)(getCrawlerTotalElapsedTime() / 1000);
            totalKBPerSec = (long)(((totalProcessedBytes / 1024) /
                ((getCrawlerTotalElapsedTime()) / 1000)) + .5 );
        }

        busyThreads = activeThreadCount();

        if (shouldrun ||
            (System.currentTimeMillis() - lastLogPointTime) >= 1000) {
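            // Compute the "current" (per-interval) rates from the deltas
            // accumulated since the last log point, as opposed to the
            // crawl-wide averages computed above.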
            currentDocsPerSecond = 0;
            currentKBPerSec = 0;

            long currentTime = System.currentTimeMillis();
            long sampleTime = currentTime - lastLogPointTime;

            if (sampleTime >= 1000) {
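                // sampleTime is in milliseconds; requiring at least a full
                // second avoids dividing by zero in the integer divisions
                // below.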
                long currentPageCount = successfullyFetchedCount();
                long samplePageCount = currentPageCount - lastPagesFetchedCount;

                currentDocsPerSecond =
                    (double) samplePageCount / (double)(sampleTime / 1000);

                lastPagesFetchedCount = currentPageCount;

                long currentProcessedBytes = totalProcessedBytes;
                long sampleProcessedBytes =
                    currentProcessedBytes - lastProcessedBytesCount;

                currentKBPerSec =
                    (int)(((sampleProcessedBytes/1024)/(sampleTime/1000)) + .5);

                lastProcessedBytesCount = currentProcessedBytes;
            }
        }

        if (this.controller != null) {
            this.controller.logProgressStatistics(getProgressStatisticsLine());
        }
        lastLogPointTime = System.currentTimeMillis();
        super.progressStatisticsEvent(e);
    }

    /**
     * Return one line of current progress-statistics.
     *
     * @param now the time to stamp the line with
     * @return String of stats
     */
    public String getProgressStatisticsLine(Date now) {
        return new PaddingStringBuffer()
            .append(ArchiveUtils.getLog14Date(now))
            .raAppend(32, discoveredUriCount)
            .raAppend(44, queuedUriCount)
            .raAppend(57, downloadedUriCount)
            .raAppend(74, ArchiveUtils.
                doubleToString(currentDocsPerSecond, 2) +
                "(" + ArchiveUtils.doubleToString(docsPerSecond, 2) + ")")
            .raAppend(85, currentKBPerSec + "(" + totalKBPerSec + ")")
            .raAppend(99, downloadFailures)
            .raAppend(113, busyThreads)
            .raAppend(126, (Runtime.getRuntime().totalMemory() -
                Runtime.getRuntime().freeMemory()) / 1024)
            .raAppend(140, Runtime.getRuntime().totalMemory() / 1024)
            .raAppend(153, ArchiveUtils.doubleToString(congestionRatio, 2))
            .raAppend(165, deepestUri)
            .raAppend(177, averageDepth)
            .toString();
    }

    public Map<String,Number> getProgressStatistics() {
        Map<String,Number> stats = new HashMap<String,Number>();
        stats.put("discoveredUriCount", new Long(discoveredUriCount));
        stats.put("queuedUriCount", new Long(queuedUriCount));
        stats.put("downloadedUriCount", new Long(downloadedUriCount));
        stats.put("currentDocsPerSecond", new Double(currentDocsPerSecond));
        stats.put("docsPerSecond", new Double(docsPerSecond));
        stats.put("totalKBPerSec", new Long(totalKBPerSec));
        stats.put("totalProcessedBytes", new Long(totalProcessedBytes));
        stats.put("currentKBPerSec", new Long(currentKBPerSec));
        stats.put("downloadFailures", new Long(downloadFailures));
        stats.put("busyThreads", new Integer(busyThreads));
        stats.put("congestionRatio", new Double(congestionRatio));
        stats.put("deepestUri", new Long(deepestUri));
        stats.put("averageDepth", new Long(averageDepth));
        stats.put("totalMemory", new Long(Runtime.getRuntime().totalMemory()));
        stats.put("freeMemory", new Long(Runtime.getRuntime().freeMemory()));
        return stats;
    }

    /**
     * Return one line of current progress-statistics.
     *
     * @return String of stats
     */
    public String getProgressStatisticsLine() {
        return getProgressStatisticsLine(new Date());
    }

    public double processedDocsPerSec() {
        return docsPerSecond;
    }

    public double currentProcessedDocsPerSec() {
        return currentDocsPerSecond;
    }

    public long processedKBPerSec() {
        return totalKBPerSec;
    }

    public int currentProcessedKBPerSec() {
        return currentKBPerSec;
    }

    /**
     * Returns a map of the distributions of encountered mime types.
     * Key/value pairs represent mime type -> count.
     * <p>
     * <b>Note:</b> All the values are wrapped with an {@link AtomicLong}.
     * @return mimeTypeDistribution
     */
    public Map<String,AtomicLong> getFileDistribution() {
        return mimeTypeDistribution;
    }

    /**
     * Increment a counter for a key in a given map. Used for various
     * aggregate data.
     *
     * As this is used to change maps which depend on StatisticsTracker
     * for their synchronization, this method should only be invoked
     * from a block synchronized on 'this'.
     *
     * @param map The map
     * @param key The key for the counter to be incremented; if it does not
     *           exist it will be added (set to 1). If null it will
     *           increment the counter "unknown".
     */
    protected static void incrementMapCount(ConcurrentMap<String,AtomicLong> map,
            String key) {
        incrementMapCount(map,key,1);
    }

    /**
     * Increment a counter for a key in a given cache. Used for various
     * aggregate data.
     *
     * @param cache the ObjectIdentityCache
     * @param key The key for the counter to be incremented; if it does not
     *           exist it will be added (set to 1). If null it will
     *           increment the counter "unknown".
     */
    protected static void incrementCacheCount(ObjectIdentityCache<String,AtomicLong> cache,
            String key) {
        incrementCacheCount(cache,key,1);
    }

    /**
     * Increment a counter for a key in a given cache by an arbitrary amount.
     * Used for various aggregate data. The increment amount can be negative.
     *
     * @param cache
     *            The ObjectIdentityCache
     * @param key
     *            The key for the counter to be incremented; if it does not
     *            exist it will be added (set equal to <code>increment</code>).
     *            If null it will increment the counter "unknown".
     * @param increment
     *            The amount by which to increment the counter related to the
     *            <code>key</code>.
     */
    protected static void incrementCacheCount(ObjectIdentityCache<String,AtomicLong> cache,
            String key, long increment) {
        if (key == null) {
            key = "unknown";
        }
        AtomicLong lw = cache.getOrUse(key, ATOMIC_ZERO_SUPPLIER);
        lw.addAndGet(increment);
    }

    /**
     * Increment a counter for a key in a given map by an arbitrary amount.
     * Used for various aggregate data. The increment amount can be negative.
     *
     * @param map
     *            The Map or ConcurrentMap
     * @param key
     *            The key for the counter to be incremented; if it does not
     *            exist it will be added (set equal to <code>increment</code>).
     *            If null it will increment the counter "unknown".
     * @param increment
     *            The amount by which to increment the counter related to the
     *            <code>key</code>.
     */
    protected static void incrementMapCount(ConcurrentMap<String,AtomicLong> map,
            String key, long increment) {
        if (key == null) {
            key = "unknown";
        }
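        // Lock-free insert-or-increment: optimistically install a fresh
        // counter; if another thread got there first, putIfAbsent returns
        // that thread's counter and we increment it instead.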
        AtomicLong lw = map.get(key);
        if (lw == null) {
            lw = new AtomicLong();
            AtomicLong prevVal = map.putIfAbsent(key, lw);
            if (prevVal != null) {
                lw = prevVal;
            }
        }
        lw.addAndGet(increment);
    }

    /**
     * Sort the entries of the given map in descending order by their
     * values, which must be longs wrapped with <code>AtomicLong</code>.
     * <p>
     * Elements are sorted by value from largest to smallest. Equal values are
     * sorted in an arbitrary, but consistent manner by their keys. Only items
     * with identical value and key are considered equal.
     *
     * If the passed-in map requires access to be synchronized, the caller
     * should ensure this synchronization.
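     *
     * <p>An illustrative use, printing mime types by document count, largest
     * first (the <code>tracker</code> and <code>writer</code> variables here
     * are hypothetical):
     * <pre>
     *   TreeMap&lt;String,AtomicLong&gt; byCount =
     *       tracker.getReverseSortedCopy(tracker.getFileDistribution());
     *   for (Map.Entry&lt;String,AtomicLong&gt; entry : byCount.entrySet()) {
     *       writer.println(entry.getValue().get() + " " + entry.getKey());
     *   }
     * </pre>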
     *
     * @param mapOfAtomicLongValues
     *            Assumes values are wrapped with AtomicLong.
     * @return a sorted map containing the same elements as the input map.
     */
    public TreeMap<String,AtomicLong> getReverseSortedCopy(
            final Map<String,AtomicLong> mapOfAtomicLongValues) {
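        // The comparator must never return 0 for distinct keys; TreeMap
        // treats compare()==0 as "same key" and would silently drop entries
        // that happen to share a count. Ties are therefore broken by
        // comparing the keys themselves.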
        TreeMap<String,AtomicLong> sortedMap =
            new TreeMap<String,AtomicLong>(new Comparator<String>() {
            public int compare(String e1, String e2) {
                long firstVal = mapOfAtomicLongValues.get(e1).get();
                long secondVal = mapOfAtomicLongValues.get(e2).get();
                if (firstVal < secondVal) {
                    return 1;
                }
                if (secondVal < firstVal) {
                    return -1;
                }
                return e1.compareTo(e2);
            }
        });
        try {
            sortedMap.putAll(mapOfAtomicLongValues);
        } catch (UnsupportedOperationException e) {
            // Some source maps don't support the bulk-copy path; fall back
            // to copying key by key.
            for (String key: mapOfAtomicLongValues.keySet()) {
                sortedMap.put(key, mapOfAtomicLongValues.get(key));
            }
        }
        return sortedMap;
    }

    /**
     * Sort the entries of the given ObjectIdentityCache in descending order
     * by their values, which must be longs wrapped with
     * <code>AtomicLong</code>.
     * <p>
     * Elements are sorted by value from largest to smallest. Equal values are
     * sorted in an arbitrary, but consistent manner by their keys. Only items
     * with identical value and key are considered equal.
     *
     * If the passed-in cache requires access to be synchronized, the caller
     * should ensure this synchronization.
     *
     * @param mapOfAtomicLongValues
     *            Assumes values are wrapped with AtomicLong.
     * @return a sorted map containing the same elements as the cache.
     */
    public TreeMap<String,AtomicLong> getReverseSortedCopy(
            final ObjectIdentityCache<String,AtomicLong> mapOfAtomicLongValues) {
        TreeMap<String,AtomicLong> sortedMap =
            new TreeMap<String,AtomicLong>(new Comparator<String>() {
            public int compare(String e1, String e2) {
                long firstVal = mapOfAtomicLongValues.get(e1).get();
                long secondVal = mapOfAtomicLongValues.get(e2).get();
                if (firstVal < secondVal) {
                    return 1;
                }
                if (secondVal < firstVal) {
                    return -1;
                }
                return e1.compareTo(e2);
            }
        });
        for (String key: mapOfAtomicLongValues.keySet()) {
            sortedMap.put(key, mapOfAtomicLongValues.get(key));
        }
        return sortedMap;
    }

    /**
     * Return a map representing the distribution of status codes for
     * successfully fetched curis, where each key -> value pair represents
     * (String)code -> (AtomicLong)count.
     *
     * <b>Note:</b> All the values are wrapped with an {@link AtomicLong}.
     *
     * @return statusCodeDistribution
     */
    public Map<String,AtomicLong> getStatusCodeDistribution() {
        return statusCodeDistribution;
    }

    /**
     * Returns the counter holding the time (in milliseconds) when a URI
     * belonging to a given host was last finished processing.
     *
     * @param host The host to look up the time of the last completed URI for.
     * @return An AtomicLong holding the time (in milliseconds) when a URI
     * belonging to the given host was last finished processing; it holds
     * zero if no URI has been completed for the host.
     */
    public AtomicLong getHostLastFinished(String host){
        AtomicLong fini = hostsLastFinished.getOrUse(host, ATOMIC_ZERO_SUPPLIER);
        return fini;
    }

    /**
     * Returns the accumulated number of bytes downloaded from a given host.
     * The host must already be known to the tracker.
     * @param host name of the host
     * @return the accumulated number of bytes downloaded from a given host
     */
    public long getBytesPerHost(String host){
        return hostsBytes.get(host).get();
    }

    /**
     * Returns the accumulated number of bytes from files of a given file type.
     * The file type must already be known to the tracker.
     * @param filetype Filetype to check.
     * @return the accumulated number of bytes from files of a given mime type
     */
    public long getBytesPerFileType(String filetype){
        return mimeTypeBytes.get(filetype).get();
    }

    /**
     * Get the total number of ToeThreads (sleeping and active).
     *
     * @return The total number of ToeThreads
     */
    public int threadCount() {
        return this.controller != null ? controller.getToeCount() : 0;
    }

    /**
     * @return Current thread count (or zero if it can't be determined).
     */
    public int activeThreadCount() {
        return this.controller != null ? controller.getActiveToeCount() : 0;
    }

    /**
     * This returns the number of completed URIs as a percentage of the total
     * number of URIs encountered (should be inverse to the discovery curve).
     *
     * @return The number of completed URIs as a percentage of the total
     * number of URIs encountered
     */
    public int percentOfDiscoveredUrisCompleted() {
        long completed = finishedUriCount();
        long total = discoveredUriCount();

        if (total == 0) {
            return 0;
        }

        return (int) (100 * completed / total);
    }

    /**
     * Number of <i>discovered</i> URIs.
     *
     * <p>If the crawl is not running (paused or stopped) this will return
     * the value of the last snapshot.
     *
     * @return A count of all URIs encountered
     *
     * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
     */
    public long discoveredUriCount() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().discoveredUriCount() : discoveredUriCount;
    }

    /**
     * Number of URIs that have <i>finished</i> processing.
     *
     * @return Number of URIs that have finished processing
     *
     * @see org.archive.crawler.framework.Frontier#finishedUriCount()
     */
    public long finishedUriCount() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().finishedUriCount() : finishedUriCount;
    }

    /**
     * Get the total number of failed fetch attempts (connection failures,
     * retries given up on, etc).
     *
     * @return The total number of failed fetch attempts
     */
    public long failedFetchAttempts() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().failedFetchCount() : downloadFailures;
    }

    /**
     * Get the total number of disregarded fetch attempts (URIs processed
     * but deliberately not counted, e.g. robots exclusions).
     *
     * @return The total number of disregarded fetch attempts
     */
    public long disregardedFetchAttempts() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().disregardedUriCount() : downloadDisregards;
    }

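    /**
     * Number of <i>successfully</i> fetched URIs.
     *
     * <p>If the crawl is not running (paused or stopped) this will return
     * the value of the last snapshot.
     *
     * @return Number of successfully fetched URIs
     */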
    public long successfullyFetchedCount() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().succeededFetchCount() : downloadedUriCount;
    }

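    /**
     * Approximate total of URIs the crawler is aware of: queued URIs, plus
     * URIs currently in process (estimated as one per active thread), plus
     * URIs already successfully fetched.
     *
     * @return approximate total URI count
     */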
    public long totalCount() {
        return queuedUriCount() + activeThreadCount() +
            successfullyFetchedCount();
    }

    /**
     * Ratio of the number of threads that would theoretically allow
     * maximum crawl progress (if each was as productive as current
     * threads), to the current number of threads.
     *
     * @return float congestion ratio
     */
    public float congestionRatio() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().congestionRatio() : congestionRatio;
    }

    /**
     * Ordinal position of the 'deepest' URI eligible
     * for crawling. Essentially, the length of the longest
     * frontier internal queue.
     *
     * @return long URI count to deepest URI
     */
    public long deepestUri() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().deepestUri() : deepestUri;
    }

    /**
     * Average depth of the last URI in all eligible queues.
     * That is, the average length of all eligible queues.
     *
     * @return long average depth of last URIs in queues
     */
    public long averageDepth() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().averageDepth() : averageDepth;
    }

    /**
     * Number of URIs <i>queued</i> up and waiting for processing.
     *
     * <p>If the crawl is not running (paused or stopped) this will return
     * the value of the last snapshot.
     *
     * @return Number of URIs queued up and waiting for processing.
     *
     * @see org.archive.crawler.framework.Frontier#queuedUriCount()
     */
    public long queuedUriCount() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().queuedUriCount() : queuedUriCount;
    }

    /** @deprecated Use {@link #totalBytesCrawled()} instead. */
    public long totalBytesWritten() {
        return shouldrun && this.controller != null &&
                this.controller.getFrontier() != null ?
            controller.getFrontier().totalBytesWritten() : totalProcessedBytes;
    }

    public long totalBytesCrawled() {
        return shouldrun ?
            crawledBytes.getTotal() : totalProcessedBytes;
    }

    public String crawledBytesSummary() {
        return crawledBytes.summary();
    }

    /**
     * If the curi is a seed, we update the processedSeedsRecords map.
     *
     * @param curi The CrawlURI that may be a seed.
     * @param disposition The disposition of the CrawlURI.
     */
    private void handleSeed(final CrawlURI curi, final String disposition) {
        if (curi.isSeed()) {
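            // getOrUse returns the existing record for this seed, creating
            // and caching a new one via the supplier when none exists yet.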
            SeedRecord sr = processedSeedsRecords.getOrUse(
                    curi.toString(),
                    new Supplier<SeedRecord>() {
                        public SeedRecord get() {
                            return new SeedRecord(curi, disposition);
                        }});
            sr.updateWith(curi,disposition);
        }
    }

    public void crawledURISuccessful(CrawlURI curi) {
        handleSeed(curi,SEED_DISPOSITION_SUCCESS);

        // Tally bytes crawled, by novel/duplicate/not-modified category.
        crawledBytes.accumulate(curi);

        if (curi.getFetchStatus() == HttpStatus.SC_NOT_MODIFIED) {
            notModifiedUriCount++;
        } else if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)) {
            dupByHashUriCount++;
        } else {
            novelUriCount++;
        }

        // Tally the fetch status code.
        incrementMapCount(statusCodeDistribution,
            Integer.toString(curi.getFetchStatus()));

        // Tally URIs and bytes by mime type.
        String mime = MimetypeUtils.truncate(curi.getContentType());
        incrementMapCount(mimeTypeDistribution, mime);
        incrementMapCount(mimeTypeBytes, mime, curi.getContentSize());

        // Tally URIs and bytes by host; DNS lookups count against the
        // "dns:" pseudo-host.
        saveHostStats(curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS ? "dns:" :
            this.controller.getServerCache().getHostFor(curi).getHostName(),
            curi.getContentSize());

        // Tally per-source stats if the URI carries a source tag.
        if (curi.containsKey(CrawlURI.A_SOURCE_TAG)) {
            saveSourceStats(curi.getString(CrawlURI.A_SOURCE_TAG),
                this.controller.getServerCache().getHostFor(curi).
                    getHostName());
        }
    }

    protected void saveSourceStats(String source, String hostname) {
        synchronized (sourceHostDistribution) {
            ConcurrentMap<String,AtomicLong> hostUriCount =
                sourceHostDistribution.getOrUse(
                        source,
                        new Supplier<ConcurrentMap<String,AtomicLong>>() {
                            public ConcurrentMap<String, AtomicLong> get() {
                                return new ConcurrentHashMap<String,AtomicLong>();
                            }});
            incrementMapCount(hostUriCount, hostname);
        }
    }

    protected void saveHostStats(String hostname, long size) {
        incrementCacheCount(hostsDistribution, hostname);
        incrementCacheCount(hostsBytes, hostname, size);
        long time = System.currentTimeMillis();
        getHostLastFinished(hostname).set(time);
    }

    public void crawledURINeedRetry(CrawlURI curi) {
        handleSeed(curi,SEED_DISPOSITION_RETRY);
    }

    public void crawledURIDisregard(CrawlURI curi) {
        handleSeed(curi,SEED_DISPOSITION_DISREGARD);
    }

    public void crawledURIFailure(CrawlURI curi) {
        handleSeed(curi,SEED_DISPOSITION_FAILURE);
    }

    /**
     * Get a seed iterator for the job being monitored.
     *
     * <b>Note:</b> This iterator will iterate over a list of <i>strings</i>,
     * not UURIs like the Scope seed iterator. The strings are equal to the
     * URIs' getURIString() values.
     * @return the seed iterator
     * FIXME: Consider using TransformingIterator here
     */
    public Iterator<String> getSeeds() {
        List<String> seedsCopy = new Vector<String>();
        Iterator<UURI> i = controller.getScope().seedsIterator();
        while (i.hasNext()) {
            seedsCopy.add(i.next().toString());
        }
        return seedsCopy.iterator();
    }

    public Iterator<SeedRecord> getSeedRecordsSortedByStatusCode() {
        return getSeedRecordsSortedByStatusCode(getSeeds());
    }

    protected Iterator<SeedRecord> getSeedRecordsSortedByStatusCode(
            Iterator<String> i) {
        TreeSet<SeedRecord> sortedSet =
            new TreeSet<SeedRecord>(new Comparator<SeedRecord>() {
            public int compare(SeedRecord sr1, SeedRecord sr2) {
                int code1 = sr1.getStatusCode();
                int code2 = sr2.getStatusCode();
                if (code1 == code2) {
                    // Equal status codes: fall back to comparing URIs so
                    // that distinct seeds are never treated as duplicates.
                    return sr1.getUri().compareTo(sr2.getUri());
                }
                // Remap the codes (the int overflow is deliberate) so that
                // ascending order of the remapped values yields the report's
                // intended ordering while keeping distinct codes distinct.
                code1 = -code1 - Integer.MAX_VALUE;
                code2 = -code2 - Integer.MAX_VALUE;

                return new Integer(code1).compareTo(new Integer(code2));
            }
        });
        while (i.hasNext()) {
            String seed = i.next();
            SeedRecord sr = processedSeedsRecords.get(seed);
            if (sr == null) {
                sr = new SeedRecord(seed,SEED_DISPOSITION_NOT_PROCESSED);
            }
            sortedSet.add(sr);
        }
        return sortedSet.iterator();
    }

    public void crawlEnded(String message) {
        logger.info("Entered crawlEnded");
        this.sExitMessage = message;
        super.crawlEnded(message);
        logger.info("Leaving crawlEnded");
    }

    /**
     * Write the seeds report: one line per seed with its status code,
     * crawled/not-crawled disposition, URI, and any redirect URI.
     *
     * @param writer Where to write.
     */
    protected void writeSeedsReportTo(PrintWriter writer) {
        writer.print("[code] [status] [seed] [redirect]\n");

        seedsCrawled = 0;
        seedsNotCrawled = 0;
        for (Iterator<SeedRecord> i = getSeedRecordsSortedByStatusCode(getSeeds());
                i.hasNext();) {
            SeedRecord sr = i.next();
            writer.print(sr.getStatusCode());
            writer.print(" ");
            if (sr.getStatusCode() > 0) {
                seedsCrawled++;
                writer.print("CRAWLED");
            } else {
                seedsNotCrawled++;
                writer.print("NOTCRAWLED");
            }
            writer.print(" ");
            writer.print(sr.getUri());
            if (sr.getRedirectUri() != null) {
                writer.print(" ");
                writer.print(sr.getRedirectUri());
            }
            writer.print("\n");
        }
    }

    protected void writeSourceReportTo(PrintWriter writer) {
        writer.print("[source] [host] [#urls]\n");

        for (String sourceKey: sourceHostDistribution.keySet()) {
            Map<String,AtomicLong> hostCounts = sourceHostDistribution.get(sourceKey);
            SortedMap<String,AtomicLong> sortedHostCounts = getReverseSortedHostCounts(hostCounts);
            for (String hostKey: sortedHostCounts.keySet()) {
                AtomicLong hostCount = hostCounts.get(hostKey);
                writer.print(sourceKey);
                writer.print(" ");
                writer.print(hostKey);
                writer.print(" ");
                writer.print(hostCount.get());
                writer.print("\n");
            }
        }
    }

    /**
     * Return a copy of the given per-host counts in reverse-sorted
     * (largest first) order.
     *
     * @param hostCounts the host counts to sort
     * @return SortedMap of host counts
     */
    public SortedMap<String,AtomicLong> getReverseSortedHostCounts(
            Map<String,AtomicLong> hostCounts) {
        return getReverseSortedCopy(hostCounts);
    }

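    /**
     * Write the hosts report: one line per crawled host in reverse-sorted
     * (most URIs first) order, followed by any hosts in the server cache
     * that never had a finished URI recorded against them.
     *
     * @param writer Where to write.
     */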
    protected void writeHostsReportTo(final PrintWriter writer) {
        SortedMap<String,AtomicLong> hd = getReverseSortedHostsDistribution();

        writer.print("[#urls] [#bytes] [host] [#robots] [#remaining] [#novel-urls] [#novel-bytes] [#dup-by-hash-urls] [#dup-by-hash-bytes] [#not-modified-urls] [#not-modified-bytes]\n");
        for (String key: hd.keySet()) {
            CrawlHost host = controller.getServerCache().getHostFor(key);
            AtomicLong val = hd.get(key);
            writeReportLine(writer,
                val == null ? "-" : val.get(),
                getBytesPerHost(key),
                fixup(key),
                host.getSubstats().getRobotsDenials(),
                host.getSubstats().getRemaining(),
                host.getSubstats().getNovelUrls(),
                host.getSubstats().getNovelBytes(),
                host.getSubstats().getDupByHashUrls(),
                host.getSubstats().getDupByHashBytes(),
                host.getSubstats().getNotModifiedUrls(),
                host.getSubstats().getNotModifiedBytes());
        }

        // Also list any hosts in the server cache that never had a
        // finished URI recorded against them.
        Closure logZeros = new Closure() {
            public void execute(Object obj) {
                CrawlHost host = (CrawlHost)obj;
                if (host.getSubstats().getRecordedFinishes() == 0) {
                    writeReportLine(writer,
                        host.getSubstats().getRecordedFinishes(),
                        host.getSubstats().getTotalBytes(),
                        fixup(host.getHostName()),
                        host.getSubstats().getRobotsDenials(),
                        host.getSubstats().getRemaining(),
                        host.getSubstats().getNovelUrls(),
                        host.getSubstats().getNovelBytes(),
                        host.getSubstats().getDupByHashUrls(),
                        host.getSubstats().getDupByHashBytes(),
                        host.getSubstats().getNotModifiedUrls(),
                        host.getSubstats().getNotModifiedBytes());
                }
            }};
        controller.getServerCache().forAllHostsDo(logZeros);
    }

    protected String fixup(String hostName) {
        if ("dns:".equals(hostName)) {
            // The "dns:" pseudo-host passes through unencoded.
            return hostName;
        } else {
            try {
                // URL-encode so the hostname can never contain the space
                // delimiter used in the reports.
                return URLEncoder.encode(hostName, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected void writeReportLine(PrintWriter writer, Object ... fields) {
        for (Object field : fields) {
            writer.print(field);
            writer.print(" ");
        }
        writer.print("\n");
    }

    /**
     * Return a copy of the hosts distribution in reverse-sorted
     * (largest first) order.
     * @return SortedMap of hosts distribution
     */
    public SortedMap<String,AtomicLong> getReverseSortedHostsDistribution() {
        return getReverseSortedCopy(hostsDistribution);
    }

    protected void writeMimetypesReportTo(PrintWriter writer) {
        writer.print("[#urls] [#bytes] [mime-types]\n");
        TreeMap<String,AtomicLong> fd = getReverseSortedCopy(getFileDistribution());
        for (String key: fd.keySet()) {
            writer.print(Long.toString(fd.get(key).get()));
            writer.print(" ");
            writer.print(Long.toString(getBytesPerFileType(key)));
            writer.print(" ");
            writer.print(key);
            writer.print("\n");
        }
    }

    protected void writeResponseCodeReportTo(PrintWriter writer) {
        writer.print("[rescode] [#urls]\n");
        TreeMap<String,AtomicLong> scd = getReverseSortedCopy(getStatusCodeDistribution());
        for (String key: scd.keySet()) {
            writer.print(key);
            writer.print(" ");
            writer.print(Long.toString(scd.get(key).get()));
            writer.print("\n");
        }
    }

    protected void writeCrawlReportTo(PrintWriter writer) {
        writer.print("Crawl Name: " + controller.getOrder().getCrawlOrderName());
        writer.print("\nCrawl Status: " + sExitMessage);
        writer.print("\nDuration Time: " +
            ArchiveUtils.formatMillisecondsToConventional(crawlDuration()));
        writer.print("\nTotal Seeds Crawled: " + seedsCrawled);
        writer.print("\nTotal Seeds not Crawled: " + seedsNotCrawled);
        // Minus one to exclude the "dns:" pseudo-host entry.
        writer.print("\nTotal Hosts Crawled: " + (hostsDistribution.size() - 1));
        writer.print("\nTotal Documents Crawled: " + finishedUriCount);
        writer.print("\nDocuments Crawled Successfully: " + downloadedUriCount);
        writer.print("\nNovel Documents Crawled: " + novelUriCount);
        if (dupByHashUriCount > 0)
            writer.print("\nDuplicate-by-hash Documents Crawled: " + dupByHashUriCount);
        if (notModifiedUriCount > 0)
            writer.print("\nNot-modified Documents Crawled: " + notModifiedUriCount);
        writer.print("\nProcessed docs/sec: " +
            ArchiveUtils.doubleToString(docsPerSecond,2));
        writer.print("\nBandwidth in Kbytes/sec: " + totalKBPerSec);
        writer.print("\nTotal Raw Data Size in Bytes: " + totalProcessedBytes +
            " (" + ArchiveUtils.formatBytesForDisplay(totalProcessedBytes) +
            ") \n");
        writer.print("Novel Bytes: "
            + crawledBytes.get(CrawledBytesHistotable.NOVEL)
            + " (" + ArchiveUtils.formatBytesForDisplay(
                crawledBytes.get(CrawledBytesHistotable.NOVEL))
            + ") \n");
        if (crawledBytes.containsKey(CrawledBytesHistotable.DUPLICATE)) {
            writer.print("Duplicate-by-hash Bytes: "
                + crawledBytes.get(CrawledBytesHistotable.DUPLICATE)
                + " (" + ArchiveUtils.formatBytesForDisplay(
                    crawledBytes.get(CrawledBytesHistotable.DUPLICATE))
                + ") \n");
        }
        if (crawledBytes.containsKey(CrawledBytesHistotable.NOTMODIFIED)) {
            writer.print("Not-modified Bytes: "
                + crawledBytes.get(CrawledBytesHistotable.NOTMODIFIED)
                + " (" + ArchiveUtils.formatBytesForDisplay(
                    crawledBytes.get(CrawledBytesHistotable.NOTMODIFIED))
                + ") \n");
        }
    }

    protected void writeProcessorsReportTo(PrintWriter writer) {
        controller.reportTo(CrawlController.PROCESSORS_REPORT, writer);
    }

    protected void writeReportFile(String reportName, String filename) {
        File f = new File(controller.getDisk().getPath(), filename);
        try {
            PrintWriter bw = new PrintWriter(
                new OutputStreamWriter(
                    new FileOutputStream(f, false),
                    "UTF-8"));
            writeReportTo(reportName, bw);
            bw.close();
            controller.addToManifest(f.getAbsolutePath(),
                CrawlController.MANIFEST_REPORT_FILE, true);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Unable to write " + f.getAbsolutePath() +
                " at the end of crawl.", e);
        }
        logger.info("wrote report: " + f.getAbsolutePath());
    }

    /**
     * @param writer Where to write.
     */
    protected void writeManifestReportTo(PrintWriter writer) {
        controller.reportTo(CrawlController.MANIFEST_REPORT, writer);
    }

    /**
     * @param reportName Name of report.
     * @param w Where to write.
     */
    private void writeReportTo(String reportName, PrintWriter w) {
        if ("hosts".equals(reportName)) {
            writeHostsReportTo(w);
        } else if ("mime types".equals(reportName)) {
            writeMimetypesReportTo(w);
        } else if ("response codes".equals(reportName)) {
            writeResponseCodeReportTo(w);
        } else if ("seeds".equals(reportName)) {
            writeSeedsReportTo(w);
        } else if ("crawl".equals(reportName)) {
            writeCrawlReportTo(w);
        } else if ("processors".equals(reportName)) {
            writeProcessorsReportTo(w);
        } else if ("manifest".equals(reportName)) {
            writeManifestReportTo(w);
        } else if ("frontier".equals(reportName)) {
            writeFrontierReportTo(w);
        } else if ("source".equals(reportName)) {
            writeSourceReportTo(w);
        }
    }

    /**
     * Write the Frontier's 'nonempty' report (if available).
     * @param writer to report to
     */
    protected void writeFrontierReportTo(PrintWriter writer) {
        if (controller.getFrontier().isEmpty()) {
            writer.println("frontier empty");
        } else {
            controller.getFrontier().reportTo("nonempty", writer);
        }
    }

    /**
     * Run the reports.
     */
    public void dumpReports() {
        // Add the crawl order to the manifest and make sure the proper
        // settings are in effect for this thread before reporting.
        controller.addOrderToManifest();
        controller.installThreadContextSettingsHandler();
        writeReportFile("hosts","hosts-report.txt");
        writeReportFile("mime types","mimetype-report.txt");
        writeReportFile("response codes","responsecode-report.txt");
        writeReportFile("seeds","seeds-report.txt");
        writeReportFile("crawl","crawl-report.txt");
        writeReportFile("processors","processors-report.txt");
        writeReportFile("manifest","crawl-manifest.txt");
        writeReportFile("frontier","frontier-report.txt");
        if (sourceHostDistribution.size() > 0) {
            writeReportFile("source","source-report.txt");
        }
    }

    public void crawlCheckpoint(File cpDir) throws Exception {
        logNote("CRAWL CHECKPOINTING TO " + cpDir.toString());
    }
}