1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.crawler.frontier;
26
27 import it.unimi.dsi.mg4j.util.MutableString;
28
29 import java.io.BufferedInputStream;
30 import java.io.EOFException;
31 import java.io.File;
32 import java.io.IOException;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36
37 import org.apache.commons.httpclient.URIException;
38 import org.archive.crawler.datamodel.CandidateURI;
39 import org.archive.crawler.datamodel.CrawlOrder;
40 import org.archive.crawler.framework.CrawlController;
41 import org.archive.crawler.framework.CrawlScope;
42 import org.archive.crawler.framework.Frontier;
43 import org.archive.crawler.io.CrawlerJournal;
44 import org.archive.net.UURI;
45
46 /***
47 * Helper class for managing a simple Frontier change-events journal which is
48 * useful for recovering from crawl problems.
49 *
50 * By replaying the journal into a new Frontier, its state (at least with
51 * respect to URIs alreadyIncluded and in pending queues) will match that of the
52 * original Frontier, allowing a pseudo-resume of a previous crawl, at least as
53 * far as URI visitation/coverage is concerned.
54 *
55 * @author gojomo
56 */
57 public class RecoveryJournal extends CrawlerJournal
58 implements FrontierJournal {
59 private static final Logger LOGGER = Logger.getLogger(
60 RecoveryJournal.class.getName());
61
62 public final static String F_ADD = "F+ ";
63 public final static String F_EMIT = "Fe ";
64 public final static String F_DISREGARD = "Fd ";
65 public final static String F_RESCHEDULE = "Fr ";
66 public final static String F_SUCCESS = "Fs ";
67 public final static String F_FAILURE = "Ff ";
68
69
70 private static final int PROGRESS_INTERVAL = 1000000;
71
72
73
74
75 private static final long ENOUGH_TO_START_CRAWLING = 100000;
76
77 /***
78 * Create a new recovery journal at the given location
79 *
80 * @param path Directory to make the recovery journal in.
81 * @param filename Name to use for recovery journal file.
82 * @throws IOException
83 */
84 public RecoveryJournal(String path, String filename)
85 throws IOException {
86 super(path,filename);
87 timestamp_interval = 10000;
88 }
89
90 public void added(CandidateURI curi) {
91 writeLongUriLine(F_ADD, curi);
92 }
93
94 public synchronized void writeLongUriLine(String tag, CandidateURI curi) {
95 accumulatingBuffer.length(0);
96 this.accumulatingBuffer.append(tag).
97 append(curi.toString()).
98 append(" ").
99 append(curi.getPathFromSeed()).
100 append(" ").
101 append(curi.flattenVia());
102 writeLine(accumulatingBuffer);
103 }
104
105 public void finishedSuccess(CandidateURI curi) {
106 writeLongUriLine(F_SUCCESS, curi);
107 }
108
109 public void emitted(CandidateURI curi) {
110 writeLine(F_EMIT, curi.toString());
111
112 }
113 public void finishedDisregard(CandidateURI curi) {
114 writeLine(F_DISREGARD, curi.toString());
115 }
116
117 public void finishedFailure(CandidateURI curi) {
118 writeLongUriLine(F_FAILURE,curi);
119 }
120
121 public void rescheduled(CandidateURI curi) {
122 writeLine(F_RESCHEDULE, curi.toString());
123 }
124
125 /***
126 * Utility method for scanning a recovery journal and applying it to
127 * a Frontier.
128 *
129 * @param source Recover log path.
130 * @param frontier Frontier reference.
131 * @param retainFailures
132 * @throws IOException
133 *
134 * @see org.archive.crawler.framework.Frontier#importRecoverLog(String, boolean)
135 */
136 public static void importRecoverLog(final File source,
137 final CrawlController controller, final boolean retainFailures)
138 throws IOException {
139 if (source == null) {
140 throw new IllegalArgumentException("Passed source file is null.");
141 }
142 LOGGER.info("recovering frontier completion state from "+source);
143
144
145
146 final int lines =
147 importCompletionInfoFromLog(source, controller, retainFailures);
148
149 LOGGER.info("finished completion state; recovering queues from " +
150 source);
151
152
153
154
155 final CountDownLatch recoveredEnough = new CountDownLatch(1);
156 new Thread(new Runnable() {
157 public void run() {
158 importQueuesFromLog(source, controller, lines, recoveredEnough);
159 }
160 }, "queuesRecoveryThread").start();
161
162 try {
163
164 recoveredEnough.await();
165 } catch (InterruptedException e) {
166
167 e.printStackTrace();
168 }
169 }
170
171 /***
172 * Import just the SUCCESS (and possibly FAILURE) URIs from the given
173 * recovery log into the frontier as considered included.
174 *
175 * @param source recovery log file to use
176 * @param frontier frontier to update
177 * @param retainFailures whether failure ('Ff') URIs should count as done
178 * @return number of lines in recovery log (for reference)
179 * @throws IOException
180 */
181 private static int importCompletionInfoFromLog(File source,
182 CrawlController controller, boolean retainFailures) throws IOException {
183 Frontier frontier = controller.getFrontier();
184 boolean checkScope = (Boolean) controller.getOrder()
185 .getUncheckedAttribute(null,
186 CrawlOrder.ATTR_RECOVER_SCOPE_INCLUDES);
187 CrawlScope scope = checkScope ? controller.getScope() : null;
188
189 BufferedInputStream is = getBufferedInput(source);
190
191 MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
192 int lines = 0;
193 try {
194 while (readLine(is,read)) {
195 lines++;
196 boolean wasSuccess = read.startsWith(F_SUCCESS);
197 if (wasSuccess
198 || (retainFailures && read.startsWith(F_FAILURE))) {
199 try {
200 CandidateURI cauri = CandidateURI.fromString(
201 read.substring(F_SUCCESS.length()).toString());
202 if(checkScope) {
203 if(!scope.accepts(cauri)) {
204
205 continue;
206 }
207 }
208 frontier.considerIncluded(cauri.getUURI());
209 if(wasSuccess) {
210 if (frontier.getFrontierJournal() != null) {
211 frontier.getFrontierJournal().
212 finishedSuccess(cauri);
213 }
214 } else {
215
216
217 if (frontier.getFrontierJournal() != null) {
218 frontier.getFrontierJournal().
219 finishedFailure(cauri);
220 }
221 }
222 } catch (URIException e) {
223 e.printStackTrace();
224 }
225 }
226 if((lines%PROGRESS_INTERVAL)==0) {
227
228 LOGGER.info(
229 "at line " + lines
230 + " alreadyIncluded count = " +
231 frontier.discoveredUriCount());
232 }
233 }
234 } catch (EOFException e) {
235
236 } finally {
237 is.close();
238 }
239 return lines;
240 }
241
242 /***
243 * Read a line from the given bufferedinputstream into the MutableString.
244 * Return true if a line was read; false if EOF.
245 *
246 * @param is
247 * @param read
248 * @return True if we read a line.
249 * @throws IOException
250 */
251 private static boolean readLine(BufferedInputStream is, MutableString read)
252 throws IOException {
253 read.length(0);
254 int c = is.read();
255 while((c!=-1)&&c!='\n'&&c!='\r') {
256 read.append((char)c);
257 c = is.read();
258 }
259 if(c==-1 && read.length()==0) {
260
261 return false;
262 }
263 if(c=='\n') {
264
265 is.mark(1);
266 if(is.read()!='\r') {
267 is.reset();
268 }
269 }
270
271 return true;
272 }
273
274 /***
275 * Import all ADDs from given recovery log into the frontier's queues
276 * (excepting those the frontier drops as already having been included)
277 *
278 * @param source recovery log file to use
279 * @param frontier frontier to update
280 * @param lines total lines noted in recovery log earlier
281 * @param enough latch signalling 'enough' URIs queued to begin crawling
282 */
283 private static void importQueuesFromLog(File source, CrawlController controller,
284 int lines, CountDownLatch enough) {
285 BufferedInputStream is;
286
287 MutableString read = new MutableString(UURI.MAX_URL_LENGTH);
288 controller.installThreadContextSettingsHandler();
289 Frontier frontier = controller.getFrontier();
290 boolean checkScope = (Boolean) controller.getOrder()
291 .getUncheckedAttribute(null,
292 CrawlOrder.ATTR_RECOVER_SCOPE_ENQUEUES);
293 CrawlScope scope = checkScope ? controller.getScope() : null;
294 long queuedAtStart = frontier.queuedUriCount();
295 long queuedDuringRecovery = 0;
296 int qLines = 0;
297
298 try {
299
300
301 is = getBufferedInput(source);
302 try {
303 while (readLine(is,read)) {
304 qLines++;
305 if (read.startsWith(F_ADD)) {
306 try {
307 CandidateURI caUri = CandidateURI.fromString(
308 read.substring(F_ADD.length()).toString());
309 if(checkScope) {
310 if(!scope.accepts(caUri)) {
311
312 continue;
313 }
314 }
315 frontier.schedule(caUri);
316
317 queuedDuringRecovery =
318 frontier.queuedUriCount() - queuedAtStart;
319 if(((queuedDuringRecovery + 1) %
320 ENOUGH_TO_START_CRAWLING) == 0) {
321 enough.countDown();
322 }
323 } catch (URIException e) {
324 LOGGER.log(Level.WARNING, "bad URI during " +
325 "log-recovery of queue contents: "+read,e);
326
327 } catch (RuntimeException e) {
328 LOGGER.log(Level.SEVERE, "exception during " +
329 "log-recovery of queue contents: "+read,e);
330
331
332
333 }
334 }
335 if((qLines%PROGRESS_INTERVAL)==0) {
336
337 LOGGER.info(
338 "through line "
339 + qLines + "/" + lines
340 + " queued count = " +
341 frontier.queuedUriCount());
342 }
343 }
344 } catch (EOFException e) {
345
346 } finally {
347 is.close();
348 }
349 } catch (IOException e) {
350
351 LOGGER.log(Level.SEVERE, "problem importing queues", e);
352 }
353 LOGGER.info("finished recovering frontier from "+source+" "
354 +qLines+" lines processed");
355 enough.countDown();
356 }
357 }