1   /* Copyright (C) 2009 Internet Archive 
2    *
3    * This file is part of the Heritrix web crawler (crawler.archive.org).
4    *
5    * Heritrix is free software; you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser Public License as published by
7    * the Free Software Foundation; either version 2.1 of the License, or
8    * any later version.
9    *
10   * Heritrix is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser Public License
16   * along with Heritrix; if not, write to the Free Software
17   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   *
19   * Created on Jul 16, 2003
20   *
21   */
22  package org.archive.crawler.admin;
23  
24  import java.io.File;
25  import java.io.FileOutputStream;
26  import java.io.IOException;
27  import java.io.OutputStreamWriter;
28  import java.io.PrintWriter;
29  import java.io.Serializable;
30  import java.io.UnsupportedEncodingException;
31  import java.net.URLEncoder;
32  import java.util.Comparator;
33  import java.util.Date;
34  import java.util.EventObject;
35  import java.util.HashMap;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.SortedMap;
40  import java.util.TreeMap;
41  import java.util.TreeSet;
42  import java.util.Vector;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentMap;
45  import java.util.concurrent.atomic.AtomicLong;
46  import java.util.logging.Level;
47  import java.util.logging.Logger;
48  
49  import org.apache.commons.collections.Closure;
50  import org.apache.commons.httpclient.HttpStatus;
51  import org.archive.crawler.datamodel.CrawlHost;
52  import org.archive.crawler.datamodel.CrawlURI;
53  import org.archive.crawler.datamodel.FetchStatusCodes;
54  import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
55  import org.archive.crawler.event.CrawlURIDispositionListener;
56  import org.archive.crawler.framework.AbstractTracker;
57  import org.archive.crawler.framework.CrawlController;
58  import org.archive.crawler.framework.exceptions.FatalConfigurationException;
59  import org.archive.crawler.util.CrawledBytesHistotable;
60  import org.archive.net.UURI;
61  import org.archive.util.ArchiveUtils;
62  import org.archive.util.MimetypeUtils;
63  import org.archive.util.ObjectIdentityCache;
64  import org.archive.util.PaddingStringBuffer;
65  import org.archive.util.Supplier;
66  
67  /***
68   * This is an implementation of the AbstractTracker. It is designed to function
69   * with the WUI as well as to perform various logging activities.
70   * <p>
71   * At the end of each snapshot a line is written to the
72   * 'progress-statistics.log' file.
73   * <p>
74   * The header of that file is as follows:
75   * <pre> [timestamp] [discovered]    [queued] [downloaded] [doc/s(avg)]  [KB/s(avg)] [dl-failures] [busy-thread] [mem-use-KB]</pre>
76   * First there is a <b>timestamp</b>, accurate down to 1 second.
77   * <p>
78   * <b>discovered</b>, <b>queued</b>, <b>downloaded</b> and <b>dl-failures</b>
79   * are (respectively) the discovered URI count, pending URI count, successfully
80   * fetched count and failed fetch count from the frontier at the time of the
81   * snapshot.
82   * <p>
83   * <b>KB/s(avg)</b> is the bandwidth usage.  We use the total bytes downloaded
84   * to calculate average bandwidth usage (KB/sec). Since we also note the value
85   * each time a snapshot is made we can calculate the average bandwidth usage
86   * during the last snapshot period to gain a "current" rate. The first number is
87   * the current rate and the average is in parentheses.
88   * <p>
89   * <b>doc/s(avg)</b> works the same way as KB/s(avg) except it shows the number of
90   * documents (URIs) rather than KB downloaded.
91   * <p>
92   * <b>busy-threads</b> is the total number of ToeThreads that are not available
93   * (and thus presumably busy processing a URI). This information is extracted
94   * from the crawl controller.
95   * <p>
96   * Finally, <b>mem-use-KB</b> is extracted from the runtime environment
97   * (<code>Runtime.getRuntime().totalMemory()</code>).
98   * <p>
99   * In addition to the data collected for the above logs, various other data
100  * is gathered and stored by this tracker.
101  * <ul>
102  *   <li> Successfully downloaded documents per fetch status code
103  *   <li> Successfully downloaded documents per document mime type
104  *   <li> Amount of data per mime type
105  *   <li> Successfully downloaded documents per host
106  *   <li> Amount of data per host
107  *   <li> Disposition of all seeds (this is written to 'seeds-report.txt' at end of
108  *        crawl)
109  *   <li> Successfully downloaded documents per host per source
110  * </ul>
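     * <p>
     * For example (illustrative usage only), a component holding a reference to
     * this tracker can query the gathered data directly:
     * <pre>
     *   StatisticsTracker stats = ...;
     *   long fetched = stats.successfullyFetchedCount();
     *   Map&lt;String,AtomicLong&gt; mimeCounts = stats.getFileDistribution();
     * </pre>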
111  *
112  * @author Parker Thompson
113  * @author Kristinn Sigurdsson
114  * 
115  * @see org.archive.crawler.framework.StatisticsTracking
116  * @see org.archive.crawler.framework.AbstractTracker
117  */
118 public class StatisticsTracker extends AbstractTracker
119 implements CrawlURIDispositionListener, Serializable {
120     private static final long serialVersionUID = 8004878315916392305L;
121 
122     /***
123      * Messages from the StatisticsTracker.
124      */
125     private final static Logger logger =
126         Logger.getLogger(StatisticsTracker.class.getName());
127     
128     // TODO: Need to be able to specify file where the object will be
129     // written once the CrawlEnded event occurs
130 
131     protected long lastPagesFetchedCount = 0;
132     protected long lastProcessedBytesCount = 0;
133 
134     /*
135      * Snapshot data.
136      */
137     protected long discoveredUriCount = 0;
138     protected long queuedUriCount = 0;
139     protected long finishedUriCount = 0;
140 
141     protected long downloadedUriCount = 0;
142     protected long downloadFailures = 0;
143     protected long downloadDisregards = 0;
144     protected double docsPerSecond = 0;
145     protected double currentDocsPerSecond = 0;
146     protected int currentKBPerSec = 0;
147     protected long totalKBPerSec = 0;
148     protected int busyThreads = 0;
149     protected long totalProcessedBytes = 0;
150     protected float congestionRatio = 0; 
151     protected long deepestUri;
152     protected long averageDepth;
153     
154     /*
155      * Cumulative data
156      */
157     /*** Tallies of bytes crawled: novel, duplicate-by-hash (same hash), and not-modified */
158     protected CrawledBytesHistotable crawledBytes = new CrawledBytesHistotable();
159 
160     protected long notModifiedUriCount = 0;
161     protected long dupByHashUriCount = 0;
162     protected long novelUriCount = 0;
163     
164     /*** Keep track of the file types we see (mime type -> count) */
165     protected ConcurrentMap<String,AtomicLong> mimeTypeDistribution
166      = new ConcurrentHashMap<String,AtomicLong>();
167     protected ConcurrentMap<String,AtomicLong> mimeTypeBytes
168      = new ConcurrentHashMap<String,AtomicLong>();
169     
170     /*** Keep track of fetch status codes */
171     protected ConcurrentMap<String,AtomicLong> statusCodeDistribution
172      = new ConcurrentHashMap<String,AtomicLong>();
173     
174     /*** reusable Supplier for initial zero AtomicLong instances */
175     private static final Supplier<AtomicLong> ATOMIC_ZERO_SUPPLIER = 
176         new Supplier<AtomicLong>() {
177             public AtomicLong get() {
178                 return new AtomicLong(0); 
179             }};
180 
181     /*** Keep track of hosts. 
182      * 
183      * <p>They're transient because they are usually bigmaps that get
184      * reconstituted on recovery from checkpoint.
185      */
186     protected transient ObjectIdentityCache<String,AtomicLong> hostsDistribution = null;
187     protected transient ObjectIdentityCache<String,AtomicLong> hostsBytes = null;
188     protected transient ObjectIdentityCache<String,AtomicLong> hostsLastFinished = null;
189 
190     /*** Keep track of URL counts per host per seed */
191     protected transient 
192         ObjectIdentityCache<String,ConcurrentMap<String,AtomicLong>> sourceHostDistribution = null;
193 
194     /***
195      * Record of seeds' latest actions.
196      */
197     protected transient ObjectIdentityCache<String,SeedRecord> processedSeedsRecords;
198 
199     // seeds tallies: ONLY UPDATED WHEN SEED REPORT WRITTEN
200     private int seedsCrawled;
201     private int seedsNotCrawled;
202     // sExitMessage: only set at crawl-end
203     private String sExitMessage = "Before crawl end";
204 
205 
206     public StatisticsTracker(String name) {
207         super( name, "A statistics tracker that's integrated into " +
208             "the web UI and that creates the progress-statistics log.");
209     }
210 
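        /***
         * Set up this tracker: create the host, source and seed big maps on
         * the given controller and register for CrawlURI disposition events.
         *
         * @param c The crawl controller.
         * @throws FatalConfigurationException if the big maps cannot be created.
         */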
211     public void initialize(CrawlController c)
212     throws FatalConfigurationException {
213         super.initialize(c);
214         try {
215             this.sourceHostDistribution = c.getBigMap("sourceHostDistribution",
216             	ConcurrentMap.class);
217             this.hostsDistribution = c.getBigMap("hostsDistribution",
218                 AtomicLong.class);
219             this.hostsBytes = c.getBigMap("hostsBytes", AtomicLong.class);
220             this.hostsLastFinished = c.getBigMap("hostsLastFinished",
221                 AtomicLong.class);
222             this.processedSeedsRecords = c.getBigMap("processedSeedsRecords",
223                 SeedRecord.class);
224         } catch (Exception e) {
225             throw new FatalConfigurationException("Failed setup of" +
226                 " StatisticsTracker: " + e);
227         }
228         controller.addCrawlURIDispositionListener(this);
229     }
230 
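        /***
         * Release resources at the end of the crawl: close each of the
         * big-map caches (hosts, seeds, sources) and null the references.
         */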
231     protected void finalCleanup() {
232         super.finalCleanup();
233         if (this.hostsBytes != null) {
234             this.hostsBytes.close();
235             this.hostsBytes = null;
236         }
237         if (this.hostsDistribution != null) {
238             this.hostsDistribution.close();
239             this.hostsDistribution = null;
240         }
241         if (this.hostsLastFinished != null) {
242             this.hostsLastFinished.close();
243             this.hostsLastFinished = null;
244         }
245         if (this.processedSeedsRecords != null) {
246             this.processedSeedsRecords.close();
247             this.processedSeedsRecords = null;
248         }
249         if (this.sourceHostDistribution != null) {
250             this.sourceHostDistribution.close();
251             this.sourceHostDistribution = null;
252         }
253 
254     }
255 
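        /***
         * Take a snapshot of the frontier and controller statistics, update the
         * average and "current" docs/sec and KB/sec rates, and log one line to
         * the progress-statistics log.
         */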
256     protected synchronized void progressStatisticsEvent(final EventObject e) {
257         // This method loads "snapshot" data.
258         discoveredUriCount = discoveredUriCount();
259         downloadedUriCount = successfullyFetchedCount();
260         finishedUriCount = finishedUriCount();
261         queuedUriCount = queuedUriCount();
262         downloadFailures = failedFetchAttempts();
263         downloadDisregards = disregardedFetchAttempts();
264         totalProcessedBytes = totalBytesCrawled();
265         congestionRatio = congestionRatio();
266         deepestUri = deepestUri();
267         averageDepth = averageDepth();
268         
269         if (finishedUriCount() == 0) {
270             docsPerSecond = 0;
271             totalKBPerSec = 0;
272         } else if (getCrawlerTotalElapsedTime() < 1000) {
273             return; // Not enough time has passed for a decent snapshot.
274         } else {
275             docsPerSecond = (double) downloadedUriCount /
276                 (double)(getCrawlerTotalElapsedTime() / 1000);
277             // Round to nearest long.
278             totalKBPerSec = (long)(((totalProcessedBytes / 1024) /
279                  ((getCrawlerTotalElapsedTime()) / 1000)) + .5 );
280         }
281 
282         busyThreads = activeThreadCount();
283 
284         if(shouldrun ||
285             (System.currentTimeMillis() - lastLogPointTime) >= 1000) {
286             // If shouldrun is false there is a chance that the time interval
287             // since last time is too small for a good sample.  We only want
288             // to update "current" data when the interval is long enough or
289             // shouldrun is true.
290             currentDocsPerSecond = 0;
291             currentKBPerSec = 0;
292 
293             // Note time.
294             long currentTime = System.currentTimeMillis();
295             long sampleTime = currentTime - lastLogPointTime;
296 
297             // if we haven't done anything or there isn't a reasonable sample
298             // size, give up.
299             if (sampleTime >= 1000) {
300                 // Update docs/sec snapshot
301                 long currentPageCount = successfullyFetchedCount();
302                 long samplePageCount = currentPageCount - lastPagesFetchedCount;
303 
304                 currentDocsPerSecond =
305                     (double) samplePageCount / (double)(sampleTime / 1000);
306 
307                 lastPagesFetchedCount = currentPageCount;
308 
309                 // Update kbytes/sec snapshot
310                 long currentProcessedBytes = totalProcessedBytes;
311                 long sampleProcessedBytes =
312                     currentProcessedBytes - lastProcessedBytesCount;
313 
314                 currentKBPerSec =
315                     (int)(((sampleProcessedBytes/1024)/(sampleTime/1000)) + .5);
316 
317                 lastProcessedBytesCount = currentProcessedBytes;
318             }
319         }
320 
321         if (this.controller != null) {
322             this.controller.logProgressStatistics(getProgressStatisticsLine());
323         }
324         lastLogPointTime = System.currentTimeMillis();
325         super.progressStatisticsEvent(e);
326     }
327 
328     /***
329      * Return one line of current progress-statistics
330      * 
331      * @param now Timestamp for the stats line.
332      * @return String of stats
333      */
334     public String getProgressStatisticsLine(Date now) {
335         return new PaddingStringBuffer()
336             .append(ArchiveUtils.getLog14Date(now))
337             .raAppend(32, discoveredUriCount)
338             .raAppend(44, queuedUriCount)
339             .raAppend(57, downloadedUriCount)
340             .raAppend(74, ArchiveUtils.
341                 doubleToString(currentDocsPerSecond, 2) +
342                 "(" + ArchiveUtils.doubleToString(docsPerSecond, 2) + ")")
343             .raAppend(85, currentKBPerSec + "(" + totalKBPerSec + ")")
344             .raAppend(99, downloadFailures)
345             .raAppend(113, busyThreads)
346             .raAppend(126, (Runtime.getRuntime().totalMemory() -
347                 Runtime.getRuntime().freeMemory()) / 1024)
348             .raAppend(140, Runtime.getRuntime().totalMemory() / 1024)
349             .raAppend(153, ArchiveUtils.doubleToString(congestionRatio, 2))
350             .raAppend(165, deepestUri)
351             .raAppend(177, averageDepth)
352             .toString();
353     }
354     
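        /***
         * Return the most recent snapshot values (counts, rates, memory use) as
         * a map of statistic name to number, e.g. for display in the WUI.
         */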
355     public Map<String,Number> getProgressStatistics() {
356         Map<String,Number> stats = new HashMap<String,Number>();
357         stats.put("discoveredUriCount", new Long(discoveredUriCount));
358         stats.put("queuedUriCount", new Long(queuedUriCount));
359         stats.put("downloadedUriCount", new Long(downloadedUriCount));
360         stats.put("currentDocsPerSecond", new Double(currentDocsPerSecond));
361         stats.put("docsPerSecond", new Double(docsPerSecond));
362         stats.put("totalKBPerSec", new Long(totalKBPerSec));
363         stats.put("totalProcessedBytes", new Long(totalProcessedBytes));
364         stats.put("currentKBPerSec", new Long(currentKBPerSec));
365         stats.put("downloadFailures", new Long(downloadFailures));
366         stats.put("busyThreads", new Integer(busyThreads));
367         stats.put("congestionRatio", new Double(congestionRatio));
368         stats.put("deepestUri", new Long(deepestUri));
369         stats.put("averageDepth", new Long(averageDepth));
370         stats.put("totalMemory", new Long(Runtime.getRuntime().totalMemory()));
371         stats.put("freeMemory", new Long(Runtime.getRuntime().freeMemory()));
372         return stats;
373     }
374 
375     /***
376      * Return one line of current progress-statistics
377      * 
378      * @return String of stats
379      */
380     public String getProgressStatisticsLine() {
381         return getProgressStatisticsLine(new Date());
382     }
383     
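        /***
         * @return Average number of documents processed per second over the
         * crawl so far, as of the last snapshot.
         */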
384     public double processedDocsPerSec(){
385         return docsPerSecond;
386     }
387 
388     public double currentProcessedDocsPerSec(){
389         return currentDocsPerSecond;
390     }
391 
392     public long processedKBPerSec(){
393         return totalKBPerSec;
394     }
395 
396     public int currentProcessedKBPerSec(){
397         return currentKBPerSec;
398     }
399 
400     /*** Returns a map that contains information about the distribution of
401      *  encountered mime types.  Key/value pairs represent
402      *  mime type -> count.
403      * <p>
404      * <b>Note:</b> All the values are wrapped with a {@link AtomicLong AtomicLong}
405      * @return mimeTypeDistribution
406      */
407     public Map<String,AtomicLong> getFileDistribution() {
408         return mimeTypeDistribution;
409     }
410 
411 
412     /***
413      * Increment a counter for a key in a given HashMap. Used for various
414      * aggregate data.
415      * 
416      * As this is used to change Maps which depend on StatisticsTracker
417      * for their synchronization, this method should only be invoked
418      * from a block synchronized on 'this'. 
419      *
420      * @param map The HashMap
421      * @param key The key for the counter to be incremented, if it does not
422      *               exist it will be added (set to 1).  If null it will
423      *            increment the counter "unknown".
424      */
425     protected static void incrementMapCount(ConcurrentMap<String,AtomicLong> map, 
426             String key) {
427     	incrementMapCount(map,key,1);
428     }
429 
430     /***
431      * Increment a counter for a key in a given cache. Used for various
432      * aggregate data.
433      * 
434      * @param cache the ObjectIdentityCache
435      * @param key The key for the counter to be incremented, if it does not
436      *               exist it will be added (set to 1).  If null it will
437      *            increment the counter "unknown".
438      */
439     protected static void incrementCacheCount(ObjectIdentityCache<String,AtomicLong> cache, 
440             String key) {
441         incrementCacheCount(cache,key,1);
442     }
443     
444     /***
445      * Increment a counter for a key in a given cache by an arbitrary amount.
446      * Used for various aggregate data. The increment amount can be negative.
447      *
448      *
449      * @param cache
450      *            The ObjectIdentityCache
451      * @param key
452      *            The key for the counter to be incremented, if it does not exist
453      *            it will be added (set to equal to <code>increment</code>).
454      *            If null it will increment the counter "unknown".
455      * @param increment
456      *            The amount to increment counter related to the <code>key</code>.
457      */
458     protected static void incrementCacheCount(ObjectIdentityCache<String,AtomicLong> cache, 
459             String key, long increment) {
460         if (key == null) {
461             key = "unknown";
462         }
463         AtomicLong lw = cache.getOrUse(key, ATOMIC_ZERO_SUPPLIER);
464         lw.addAndGet(increment);
465     }
466     
467     /***
468      * Increment a counter for a key in a given HashMap by an arbitrary amount.
469      * Used for various aggregate data. The increment amount can be negative.
470      *
471      * @param map
472      *            The Map or ConcurrentMap
473      * @param key
474      *            The key for the counter to be incremented, if it does not exist
475      *            it will be added (set to equal to <code>increment</code>).
476      *            If null it will increment the counter "unknown".
477      * @param increment
478      *            The amount to increment counter related to the <code>key</code>.
479      */
480     protected static void incrementMapCount(ConcurrentMap<String,AtomicLong> map, 
481             String key, long increment) {
482         if (key == null) {
483             key = "unknown";
484         }
485         AtomicLong lw = map.get(key);
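            // Lazily create the counter; if another thread wins the
            // putIfAbsent race, fall back to the counter it installed.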
486         if(lw == null) {
487             lw = new AtomicLong();
488             AtomicLong prevVal = map.putIfAbsent(key, lw);
489             if(prevVal != null) {
490                 lw = prevVal;
491             }
492         } 
493         lw.addAndGet(increment);
494     }
495 
496     /***
497      * Sort the entries of the given HashMap in descending order by their
498      * values, which must be longs wrapped with <code>AtomicLong</code>.
499      * <p>
500      * Elements are sorted by value from largest to smallest. Equal values are
501      * sorted in an arbitrary, but consistent manner by their keys. Only items
502      * with identical value and key are considered equal.
503      *
504      * If the passed-in map requires access to be synchronized, the caller
505      * should ensure this synchronization. 
506      * 
507      * @param mapOfAtomicLongValues
508      *            Assumes values are wrapped with AtomicLong.
509      * @return a sorted map containing the same elements as the given map.
510      */
511     public TreeMap<String,AtomicLong> getReverseSortedCopy(
512             final Map<String,AtomicLong> mapOfAtomicLongValues) {
513         TreeMap<String,AtomicLong> sortedMap = 
514           new TreeMap<String,AtomicLong>(new Comparator<String>() {
515             public int compare(String e1, String e2) {
516                 long firstVal = mapOfAtomicLongValues.get(e1).get();
517                 long secondVal = mapOfAtomicLongValues.get(e2).get();
518                 if (firstVal < secondVal) {
519                     return 1;
520                 }
521                 if (secondVal < firstVal) {
522                     return -1;
523                 }
524                 // If the values are the same, sort by keys.
525                 return e1.compareTo(e2);
526             }
527         });
528         try {
529             sortedMap.putAll(mapOfAtomicLongValues);
530         } catch (UnsupportedOperationException e) {
531             for (String key: mapOfAtomicLongValues.keySet()) {
532                 sortedMap.put(key, mapOfAtomicLongValues.get(key));
533             }
534         }
535         return sortedMap;
536     }
537     
538     /***
539      * Sort the entries of the given ObjectIdentityCache in descending order by their
540      * values, which must be longs wrapped with <code>AtomicLong</code>.
541      * <p>
542      * Elements are sorted by value from largest to smallest. Equal values are
543      * sorted in an arbitrary, but consistent manner by their keys. Only items
544      * with identical value and key are considered equal.
545      *
546      * If the passed-in map requires access to be synchronized, the caller
547      * should ensure this synchronization. 
548      * 
549      * @param mapOfAtomicLongValues
550      *            Assumes values are wrapped with AtomicLong.
551      * @return a sorted map containing the same elements as the cache.
552      */
553     public TreeMap<String,AtomicLong> getReverseSortedCopy(
554             final ObjectIdentityCache<String,AtomicLong> mapOfAtomicLongValues) {
555         TreeMap<String,AtomicLong> sortedMap = 
556           new TreeMap<String,AtomicLong>(new Comparator<String>() {
557             public int compare(String e1, String e2) {
558                 long firstVal = mapOfAtomicLongValues.get(e1).get();
559                 long secondVal = mapOfAtomicLongValues.get(e2).get();
560                 if (firstVal < secondVal) {
561                     return 1;
562                 }
563                 if (secondVal < firstVal) {
564                     return -1;
565                 }
566                 // If the values are the same, sort by keys.
567                 return e1.compareTo(e2);
568             }
569         });
570         for (String key: mapOfAtomicLongValues.keySet()) {
571             sortedMap.put(key, mapOfAtomicLongValues.get(key));
572         }
573         return sortedMap;
574     }
575 
576     /***
577      * Return a map representing the distribution of status codes for
578      * successfully fetched curis, where key -&gt; val represents
579      * (string)code -&gt; count.
580      * 
581      * <b>Note: </b> All the values are wrapped with a
582      * {@link AtomicLong AtomicLong}
583      * 
584      * @return statusCodeDistribution
585      */
586     public Map<String,AtomicLong> getStatusCodeDistribution() {
587         return statusCodeDistribution;
588     }
589     
590     /***
591      * Returns the time (in milliseconds) when a URI belonging to a given host was
592      * last finished processing. 
593      * 
594      * @param host The host to look up time of last completed URI.
595      * @return An AtomicLong holding the time (in milliseconds) when a URI belonging
596      * to the given host was last finished processing; holds 0 if no URI has yet
597      * been completed for the host.
598      */
599     public AtomicLong getHostLastFinished(String host){
600         AtomicLong fini = hostsLastFinished.getOrUse(host, ATOMIC_ZERO_SUPPLIER);
601         return fini;
602     }
603 
604     /***
605      * Returns the accumulated number of bytes downloaded from a given host.
606      * @param host name of the host
607      * @return the accumulated number of bytes downloaded from a given host
608      */
609     public long getBytesPerHost(String host){
610         return ((AtomicLong)hostsBytes.get(host)).get();
611     }
612 
613     /***
614      * Returns the accumulated number of bytes from files of a given file type.
615      * @param filetype Filetype to check.
616      * @return the accumulated number of bytes from files of a given mime type
617      */
618     public long getBytesPerFileType(String filetype){
619         return ((AtomicLong)mimeTypeBytes.get(filetype)).get();
620     }
621 
622     /***
623      * Get the total number of ToeThreads (sleeping and active)
624      *
625      * @return The total number of ToeThreads
626      */
627     public int threadCount() {
628         return this.controller != null? controller.getToeCount(): 0;
629     }
630 
631     /***
632      * @return Current thread count (or zero if can't figure it out).
633      */ 
634     public int activeThreadCount() {
635         return this.controller != null? controller.getActiveToeCount(): 0;
636         // note: reuse of old busy value seemed misleading: anyone asking
637         // for thread count when paused or stopped still wants accurate reading
638     }
639 
640     /***
641      * This returns the number of completed URIs as a percentage of the total
642      * number of URIs encountered (should be inverse to the discovery curve)
643      *
644      * @return The number of completed URIs as a percentage of the total
645      * number of URIs encountered
646      */
647     public int percentOfDiscoveredUrisCompleted() {
648         long completed = finishedUriCount();
649         long total = discoveredUriCount();
650 
651         if (total == 0) {
652             return 0;
653         }
654 
655         return (int) (100 * completed / total);
656     }
657 
658     /***
659      * Number of <i>discovered</i> URIs.
660      *
661      * <p>If crawl not running (paused or stopped) this will return the value of
662      * the last snapshot.
663      *
664      * @return A count of all uris encountered
665      *
666      * @see org.archive.crawler.framework.Frontier#discoveredUriCount()
667      */
668     public long discoveredUriCount() {
669         // While shouldrun is true we can use info direct from the crawler.
670         // After that our last snapshot will have to do.
671         return shouldrun && this.controller != null &&
672                 this.controller.getFrontier() != null?
673             controller.getFrontier().discoveredUriCount() : discoveredUriCount;
674     }
675 
676     /***
677      * Number of URIs that have <i>finished</i> processing.
678      *
679      * @return Number of URIs that have finished processing
680      *
681      * @see org.archive.crawler.framework.Frontier#finishedUriCount()
682      */
683     public long finishedUriCount() {
684         return shouldrun && this.controller != null &&
685                 this.controller.getFrontier() != null ?
686             controller.getFrontier().finishedUriCount() : finishedUriCount;
687     }
688 
689     /***
690      * Get the total number of failed fetch attempts (connection failures -> give up, etc)
691      *
692      * @return The total number of failed fetch attempts
693      */
694     public long failedFetchAttempts() {
695         // While shouldrun is true we can use info direct from the crawler.
696         // After that our last snapshot will have to do.
697         return shouldrun && this.controller != null &&
698                 this.controller.getFrontier() != null ?
699             controller.getFrontier().failedFetchCount() : downloadFailures;
700     }
701 
702     /***
703      * Get the total number of disregarded fetch attempts (URIs ruled out of
704      * scope, precluded by robots exclusion, etc.)
705      * @return The total number of disregarded fetch attempts
706      */
707     public long disregardedFetchAttempts() {
708         // While shouldrun is true we can use info direct from the crawler.
709         // After that our last snapshot will have to do.
710         return shouldrun && this.controller != null &&
711                 this.controller.getFrontier() != null?
712             controller.getFrontier().disregardedUriCount() : downloadDisregards;
713     }
714 
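        /***
         * Number of successfully fetched URIs.  While the crawl is running the
         * count comes directly from the frontier; afterwards the last snapshot
         * value is returned.
         */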
715     public long successfullyFetchedCount() {
716         // While shouldrun is true we can use info direct from the crawler.
717         // After that our last snapshot will have to do.
718         return shouldrun && this.controller != null &&
719                 this.controller.getFrontier() != null?
720             controller.getFrontier().succeededFetchCount() : downloadedUriCount;
721     }
722     
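        /***
         * @return Sum of queued URIs, URIs currently being processed by active
         * threads, and successfully fetched URIs.
         */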
723     public long totalCount() {
724         return queuedUriCount() + activeThreadCount() +
725             successfullyFetchedCount();
726     }
727 
728     /***
729      * Ratio of number of threads that would theoretically allow
730      * maximum crawl progress (if each was as productive as current
731      * threads), to current number of threads.
732      * 
733      * @return float congestion ratio 
734      */
735     public float congestionRatio() {
736         // While shouldrun is true we can use info direct from the crawler.
737         // After that our last snapshot will have to do.
738         return shouldrun && this.controller != null &&
739                 this.controller.getFrontier() != null ?
740             controller.getFrontier().congestionRatio() : congestionRatio;
741     }
742     
743     /***
744      * Ordinal position of the 'deepest' URI eligible 
745      * for crawling. Essentially, the length of the longest
746      * frontier internal queue. 
747      * 
748      * @return long URI count to deepest URI
749      */
750     public long deepestUri() {
751         // While shouldrun is true we can use info direct from the crawler.
752         // After that our last snapshot will have to do.
753         return shouldrun && this.controller != null &&
754                 this.controller.getFrontier() != null ?
755             controller.getFrontier().deepestUri() : deepestUri;
756     }
757     
758     /***
759      * Average depth of the last URI in all eligible queues.
760      * That is, the average length of all eligible queues.
761      * 
762      * @return long average depth of last URIs in queues 
763      */
764     public long averageDepth() {
765         // While shouldrun is true we can use info direct from the crawler.
766         // After that our last snapshot will have to do.
767         return shouldrun && this.controller != null &&
768                 this.controller.getFrontier() != null ?
769             controller.getFrontier().averageDepth() : averageDepth;
770     }
771     
772     /***
773      * Number of URIs <i>queued</i> up and waiting for processing.
774      *
775      * <p>If crawl not running (paused or stopped) this will return the value
776      * of the last snapshot.
777      *
778      * @return Number of URIs queued up and waiting for processing.
779      *
780      * @see org.archive.crawler.framework.Frontier#queuedUriCount()
781      */
782     public long queuedUriCount() {
783         // While shouldrun is true we can use info direct from the crawler.
784         // After that our last snapshot will have to do.
785         return shouldrun && this.controller != null &&
786                 this.controller.getFrontier() != null?
787             controller.getFrontier().queuedUriCount() : queuedUriCount;
788     }
789 
790     /*** @deprecated use totalBytesCrawled */ 
791     public long totalBytesWritten() {
792         // return totalBytesCrawled(); 
793         return shouldrun && this.controller != null &&
794                 this.controller.getFrontier() != null?
795             controller.getFrontier().totalBytesWritten() : totalProcessedBytes;
796     }
797     
798     public long totalBytesCrawled() {
799         return shouldrun ?
800             crawledBytes.getTotal() : totalProcessedBytes;
801     }
802     
803     public String crawledBytesSummary() {
804         return crawledBytes.summary();
805     }
806 
807     /***
808      * If the curi is a seed, we insert into the processedSeedsRecords map.
809      *
810      * @param curi The CrawlURI that may be a seed.
811      * @param disposition The disposition of the CrawlURI.
812      */
813     private void handleSeed(final CrawlURI curi, final String disposition) {
814         if(curi.isSeed()){
815             SeedRecord sr = processedSeedsRecords.getOrUse(
816                     curi.toString(),
817                     new Supplier<SeedRecord>() {
818                         public SeedRecord get() {
819                             return new SeedRecord(curi, disposition);
820                         }});
821             sr.updateWith(curi,disposition); 
822         }
823     }
824 
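        /***
         * Disposition callback for a successfully crawled URI: updates the seed
         * record, crawled-bytes tallies, novel/duplicate/not-modified counts,
         * and the status-code, mime-type, host and source statistics.
         */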
825     public void crawledURISuccessful(CrawlURI curi) {
826         handleSeed(curi,SEED_DISPOSITION_SUCCESS);
827         // save crawled bytes tally
828         crawledBytes.accumulate(curi);
829 
830         // save crawled docs tally
831         if(curi.getFetchStatus()==HttpStatus.SC_NOT_MODIFIED) {
832             notModifiedUriCount++;
833         } else if (IdenticalDigestDecideRule.hasIdenticalDigest(curi)) {
834             dupByHashUriCount++;
835         } else {
836             novelUriCount++;
837         }
838         
839         // Save status codes
840         incrementMapCount(statusCodeDistribution,
841             Integer.toString(curi.getFetchStatus()));
842 
843         // Save mime types
844         String mime = MimetypeUtils.truncate(curi.getContentType());
845         incrementMapCount(mimeTypeDistribution, mime);
846         incrementMapCount(mimeTypeBytes, mime, curi.getContentSize());
847 
848         // Save hosts stats.
849         saveHostStats(curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS ? "dns:" :
850                 this.controller.getServerCache().getHostFor(curi).getHostName(),
851                 curi.getContentSize());
852         
853         if (curi.containsKey(CrawlURI.A_SOURCE_TAG)){
854             saveSourceStats(curi.getString(CrawlURI.A_SOURCE_TAG), 
855                     this.controller.getServerCache().getHostFor(curi).
856                     getHostName()); 
857         }
858     }
859          
860     protected void saveSourceStats(String source, String hostname) {
861         synchronized(sourceHostDistribution) {
862             ConcurrentMap<String,AtomicLong> hostUriCount = 
863                 sourceHostDistribution.getOrUse(
864                         source,
865                         new Supplier<ConcurrentMap<String,AtomicLong>>() {
866                             public ConcurrentMap<String, AtomicLong> get() {
867                                 return new ConcurrentHashMap<String,AtomicLong>();
868                             }});
869             incrementMapCount(hostUriCount, hostname);
870         }
871     }
872     
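        /***
         * Update per-host statistics: URI count, byte count and the
         * last-finished timestamp for the given host.
         */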
873     protected void saveHostStats(String hostname, long size) {
874         incrementCacheCount(hostsDistribution, hostname);
875 
876         incrementCacheCount(hostsBytes, hostname, size);
877         
878         long time = System.currentTimeMillis();
879         getHostLastFinished(hostname).set(time); 
880     }
881 
882     public void crawledURINeedRetry(CrawlURI curi) {
883         handleSeed(curi,SEED_DISPOSITION_RETRY);
884     }
885 
886     public void crawledURIDisregard(CrawlURI curi) {
887         handleSeed(curi,SEED_DISPOSITION_DISREGARD);
888     }
889 
890     public void crawledURIFailure(CrawlURI curi) {
891         handleSeed(curi,SEED_DISPOSITION_FAILURE);
892     }
893 
894     /***
895      * Get a seed iterator for the job being monitored. 
896      * 
897      * <b>Note:</b> This iterator will iterate over a list of <i>strings</i> not
898      * UURIs like the Scope seed iterator. The strings are equal to the URIs'
899      * getURIString() values.
900      * @return the seed iterator
901      * FIXME: Consider using TransformingIterator here
902      */
903     public Iterator<String> getSeeds() {
904         List<String> seedsCopy = new Vector<String>();
905         Iterator<UURI> i = controller.getScope().seedsIterator();
906         while (i.hasNext()) {
907             seedsCopy.add(i.next().toString());
908         }
909         return seedsCopy.iterator();
910     }
911 
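        /***
         * @return Seed records for all of the job's seeds, sorted by status code.
         */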
912     public Iterator<SeedRecord> getSeedRecordsSortedByStatusCode() {
913         return getSeedRecordsSortedByStatusCode(getSeeds());
914     }
915     
916     protected Iterator<SeedRecord> getSeedRecordsSortedByStatusCode(
917             Iterator<String> i) {
918         TreeSet<SeedRecord> sortedSet = 
919           new TreeSet<SeedRecord>(new Comparator<SeedRecord>() {
920             public int compare(SeedRecord sr1, SeedRecord sr2) {
921                 int code1 = sr1.getStatusCode();
922                 int code2 = sr2.getStatusCode();
923                 if (code1 == code2) {
924                     // If the values are equal, sort by URIs.
925                     return sr1.getUri().compareTo(sr2.getUri());
926                 }
927                 // mirror and shift the number line so as to
928                 // place zero at the beginning, then all negatives 
929                 // in order of ascending absolute value, then all 
930                 // positives descending
931                 code1 = -code1 - Integer.MAX_VALUE;
932                 code2 = -code2 - Integer.MAX_VALUE;
933                 
934                 return new Integer(code1).compareTo(new Integer(code2));
935             }
936         });
937         while (i.hasNext()) {
938             String seed = i.next();
939             SeedRecord sr = (SeedRecord) processedSeedsRecords.get(seed);
940             if(sr==null) {
941                 sr = new SeedRecord(seed,SEED_DISPOSITION_NOT_PROCESSED);
942             }
943             sortedSet.add(sr);
944         }
945         return sortedSet.iterator();
946     }
947 
948     public void crawlEnded(String message) {
949         logger.info("Entered crawlEnded");
950         this.sExitMessage = message; // held for reference by reports
951         super.crawlEnded(message);
952         logger.info("Leaving crawlEnded");
953     }
954     
955     /***
956      * @param writer Where to write.
957      */
958     protected void writeSeedsReportTo(PrintWriter writer) {
959         // Build header.
960         writer.print("[code] [status] [seed] [redirect]\n");
961 
962         seedsCrawled = 0;
963         seedsNotCrawled = 0;
964         for (Iterator<SeedRecord> i = getSeedRecordsSortedByStatusCode(getSeeds());
965                 i.hasNext();) {
966             SeedRecord sr = i.next();
967             writer.print(sr.getStatusCode());
968             writer.print(" ");
969             if((sr.getStatusCode() > 0)) {
970                 seedsCrawled++;
971                 writer.print("CRAWLED");
972             } else {
973                 seedsNotCrawled++;
974                 writer.print("NOTCRAWLED");
975             }
976             writer.print(" ");
977             writer.print(sr.getUri());
978             if(sr.getRedirectUri()!=null) {
979                 writer.print(" ");
980                 writer.print(sr.getRedirectUri());
981             }
982             writer.print("\n");
983         }
984     }
985     
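        /***
         * Write the source report: for each seed source tag, the number of URLs
         * crawled per host, hosts ordered by descending count.
         */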
986     protected void writeSourceReportTo(PrintWriter writer) {
987         
988         writer.print("[source] [host] [#urls]\n");
989         // for each source
990         for (String sourceKey: sourceHostDistribution.keySet()) {
991             Map<String,AtomicLong> hostCounts = sourceHostDistribution.get(sourceKey);
992             // sort hosts by #urls
993             SortedMap<String,AtomicLong> sortedHostCounts = getReverseSortedHostCounts(hostCounts);
994             // for each host
995             for (String hostKey: sortedHostCounts.keySet()) {
996                 AtomicLong hostCount = hostCounts.get(hostKey);
997                 writer.print(sourceKey.toString());
998                 writer.print(" ");
999                 writer.print(hostKey.toString());
1000                 writer.print(" ");
1001                 writer.print(hostCount.get());
1002                 writer.print("\n");
1003             }
1004         }
1005     }
1006   
1007     /***
1008      * Return a copy of the given host counts in reverse-sorted (largest
1009      * first) order.
1010      * @param hostCounts Map of host name to URL count.
1011      * @return SortedMap of the given host counts
1012      */
1013     public SortedMap<String,AtomicLong> getReverseSortedHostCounts(
1014             Map<String,AtomicLong> hostCounts) {
1015         return getReverseSortedCopy(hostCounts);
1016     }
1017 
1018     
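         /***
          * Write the hosts report: one line per crawled host with URI and byte
          * counts plus per-host substats, supplemented with zero-completion
          * hosts taken from the server cache.
          */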
1019     protected void writeHostsReportTo(final PrintWriter writer) {
1020         // TODO: use CrawlHosts for all stats; only perform sorting on 
1021         // manageable number of hosts
1022         SortedMap<String,AtomicLong> hd = getReverseSortedHostsDistribution();
1023         // header
1024         writer.print("[#urls] [#bytes] [host] [#robots] [#remaining] [#novel-urls] [#novel-bytes] [#dup-by-hash-urls] [#dup-by-hash-bytes] [#not-modified-urls] [#not-modified-bytes]\n");
1025         for (String key: hd.keySet()) {
1026             // Key is 'host'.
1027             CrawlHost host = controller.getServerCache().getHostFor(key);
1028             AtomicLong val = hd.get(key);
1029             writeReportLine(writer,
1030                     val == null ? "-" : val.get(),
1031                     getBytesPerHost(key),
1032                     fixup(key),
1033                     host.getSubstats().getRobotsDenials(),
1034                     host.getSubstats().getRemaining(),
1035                     host.getSubstats().getNovelUrls(),
1036                     host.getSubstats().getNovelBytes(),
1037                     host.getSubstats().getDupByHashUrls(),
1038                     host.getSubstats().getDupByHashBytes(),
1039                     host.getSubstats().getNotModifiedUrls(),
1040                     host.getSubstats().getNotModifiedBytes());
1041         }
1042         // StatisticsTracker doesn't know of zero-completion hosts; 
1043         // so supplement report with those entries from host cache
1044         Closure logZeros = new Closure() {
1045             public void execute(Object obj) {
1046                 CrawlHost host = (CrawlHost)obj;
1047                 if(host.getSubstats().getRecordedFinishes()==0) {
1048                     writeReportLine(writer,
1049                             host.getSubstats().getRecordedFinishes(),
1050                             host.getSubstats().getTotalBytes(),
1051                             fixup(host.getHostName()),
1052                             host.getSubstats().getRobotsDenials(),
1053                             host.getSubstats().getRemaining(),
1054                             host.getSubstats().getNovelUrls(),
1055                             host.getSubstats().getNovelBytes(),
1056                             host.getSubstats().getDupByHashUrls(),
1057                             host.getSubstats().getDupByHashBytes(),
1058                             host.getSubstats().getNotModifiedUrls(),
1059                             host.getSubstats().getNotModifiedBytes());
1060                 }
1061             }};
1062         controller.getServerCache().forAllHostsDo(logZeros);
1063     }
1064     
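         /***
          * URL-encode a host name for report output; the pseudo-host "dns:" is
          * passed through unchanged.
          */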
1065     protected String fixup(String hostName) {
1066         if ("dns:".equals(hostName)) {
1067             return hostName;
1068         } else {
1069             try {
1070                 return URLEncoder.encode(hostName, "UTF-8");
1071             } catch (UnsupportedEncodingException e) {
1072                 throw new RuntimeException(e);
1073             }
1074         }
1075     }
1076 
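         /***
          * Write the given fields as a single space-separated report line.
          */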
1077     protected void writeReportLine(PrintWriter writer, Object  ... fields) {
1078        for(Object field : fields) {
1079            writer.print(field);
1080            writer.print(" ");
1081        }
1082        writer.print("\n");
1083     }
1084 
1085     /***
1086      * Return a copy of the hosts distribution in reverse-sorted
1087      * (largest first) order. 
1088      * @return SortedMap of hosts distribution
1089      */
1090     public SortedMap<String,AtomicLong> getReverseSortedHostsDistribution() {
1091         return getReverseSortedCopy(hostsDistribution);
1092     }
1093 
1094     protected void writeMimetypesReportTo(PrintWriter writer) {
1095         // header
1096         writer.print("[#urls] [#bytes] [mime-types]\n");
1097         TreeMap<String,AtomicLong> fd = getReverseSortedCopy(getFileDistribution());
1098         for (String key: fd.keySet()) { 
1099             // Key is mime type.
1100             writer.print(Long.toString(fd.get(key).get()));
1101             writer.print(" ");
1102             writer.print(Long.toString(getBytesPerFileType(key)));
1103             writer.print(" ");
1104             writer.print(key);
1105             writer.print("\n");
1106         }
1107     }
1108     
1109     protected void writeResponseCodeReportTo(PrintWriter writer) {
1110         // Build header.
1111         writer.print("[rescode] [#urls]\n");
1112         TreeMap<String,AtomicLong> scd = getReverseSortedCopy(getStatusCodeDistribution());
1113         for (String key: scd.keySet()) { 
1114             writer.print(key);
1115             writer.print(" ");
1116             writer.print(Long.toString(scd.get(key).get()));
1117             writer.print("\n");
1118         }
1119     }
1120     
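         /***
          * Write the overall crawl report: crawl name and status, duration,
          * seed, host and document tallies, processing rates and byte totals.
          */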
1121     protected void writeCrawlReportTo(PrintWriter writer) {
1122         writer.print("Crawl Name: " + controller.getOrder().getCrawlOrderName());
1123         writer.print("\nCrawl Status: " + sExitMessage);
1124         writer.print("\nDuration Time: " +
1125                 ArchiveUtils.formatMillisecondsToConventional(crawlDuration()));
1126         writer.print("\nTotal Seeds Crawled: " + seedsCrawled);
1127         writer.print("\nTotal Seeds not Crawled: " + seedsNotCrawled);
1128         // hostsDistribution contains all hosts crawled plus an entry for dns.
1129         writer.print("\nTotal Hosts Crawled: " + (hostsDistribution.size()-1));
1130         writer.print("\nTotal Documents Crawled: " + finishedUriCount);
1131         writer.print("\nDocuments Crawled Successfully: " + downloadedUriCount);
1132         writer.print("\nNovel Documents Crawled: " + novelUriCount);
1133         if (dupByHashUriCount > 0)
1134             writer.print("\nDuplicate-by-hash Documents Crawled: " + dupByHashUriCount);
1135         if (notModifiedUriCount > 0)
1136             writer.print("\nNot-modified Documents Crawled: " + notModifiedUriCount);
1137         writer.print("\nProcessed docs/sec: " +
1138                 ArchiveUtils.doubleToString(docsPerSecond,2));
1139         writer.print("\nBandwidth in Kbytes/sec: " + totalKBPerSec);
1140         writer.print("\nTotal Raw Data Size in Bytes: " + totalProcessedBytes +
1141                 " (" + ArchiveUtils.formatBytesForDisplay(totalProcessedBytes) +
1142                 ") \n");
1143         writer.print("Novel Bytes: " 
1144                 + crawledBytes.get(CrawledBytesHistotable.NOVEL)
1145                 + " (" + ArchiveUtils.formatBytesForDisplay(
1146                         crawledBytes.get(CrawledBytesHistotable.NOVEL))
1147                 +  ") \n");
1148         if(crawledBytes.containsKey(CrawledBytesHistotable.DUPLICATE)) {
1149             writer.print("Duplicate-by-hash Bytes: " 
1150                     + crawledBytes.get(CrawledBytesHistotable.DUPLICATE)
1151                     + " (" + ArchiveUtils.formatBytesForDisplay(
1152                             crawledBytes.get(CrawledBytesHistotable.DUPLICATE))
1153                     +  ") \n");
1154         }
1155         if(crawledBytes.containsKey(CrawledBytesHistotable.NOTMODIFIED)) {
1156             writer.print("Not-modified Bytes: " 
1157                     + crawledBytes.get(CrawledBytesHistotable.NOTMODIFIED)
1158                     + " (" + ArchiveUtils.formatBytesForDisplay(
1159                             crawledBytes.get(CrawledBytesHistotable.NOTMODIFIED))
1160                     +  ") \n");
1161         }
1162     }
1163     
1164     protected void writeProcessorsReportTo(PrintWriter writer) {
1165         controller.reportTo(CrawlController.PROCESSORS_REPORT,writer);
1166     }
1167     
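         /***
          * Write the named report to a file in the crawl's disk directory and
          * add the file to the crawl manifest; an IOException is logged as SEVERE.
          */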
1168     protected void writeReportFile(String reportName, String filename) {
1169         File f = new File(controller.getDisk().getPath(), filename);
1170         try {
1171             PrintWriter bw = new PrintWriter(
1172                 new OutputStreamWriter(
1173                     new FileOutputStream(f, false),
1174                     "UTF-8"));
1175             writeReportTo(reportName, bw);
1176             bw.close();
1177             controller.addToManifest(f.getAbsolutePath(),
1178                 CrawlController.MANIFEST_REPORT_FILE, true);
1179         } catch (IOException e) {
1180             logger.log(Level.SEVERE, "Unable to write " + f.getAbsolutePath() +
1181                 " at the end of crawl.", e);
1182         }
1183         logger.info("wrote report: " + f.getAbsolutePath());
1184     }
1185     
1186     /***
1187      * @param writer Where to write.
1188      */
1189     protected void writeManifestReportTo(PrintWriter writer) {
1190         controller.reportTo(CrawlController.MANIFEST_REPORT, writer);
1191     }
1192     
1193     /***
1194      * @param reportName Name of report.
1195      * @param w Where to write.
1196      */
1197     private void writeReportTo(String reportName, PrintWriter w) {
1198         if("hosts".equals(reportName)) {
1199             writeHostsReportTo(w);
1200         } else if ("mime types".equals(reportName)) {
1201             writeMimetypesReportTo(w);
1202         } else if ("response codes".equals(reportName)) {
1203             writeResponseCodeReportTo(w);
1204         } else if ("seeds".equals(reportName)) {
1205             writeSeedsReportTo(w);
1206         } else if ("crawl".equals(reportName)) {
1207             writeCrawlReportTo(w);
1208         } else if ("processors".equals(reportName)) {
1209             writeProcessorsReportTo(w);
1210         } else if ("manifest".equals(reportName)) {
1211             writeManifestReportTo(w);
1212         } else if ("frontier".equals(reportName)) {
1213             writeFrontierReportTo(w);
1214         } else if ("source".equals(reportName)) {
1215             writeSourceReportTo(w);
1216         } // TODO: else default/error
1217     }
1218 
1219     /***
1220      * Write the Frontier's 'nonempty' report (if available)
1221      * @param writer to report to
1222      */
1223     protected void writeFrontierReportTo(PrintWriter writer) {
1224         if(controller.getFrontier().isEmpty()) {
1225             writer.println("frontier empty");
1226         } else {
1227             controller.getFrontier().reportTo("nonempty", writer);
1228         }
1229     }
1230 
1231     /***
1232      * Run the reports.
1233      */
1234     public void dumpReports() {
1235         // Add all files mentioned in the crawl order to the
1236         // manifest set.
1237         controller.addOrderToManifest();
1238         controller.installThreadContextSettingsHandler();
1239         writeReportFile("hosts","hosts-report.txt");
1240         writeReportFile("mime types","mimetype-report.txt");
1241         writeReportFile("response codes","responsecode-report.txt");
1242         writeReportFile("seeds","seeds-report.txt");
1243         writeReportFile("crawl","crawl-report.txt");
1244         writeReportFile("processors","processors-report.txt");
1245         writeReportFile("manifest","crawl-manifest.txt");
1246         writeReportFile("frontier","frontier-report.txt");
1247         if (sourceHostDistribution.size()>0) {
1248             writeReportFile("source","source-report.txt");
1249         }
1250         // TODO: Save object to disk?
1251     }
1252 
1253     public void crawlCheckpoint(File cpDir) throws Exception {
1254         // CrawlController is managing the checkpointing of this object.
1255         logNote("CRAWL CHECKPOINTING TO " + cpDir.toString());
1256     }
1257 }