View Javadoc

1   /* RecoveryJournal
2    *
3    * $Id: RecoveryJournal.java 5870 2008-07-11 22:14:18Z gojomo $
4    *
5    * Created on Jul 20, 2004
6    *
7    * Copyright (C) 2004 Internet Archive.
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.crawler.frontier;
26  
27  import it.unimi.dsi.mg4j.util.MutableString;
28  
29  import java.io.BufferedInputStream;
30  import java.io.EOFException;
31  import java.io.File;
32  import java.io.IOException;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.logging.Level;
35  import java.util.logging.Logger;
36  
37  import org.apache.commons.httpclient.URIException;
38  import org.archive.crawler.datamodel.CandidateURI;
39  import org.archive.crawler.datamodel.CrawlOrder;
40  import org.archive.crawler.framework.CrawlController;
41  import org.archive.crawler.framework.CrawlScope;
42  import org.archive.crawler.framework.Frontier;
43  import org.archive.crawler.io.CrawlerJournal;
44  import org.archive.net.UURI;
45  
46  /***
47   * Helper class for managing a simple Frontier change-events journal which is
48   * useful for recovering from crawl problems.
49   * 
50   * By replaying the journal into a new Frontier, its state (at least with
51   * respect to URIs alreadyIncluded and in pending queues) will match that of the
52   * original Frontier, allowing a pseudo-resume of a previous crawl, at least as
53   * far as URI visitation/coverage is concerned.
54   * 
55   * @author gojomo
56   */
57  public class RecoveryJournal extends CrawlerJournal 
58  implements FrontierJournal {
59      private static final Logger LOGGER = Logger.getLogger(
60              RecoveryJournal.class.getName());
61      
62      public final static String F_ADD = "F+ ";
63      public final static String F_EMIT = "Fe ";
64      public final static String F_DISREGARD = "Fd ";
65      public final static String F_RESCHEDULE = "Fr ";
66      public final static String F_SUCCESS = "Fs ";
67      public final static String F_FAILURE = "Ff ";
68      
69      //  show recovery progress every this many lines
70      private static final int PROGRESS_INTERVAL = 1000000;
71  
72      // once this many URIs are queued during recovery, allow 
73      // crawl to begin, while enqueuing of other URIs from log
74      // continues in background
75      private static final long ENOUGH_TO_START_CRAWLING = 100000;
76      
77      /***
78       * Create a new recovery journal at the given location
79       * 
80       * @param path Directory to make the recovery  journal in.
81       * @param filename Name to use for recovery journal file.
82       * @throws IOException
83       */
84      public RecoveryJournal(String path, String filename)
85      throws IOException {
86          super(path,filename);
87          timestamp_interval = 10000; // write timestamp lines occasionally
88      }
89      
90      public  void added(CandidateURI curi) {
91          writeLongUriLine(F_ADD, curi);
92      }
93      
94      public synchronized void writeLongUriLine(String tag, CandidateURI curi) {
95          accumulatingBuffer.length(0);
96          this.accumulatingBuffer.append(tag).
97              append(curi.toString()).
98              append(" "). 
99              append(curi.getPathFromSeed()).
100             append(" ").
101             append(curi.flattenVia());
102         writeLine(accumulatingBuffer);
103     }
104 
105     public void finishedSuccess(CandidateURI curi) {
106         writeLongUriLine(F_SUCCESS, curi);
107     }
108 
109     public void emitted(CandidateURI curi) {
110         writeLine(F_EMIT, curi.toString());
111 
112     }
113     public void finishedDisregard(CandidateURI curi) {
114         writeLine(F_DISREGARD, curi.toString());
115     }
116     
117     public void finishedFailure(CandidateURI curi) {
118         writeLongUriLine(F_FAILURE,curi);
119     }
120 
121     public void rescheduled(CandidateURI curi) {
122         writeLine(F_RESCHEDULE, curi.toString());
123     }
124 
125     /***
126      * Utility method for scanning a recovery journal and applying it to
127      * a Frontier.
128      * 
129      * @param source Recover log path.
130      * @param frontier Frontier reference.
131      * @param retainFailures
132      * @throws IOException
133      * 
134      * @see org.archive.crawler.framework.Frontier#importRecoverLog(String, boolean)
135      */
136     public static void importRecoverLog(final File source,
137         final CrawlController controller, final boolean retainFailures)
138     throws IOException {
139         if (source == null) {
140             throw new IllegalArgumentException("Passed source file is null.");
141         }
142         LOGGER.info("recovering frontier completion state from "+source);
143         
144         // first, fill alreadyIncluded with successes (and possibly failures),
145         // and count the total lines
146         final int lines =
147             importCompletionInfoFromLog(source, controller, retainFailures);
148         
149         LOGGER.info("finished completion state; recovering queues from " +
150             source);
151 
152         // now, re-add anything that was in old frontier and not already
153         // registered as finished. Do this in a separate thread that signals
154         // this thread once ENOUGH_TO_START_CRAWLING URIs have been queued. 
155         final CountDownLatch recoveredEnough = new CountDownLatch(1);
156         new Thread(new Runnable() {
157             public void run() {
158                 importQueuesFromLog(source, controller, lines, recoveredEnough);
159             }
160         }, "queuesRecoveryThread").start();
161         
162         try {
163             // wait until at least ENOUGH_TO_START_CRAWLING URIs queued
164             recoveredEnough.await();
165         } catch (InterruptedException e) {
166             // TODO Auto-generated catch block
167             e.printStackTrace();
168         }
169     }
170     
171     /***
172      * Import just the SUCCESS (and possibly FAILURE) URIs from the given
173      * recovery log into the frontier as considered included. 
174      * 
175      * @param source recovery log file to use
176      * @param frontier frontier to update
177      * @param retainFailures whether failure ('Ff') URIs should count as done
178      * @return number of lines in recovery log (for reference)
179      * @throws IOException
180      */
181     private static int importCompletionInfoFromLog(File source, 
182             CrawlController controller, boolean retainFailures) throws IOException {
183         Frontier frontier = controller.getFrontier();
184         boolean checkScope = (Boolean) controller.getOrder()
185                 .getUncheckedAttribute(null,
186                         CrawlOrder.ATTR_RECOVER_SCOPE_INCLUDES);
187         CrawlScope scope = checkScope ? controller.getScope() : null;
188         // Scan log for all 'Fs' lines: add as 'alreadyIncluded'
189         BufferedInputStream is = getBufferedInput(source);
190         // create MutableString of good starting size (will grow if necessary)
191         MutableString read = new MutableString(UURI.MAX_URL_LENGTH); 
192         int lines = 0; 
193         try {
194             while (readLine(is,read)) {
195                 lines++;
196                 boolean wasSuccess = read.startsWith(F_SUCCESS);
197                 if (wasSuccess
198 						|| (retainFailures && read.startsWith(F_FAILURE))) {
199                     try {
200                         CandidateURI cauri = CandidateURI.fromString(
201                                 read.substring(F_SUCCESS.length()).toString());
202                         if(checkScope) {
203                             if(!scope.accepts(cauri)) {
204                                 // skip out-of-scope URIs.
205                                 continue;
206                             }
207                         }
208                         frontier.considerIncluded(cauri.getUURI());
209                         if(wasSuccess) {
210                             if (frontier.getFrontierJournal() != null) {
211                                 frontier.getFrontierJournal().
212                                     finishedSuccess(cauri);
213                             }
214                         } else {
215                             // carryforward failure, in case future recovery
216                             // wants to no retain them as finished 
217                             if (frontier.getFrontierJournal() != null) {
218                                 frontier.getFrontierJournal().
219                                     finishedFailure(cauri);
220                             }
221                         }
222                     } catch (URIException e) {
223                         e.printStackTrace();
224                     }
225                 }
226                 if((lines%PROGRESS_INTERVAL)==0) {
227                     // every 1 million lines, print progress
228                     LOGGER.info(
229                             "at line " + lines 
230                             + " alreadyIncluded count = " +
231                             frontier.discoveredUriCount());
232                 }
233             }
234         } catch (EOFException e) {
235             // expected in some uncleanly-closed recovery logs; ignore
236         } finally {
237             is.close();
238         }
239         return lines;
240     }
241 
242     /***
243      * Read a line from the given bufferedinputstream into the MutableString.
244      * Return true if a line was read; false if EOF. 
245      * 
246      * @param is
247      * @param read
248      * @return True if we read a line.
249      * @throws IOException
250      */
251     private static boolean readLine(BufferedInputStream is, MutableString read)
252     throws IOException {
253         read.length(0);
254         int c = is.read();
255         while((c!=-1)&&c!='\n'&&c!='\r') {
256             read.append((char)c);
257             c = is.read();
258         }
259         if(c==-1 && read.length()==0) {
260             // EOF and none read; return false
261             return false;
262         }
263         if(c=='\n') {
264             // consume LF following CR, if present
265             is.mark(1);
266             if(is.read()!='\r') {
267                 is.reset();
268             }
269         }
270         // a line (possibly blank) was read
271         return true;
272     }
273 
274     /***
275      * Import all ADDs from given recovery log into the frontier's queues
276      * (excepting those the frontier drops as already having been included)
277      * 
278      * @param source recovery log file to use
279      * @param frontier frontier to update
280      * @param lines total lines noted in recovery log earlier
281      * @param enough latch signalling 'enough' URIs queued to begin crawling
282      */
283     private static void importQueuesFromLog(File source, CrawlController controller,
284             int lines, CountDownLatch enough) {
285         BufferedInputStream is;
286         // create MutableString of good starting size (will grow if necessary)
287         MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
288         controller.installThreadContextSettingsHandler();
289         Frontier frontier = controller.getFrontier();
290         boolean checkScope = (Boolean) controller.getOrder()
291         .getUncheckedAttribute(null,
292                 CrawlOrder.ATTR_RECOVER_SCOPE_ENQUEUES);
293         CrawlScope scope = checkScope ? controller.getScope() : null;
294         long queuedAtStart = frontier.queuedUriCount();
295         long queuedDuringRecovery = 0;
296         int qLines = 0;
297         
298         try {
299             // Scan log for all 'F+' lines: if not alreadyIncluded, schedule for
300             // visitation
301             is = getBufferedInput(source);
302             try {
303                 while (readLine(is,read)) {
304                     qLines++;
305                     if (read.startsWith(F_ADD)) {
306                         try {
307                             CandidateURI caUri = CandidateURI.fromString(
308                                     read.substring(F_ADD.length()).toString());
309                             if(checkScope) {
310                                 if(!scope.accepts(caUri)) {
311                                     // skip out-of-scope URIs.
312                                     continue;
313                                 }
314                             }
315                             frontier.schedule(caUri);
316                             
317                             queuedDuringRecovery =
318                                 frontier.queuedUriCount() - queuedAtStart;
319                             if(((queuedDuringRecovery + 1) %
320                                     ENOUGH_TO_START_CRAWLING) == 0) {
321                                 enough.countDown();
322                             }
323                         } catch (URIException e) {
324                             LOGGER.log(Level.WARNING, "bad URI during " +
325                                 "log-recovery of queue contents: "+read,e);
326                             // and continue...
327                         } catch (RuntimeException e) {
328                             LOGGER.log(Level.SEVERE, "exception during " +
329                                     "log-recovery of queue contents: "+read,e);
330                             // and continue, though this may be risky
331                             // if the exception wasn't a trivial NPE 
332                             // or wrapped interrupted-exception.
333                         }
334                     }
335                     if((qLines%PROGRESS_INTERVAL)==0) {
336                         // every 1 million lines, print progress
337                         LOGGER.info(
338                                 "through line " 
339                                 + qLines + "/" + lines 
340                                 + " queued count = " +
341                                 frontier.queuedUriCount());
342                     }
343                 }
344             } catch (EOFException e) {
345                 // no problem: untidy end of recovery journal
346             } finally {
347             	    is.close(); 
348             }
349         } catch (IOException e) {
350             // serious -- but perhaps ignorable? -- error
351             LOGGER.log(Level.SEVERE, "problem importing queues", e);
352         }
353         LOGGER.info("finished recovering frontier from "+source+" "
354                 +qLines+" lines processed");
355         enough.countDown();
356     }
357 }