1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.archive.crawler.framework;
25
26 import java.io.PrintWriter;
27 import java.util.HashMap;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31
32 import org.archive.crawler.datamodel.CoreAttributeConstants;
33 import org.archive.crawler.datamodel.CrawlOrder;
34 import org.archive.crawler.datamodel.CrawlURI;
35 import org.archive.crawler.datamodel.FetchStatusCodes;
36 import org.archive.crawler.datamodel.InstancePerThread;
37 import org.archive.crawler.framework.exceptions.EndedException;
38 import org.archive.util.ArchiveUtils;
39 import org.archive.util.DevUtils;
40 import org.archive.util.HttpRecorder;
41 import org.archive.util.HttpRecorderMarker;
42 import org.archive.util.ProgressStatisticsReporter;
43 import org.archive.util.Reporter;
44
45 import com.sleepycat.util.RuntimeExceptionWrapper;
46
47 /***
48 * One "worker thread"; asks for CrawlURIs, processes them,
49 * repeats unless told otherwise.
50 *
51 * @author Gordon Mohr
52 */
53 public class ToeThread extends Thread
54 implements CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,
55 Reporter, ProgressStatisticsReporter {
56 private static final String STEP_NASCENT = "NASCENT";
57 private static final String STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI";
58 private static final String STEP_FINISHED = "FINISHED";
59 private static final String STEP_ABOUT_TO_BEGIN_CHAIN =
60 "ABOUT_TO_BEGIN_CHAIN";
61 private static final String STEP_ABOUT_TO_BEGIN_PROCESSOR =
62 "ABOUT_TO_BEGIN_PROCESSOR";
63 private static final String STEP_DONE_WITH_PROCESSORS =
64 "DONE_WITH_PROCESSORS";
65 private static final String STEP_HANDLING_RUNTIME_EXCEPTION =
66 "HANDLING_RUNTIME_EXCEPTION";
67 private static final String STEP_ABOUT_TO_RETURN_URI =
68 "ABOUT_TO_RETURN_URI";
69 private static final String STEP_FINISHING_PROCESS = "FINISHING_PROCESS";
70
71 private static Logger logger =
72 Logger.getLogger("org.archive.crawler.framework.ToeThread");
73
74 private CrawlController controller;
75 private int serialNumber;
76
77 /***
78 * Each ToeThead has an instance of HttpRecord that gets used
79 * over and over by each request.
80 *
81 * @see org.archive.util.HttpRecorderMarker
82 */
83 private HttpRecorder httpRecorder = null;
84
85 private HashMap<String,Processor> localProcessors
86 = new HashMap<String,Processor>();
87 private String currentProcessorName = "";
88
89 private String coreName;
90 private CrawlURI currentCuri;
91 private long lastStartTime;
92 private long lastFinishTime;
93
94
95 private String step = STEP_NASCENT;
96 private long atStepSince;
97
98
99 private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;
100
101
102
103 private volatile boolean shouldRetire = false;
104
105 /***
106 * Create a ToeThread
107 *
108 * @param g ToeThreadGroup
109 * @param sn serial number
110 */
111 public ToeThread(ToePool g, int sn) {
112
113 super(g,"ToeThread #" + sn);
114 coreName="ToeThread #" + sn + ": ";
115 controller = g.getController();
116 serialNumber = sn;
117 setPriority(DEFAULT_PRIORITY);
118 int outBufferSize = ((Integer) controller
119 .getOrder()
120 .getUncheckedAttribute(null,CrawlOrder.ATTR_RECORDER_OUT_BUFFER))
121 .intValue();
122 int inBufferSize = ((Integer) controller
123 .getOrder()
124 .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_IN_BUFFER))
125 .intValue();
126 httpRecorder = new HttpRecorder(controller.getScratchDisk(),
127 "tt" + sn + "http", outBufferSize, inBufferSize);
128 lastFinishTime = System.currentTimeMillis();
129 }
130
131 /*** (non-Javadoc)
132 * @see java.lang.Thread#run()
133 */
134 public void run() {
135 String name = controller.getOrder().getCrawlOrderName();
136 logger.fine(getName()+" started for order '"+name+"'");
137
138 try {
139 controller.getLoopingToes().incrementAndGet();
140
141 while ( true ) {
142
143 continueCheck();
144
145 setStep(STEP_ABOUT_TO_GET_URI);
146
147 CrawlURI curi = controller.getFrontier().next();
148
149 synchronized(this) {
150 continueCheck();
151 setCurrentCuri(curi);
152 }
153
154 processCrawlUri();
155
156 setStep(STEP_ABOUT_TO_RETURN_URI);
157 continueCheck();
158
159 synchronized(this) {
160 controller.getFrontier().finished(currentCuri);
161 setCurrentCuri(null);
162 }
163
164 setStep(STEP_FINISHING_PROCESS);
165 lastFinishTime = System.currentTimeMillis();
166 controller.releaseContinuePermission();
167 if(shouldRetire) {
168 break;
169 }
170 }
171 } catch (EndedException e) {
172
173 } catch (Exception e) {
174
175 logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
176 } catch (OutOfMemoryError err) {
177 seriousError(err);
178 } finally {
179 controller.getLoopingToes().decrementAndGet();
180 controller.releaseContinuePermission();
181 }
182 setCurrentCuri(null);
183
184 this.httpRecorder.closeRecorders();
185 this.httpRecorder = null;
186 localProcessors = null;
187
188 logger.fine(getName()+" finished for order '"+name+"'");
189 setStep(STEP_FINISHED);
190 controller.toeEnded();
191 controller = null;
192 }
193
194 /***
195 * Set currentCuri, updating thread name as appropriate
196 * @param curi
197 */
198 private void setCurrentCuri(CrawlURI curi) {
199 if(curi==null) {
200 setName(coreName);
201 } else {
202 setName(coreName+curi);
203 }
204 currentCuri = curi;
205 }
206
207 /***
208 * @param s
209 */
210 private void setStep(String s) {
211 step=s;
212 atStepSince = System.currentTimeMillis();
213 }
214
215 private void seriousError(Error err) {
216
217
218
219 setPriority(DEFAULT_PRIORITY+1);
220 if (controller!=null) {
221
222 controller.singleThreadMode();
223
224
225 controller.freeReserveMemory();
226 controller.requestCrawlPause();
227 if (controller.getFrontier().getFrontierJournal() != null) {
228 controller.getFrontier().getFrontierJournal().seriousError(
229 getName() + err.getMessage());
230 }
231 }
232
233
234 String extraInfo = DevUtils.extraInfo();
235 System.err.println("<<<");
236 System.err.println(ArchiveUtils.getLog17Date());
237 System.err.println(err);
238 System.err.println(extraInfo);
239 err.printStackTrace(System.err);
240
241 if (controller!=null) {
242 PrintWriter pw = new PrintWriter(System.err);
243 controller.getToePool().compactReportTo(pw);
244 pw.flush();
245 }
246 System.err.println(">>>");
247
248
249 String context = "unknown";
250 if(currentCuri!=null) {
251
252 currentCuri.addAnnotation("err="+err.getClass().getName());
253 currentCuri.addAnnotation("os"+currentCuri.getFetchStatus());
254 currentCuri.setFetchStatus(S_SERIOUS_ERROR);
255 context = currentCuri.singleLineReport() + " in " + currentProcessorName;
256 }
257 String message = "Serious error occured trying " +
258 "to process '" + context + "'\n" + extraInfo;
259 logger.log(Level.SEVERE, message.toString(), err);
260 setPriority(DEFAULT_PRIORITY);
261 }
262
263 /***
264 * Perform checks as to whether normal execution should proceed.
265 *
266 * If an external interrupt is detected, throw an interrupted exception.
267 * Used before anything that should not be attempted by a 'zombie' thread
268 * that the Frontier/Crawl has given up on.
269 *
270 * Otherwise, if the controller's memoryGate has been closed,
271 * hold until it is opened. (Provides a better chance of
272 * being able to complete some tasks after an OutOfMemoryError.)
273 *
274 * @throws InterruptedException
275 */
276 private void continueCheck() throws InterruptedException {
277 if(Thread.interrupted()) {
278 throw new InterruptedException("die request detected");
279 }
280 controller.acquireContinuePermission();
281 }
282
283 /***
284 * Pass the CrawlURI to all appropriate processors
285 *
286 * @throws InterruptedException
287 */
288 private void processCrawlUri() throws InterruptedException {
289 currentCuri.setThreadNumber(this.serialNumber);
290 currentCuri.setNextProcessorChain(controller.getFirstProcessorChain());
291 lastStartTime = System.currentTimeMillis();
292
293 try {
294 while (currentCuri.nextProcessorChain() != null) {
295 setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
296
297 currentCuri.setNextProcessor(currentCuri.nextProcessorChain().getFirstProcessor());
298 currentCuri.setNextProcessorChain(currentCuri.nextProcessorChain().getNextProcessorChain());
299
300 while (currentCuri.nextProcessor() != null) {
301 setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
302 Processor currentProcessor = getProcessor(currentCuri.nextProcessor());
303 currentProcessorName = currentProcessor.getName();
304 continueCheck();
305
306 currentProcessor.process(currentCuri);
307
308
309 }
310 }
311 setStep(STEP_DONE_WITH_PROCESSORS);
312 currentProcessorName = "";
313 } catch (RuntimeExceptionWrapper e) {
314
315 if(e.getCause() == null) {
316 e.initCause(e.getCause());
317 }
318 recoverableProblem(e);
319 } catch (AssertionError ae) {
320
321
322 recoverableProblem(ae);
323 } catch (RuntimeException e) {
324 recoverableProblem(e);
325 } catch (StackOverflowError err) {
326 recoverableProblem(err);
327 } catch (Error err) {
328
329 seriousError(err);
330 }
331 }
332
333
334 /***
335 * Handling for exceptions and errors that are possibly recoverable.
336 *
337 * @param e
338 */
339 private void recoverableProblem(Throwable e) {
340 Object previousStep = step;
341 setStep(STEP_HANDLING_RUNTIME_EXCEPTION);
342 e.printStackTrace(System.err);
343 currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
344
345 currentCuri.addAnnotation("err="+e.getClass().getName());
346 currentCuri.putObject(A_RUNTIME_EXCEPTION, e);
347 String message = "Problem " + e +
348 " occured when trying to process '"
349 + currentCuri.toString()
350 + "' at step " + previousStep
351 + " in " + currentProcessorName +"\n";
352 logger.log(Level.SEVERE, message.toString(), e);
353 }
354
355 private Processor getProcessor(Processor processor) {
356 if(!(processor instanceof InstancePerThread)) {
357
358 return processor;
359 }
360
361 Processor localProcessor = (Processor) localProcessors.get(
362 processor.getClass().getName());
363 if (localProcessor == null) {
364 localProcessor = processor.spawn(this.getSerialNumber());
365 localProcessors.put(processor.getClass().getName(),localProcessor);
366 }
367 return localProcessor;
368 }
369
370 /***
371 * @return Return toe thread serial number.
372 */
373 public int getSerialNumber() {
374 return this.serialNumber;
375 }
376
377 /***
378 * Used to get current threads HttpRecorder instance.
379 * Implementation of the HttpRecorderMarker interface.
380 * @return Returns instance of HttpRecorder carried by this thread.
381 * @see org.archive.util.HttpRecorderMarker#getHttpRecorder()
382 */
383 public HttpRecorder getHttpRecorder() {
384 return this.httpRecorder;
385 }
386
387 /*** Get the CrawlController acossiated with this thread.
388 *
389 * @return Returns the CrawlController.
390 */
391 public CrawlController getController() {
392 return controller;
393 }
394
395 /***
396 * Terminates a thread.
397 *
398 * <p> Calling this method will ensure that the current thread will stop
399 * processing as soon as possible (note: this may be never). Meant to
400 * 'short circuit' hung threads.
401 *
402 * <p> Current crawl uri will have its fetch status set accordingly and
403 * will be immediately returned to the frontier.
404 *
405 * <p> As noted before, this does not ensure that the thread will stop
406 * running (ever). But once evoked it will not try and communicate with
407 * other parts of crawler and will terminate as soon as control is
408 * established.
409 */
410 protected void kill(){
411 this.interrupt();
412 synchronized(this) {
413 if (currentCuri!=null) {
414 currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
415 controller.getFrontier().finished(currentCuri);
416 }
417 }
418 }
419
420 /***
421 * @return Current step (For debugging/reporting, give abstract step
422 * where this thread is).
423 */
424 public Object getStep() {
425 return step;
426 }
427
428 /***
429 * Is this thread validly processing a URI, not paused, waiting for
430 * a URI, or interrupted?
431 * @return whether thread is actively processing a URI
432 */
433 public boolean isActive() {
434
435 return this.isAlive() && (currentCuri != null) && !isInterrupted();
436 }
437
438 /***
439 * Request that this thread retire (exit cleanly) at the earliest
440 * opportunity.
441 */
442 public void retire() {
443 shouldRetire = true;
444 }
445
446 /***
447 * Whether this thread should cleanly retire at the earliest
448 * opportunity.
449 *
450 * @return True if should retire.
451 */
452 public boolean shouldRetire() {
453 return shouldRetire;
454 }
455
456
457
458
459
460 /***
461 * Compiles and returns a report on its status.
462 * @param name Report name.
463 * @param pw Where to print.
464 */
465 public void reportTo(String name, PrintWriter pw) {
466
467
468 pw.print("[");
469 pw.println(getName());
470
471
472
473
474
475
476
477
478 CrawlURI c = currentCuri;
479 if(c != null) {
480 pw.print(" ");
481 c.singleLineReportTo(pw);
482 pw.print(" ");
483 pw.print(c.getFetchAttempts());
484 pw.print(" attempts");
485 pw.println();
486 pw.print(" ");
487 pw.print("in processor: ");
488 pw.print(currentProcessorName);
489 } else {
490 pw.print(" -no CrawlURI- ");
491 }
492 pw.println();
493
494 long now = System.currentTimeMillis();
495 long time = 0;
496
497 pw.print(" ");
498 if(lastFinishTime > lastStartTime) {
499
500
501 pw.print("WAITING for ");
502 time = now - lastFinishTime;
503 } else if(lastStartTime > 0) {
504
505 pw.print("ACTIVE for ");
506 time = now-lastStartTime;
507 }
508 pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
509 pw.println();
510
511 pw.print(" ");
512 pw.print("step: ");
513 pw.print(step);
514 pw.print(" for ");
515 pw.print(ArchiveUtils.formatMillisecondsToConventional(System.currentTimeMillis()-atStepSince));
516 pw.println();
517
518 StackTraceElement[] ste = this.getStackTrace();
519 for(int i=0;i<ste.length;i++) {
520 pw.print(" ");
521 pw.print(ste[i].toString());
522 pw.println();
523 }
524 pw.print("]");
525 pw.println();
526
527 pw.flush();
528 }
529
530 /***
531 * @param w PrintWriter to write to.
532 */
533 public void singleLineReportTo(PrintWriter w)
534 {
535 w.print("#");
536 w.print(this.serialNumber);
537
538
539
540
541
542
543
544
545 CrawlURI c = currentCuri;
546 if(c != null) {
547 w.print(" ");
548 w.print(currentProcessorName);
549 w.print(" ");
550 w.print(c.toString());
551 w.print(" (");
552 w.print(c.getFetchAttempts());
553 w.print(") ");
554 } else {
555 w.print(" [no CrawlURI] ");
556 }
557
558 long now = System.currentTimeMillis();
559 long time = 0;
560
561 if(lastFinishTime > lastStartTime) {
562
563
564 w.print("WAITING for ");
565 time = now - lastFinishTime;
566 } else if(lastStartTime > 0) {
567
568 w.print("ACTIVE for ");
569 time = now-lastStartTime;
570 }
571 w.print(ArchiveUtils.formatMillisecondsToConventional(time));
572 w.print(" at ");
573 w.print(step);
574 w.print(" for ");
575 w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
576 w.print("\n");
577 w.flush();
578 }
579
580
581
582
583 public String singleLineLegend() {
584 return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
585 }
586
587
588
589
590 public String[] getReports() {
591
592 return new String[] {};
593 }
594
595 public void reportTo(PrintWriter writer) {
596 reportTo(null, writer);
597 }
598
599
600
601
602 public String singleLineReport() {
603 return ArchiveUtils.singleLineReport(this);
604 }
605
606 public void progressStatisticsLine(PrintWriter writer) {
607 writer.print(getController().getStatistics()
608 .getProgressStatisticsLine());
609 writer.print("\n");
610 }
611
612 public void progressStatisticsLegend(PrintWriter writer) {
613 writer.print(getController().getStatistics()
614 .progressStatisticsLegend());
615 writer.print("\n");
616 }
617
618 public String getCurrentProcessorName() {
619 return currentProcessorName;
620 }
621 }