View Javadoc

1   /* Copyright (C) 2003 Internet Archive.
2    *
3    * This file is part of the Heritrix web crawler (crawler.archive.org).
4    *
5    * Heritrix is free software; you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser Public License as published by
7    * the Free Software Foundation; either version 2.1 of the License, or
8    * any later version.
9    *
10   * Heritrix is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser Public License
16   * along with Heritrix; if not, write to the Free Software
17   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18   *
19   * ToeThread.java
20   * Created on May 14, 2003
21   *
22   * $Header$
23   */
24  package org.archive.crawler.framework;
25  
26  import java.io.PrintWriter;
27  import java.util.HashMap;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.logging.Level;
30  import java.util.logging.Logger;
31  
32  import org.archive.crawler.datamodel.CoreAttributeConstants;
33  import org.archive.crawler.datamodel.CrawlOrder;
34  import org.archive.crawler.datamodel.CrawlURI;
35  import org.archive.crawler.datamodel.FetchStatusCodes;
36  import org.archive.crawler.datamodel.InstancePerThread;
37  import org.archive.crawler.framework.exceptions.EndedException;
38  import org.archive.util.ArchiveUtils;
39  import org.archive.util.DevUtils;
40  import org.archive.util.HttpRecorder;
41  import org.archive.util.HttpRecorderMarker;
42  import org.archive.util.ProgressStatisticsReporter;
43  import org.archive.util.Reporter;
44  
45  import com.sleepycat.util.RuntimeExceptionWrapper;
46  
47  /***
48   * One "worker thread"; asks for CrawlURIs, processes them,
49   * repeats unless told otherwise.
50   *
51   * @author Gordon Mohr
52   */
53  public class ToeThread extends Thread
54  implements CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,
55  Reporter, ProgressStatisticsReporter {
56      private static final String STEP_NASCENT = "NASCENT";
57      private static final String STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI";
58      private static final String STEP_FINISHED = "FINISHED";
59      private static final String STEP_ABOUT_TO_BEGIN_CHAIN =
60          "ABOUT_TO_BEGIN_CHAIN";
61      private static final String STEP_ABOUT_TO_BEGIN_PROCESSOR =
62          "ABOUT_TO_BEGIN_PROCESSOR";
63      private static final String STEP_DONE_WITH_PROCESSORS =
64          "DONE_WITH_PROCESSORS";
65      private static final String STEP_HANDLING_RUNTIME_EXCEPTION =
66          "HANDLING_RUNTIME_EXCEPTION";
67      private static final String STEP_ABOUT_TO_RETURN_URI =
68          "ABOUT_TO_RETURN_URI";
69      private static final String STEP_FINISHING_PROCESS = "FINISHING_PROCESS";
70  
71      private static Logger logger =
72          Logger.getLogger("org.archive.crawler.framework.ToeThread");
73  
74      private CrawlController controller;
75      private int serialNumber;
76      
77      /***
78       * Each ToeThead has an instance of HttpRecord that gets used
79       * over and over by each request.
80       * 
81       * @see org.archive.util.HttpRecorderMarker
82       */
83      private HttpRecorder httpRecorder = null;
84      
85      private HashMap<String,Processor> localProcessors
86       = new HashMap<String,Processor>();
87      private String currentProcessorName = "";
88  
89      private String coreName;
90      private CrawlURI currentCuri;
91      private long lastStartTime;
92      private long lastFinishTime;
93  
94      // activity monitoring, debugging, and problem detection
95      private String step = STEP_NASCENT;
96      private long atStepSince;
97      
98      // default priority; may not be meaningful in recent JVMs
99      private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;
100     
101     // indicator that a thread is now surplus based on current desired
102     // count; it should wrap up cleanly
103     private volatile boolean shouldRetire = false;
104     
105     /***
106      * Create a ToeThread
107      * 
108      * @param g ToeThreadGroup
109      * @param sn serial number
110      */
111     public ToeThread(ToePool g, int sn) {
112         // TODO: add crawl name?
113         super(g,"ToeThread #" + sn);
114         coreName="ToeThread #" + sn + ": ";
115         controller = g.getController();
116         serialNumber = sn;
117         setPriority(DEFAULT_PRIORITY);
118         int outBufferSize = ((Integer) controller
119                 .getOrder()
120                 .getUncheckedAttribute(null,CrawlOrder.ATTR_RECORDER_OUT_BUFFER))
121                         .intValue();
122         int inBufferSize = ((Integer) controller
123                 .getOrder()
124                 .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_IN_BUFFER))
125                 .intValue();  
126         httpRecorder = new HttpRecorder(controller.getScratchDisk(),
127             "tt" + sn + "http", outBufferSize, inBufferSize);
128         lastFinishTime = System.currentTimeMillis();
129     }
130 
131     /*** (non-Javadoc)
132      * @see java.lang.Thread#run()
133      */
134     public void run() {
135         String name = controller.getOrder().getCrawlOrderName();
136         logger.fine(getName()+" started for order '"+name+"'");
137 
138         try {
139             controller.getLoopingToes().incrementAndGet();
140             
141             while ( true ) {
142                 // TODO check for thread-abort? or is waiting for interrupt enough?
143                 continueCheck();
144                 
145                 setStep(STEP_ABOUT_TO_GET_URI);
146                 
147                 CrawlURI curi = controller.getFrontier().next();
148                 
149                 synchronized(this) {
150                     continueCheck();
151                     setCurrentCuri(curi);
152                 }
153                 
154                 processCrawlUri();
155                 
156                 setStep(STEP_ABOUT_TO_RETURN_URI);
157                 continueCheck();
158 
159                 synchronized(this) {
160                     controller.getFrontier().finished(currentCuri);
161                     setCurrentCuri(null);
162                 }
163                 
164                 setStep(STEP_FINISHING_PROCESS);
165                 lastFinishTime = System.currentTimeMillis();
166                 controller.releaseContinuePermission();
167                 if(shouldRetire) {
168                     break; // from while(true)
169                 }
170             }
171         } catch (EndedException e) {
172             // crawl ended (or thread was retired), so allow thread to end
173         } catch (Exception e) {
174             // everything else (including interruption)
175             logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
176         } catch (OutOfMemoryError err) {
177             seriousError(err);
178         } finally {
179             controller.getLoopingToes().decrementAndGet();
180             controller.releaseContinuePermission();
181         }
182         setCurrentCuri(null);
183         // Do cleanup so that objects can be GC.
184         this.httpRecorder.closeRecorders();
185         this.httpRecorder = null;
186         localProcessors = null;
187 
188         logger.fine(getName()+" finished for order '"+name+"'");
189         setStep(STEP_FINISHED);
190         controller.toeEnded();
191         controller = null;
192     }
193 
194     /***
195      * Set currentCuri, updating thread name as appropriate
196      * @param curi
197      */
198     private void setCurrentCuri(CrawlURI curi) {
199         if(curi==null) {
200             setName(coreName);
201         } else {
202             setName(coreName+curi);
203         }
204         currentCuri = curi;
205     }
206 
207     /***
208      * @param s
209      */
210     private void setStep(String s) {
211         step=s;
212         atStepSince = System.currentTimeMillis();
213     }
214 
215 	private void seriousError(Error err) {
216 	    // try to prevent timeslicing until we have a chance to deal with OOM
217         // TODO: recognize that new JVM priority indifference may make this
218         // priority-jumbling pointless
219         setPriority(DEFAULT_PRIORITY+1);  
220         if (controller!=null) {
221             // hold all ToeThreads from proceeding to next processor
222             controller.singleThreadMode();
223             // TODO: consider if SoftReferences would be a better way to 
224             // engineer a soft-landing for low-memory conditions
225             controller.freeReserveMemory();
226             controller.requestCrawlPause();
227             if (controller.getFrontier().getFrontierJournal() != null) {
228                 controller.getFrontier().getFrontierJournal().seriousError(
229                     getName() + err.getMessage());
230             }
231         }
232         
233         // OutOfMemory etc.
234         String extraInfo = DevUtils.extraInfo();
235         System.err.println("<<<");
236         System.err.println(ArchiveUtils.getLog17Date());
237         System.err.println(err);
238         System.err.println(extraInfo);
239         err.printStackTrace(System.err);
240         
241         if (controller!=null) {
242             PrintWriter pw = new PrintWriter(System.err);
243             controller.getToePool().compactReportTo(pw);
244             pw.flush();
245         }
246         System.err.println(">>>");
247 //        DevUtils.sigquitSelf();
248         
249         String context = "unknown";
250 		if(currentCuri!=null) {
251             // update fetch-status, saving original as annotation
252             currentCuri.addAnnotation("err="+err.getClass().getName());
253             currentCuri.addAnnotation("os"+currentCuri.getFetchStatus());
254 			currentCuri.setFetchStatus(S_SERIOUS_ERROR);
255             context = currentCuri.singleLineReport() + " in " + currentProcessorName;
256 		}
257         String message = "Serious error occured trying " +
258             "to process '" + context + "'\n" + extraInfo;
259         logger.log(Level.SEVERE, message.toString(), err);
260         setPriority(DEFAULT_PRIORITY);
261 	}
262 
263 	/***
264      * Perform checks as to whether normal execution should proceed.
265      * 
266      * If an external interrupt is detected, throw an interrupted exception.
267      * Used before anything that should not be attempted by a 'zombie' thread
268      * that the Frontier/Crawl has given up on.
269      * 
270      * Otherwise, if the controller's memoryGate has been closed,
271      * hold until it is opened. (Provides a better chance of 
272      * being able to complete some tasks after an OutOfMemoryError.)
273      *
274      * @throws InterruptedException
275      */
276     private void continueCheck() throws InterruptedException {
277         if(Thread.interrupted()) {
278             throw new InterruptedException("die request detected");
279         }
280         controller.acquireContinuePermission();
281     }
282 
283     /***
284      * Pass the CrawlURI to all appropriate processors
285      *
286      * @throws InterruptedException
287      */
288     private void processCrawlUri() throws InterruptedException {
289         currentCuri.setThreadNumber(this.serialNumber);
290         currentCuri.setNextProcessorChain(controller.getFirstProcessorChain());
291         lastStartTime = System.currentTimeMillis();
292 //        System.out.println(currentCuri);
293         try {
294             while (currentCuri.nextProcessorChain() != null) {
295                 setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
296                 // Starting on a new processor chain.
297                 currentCuri.setNextProcessor(currentCuri.nextProcessorChain().getFirstProcessor());
298                 currentCuri.setNextProcessorChain(currentCuri.nextProcessorChain().getNextProcessorChain());
299 
300                 while (currentCuri.nextProcessor() != null) {
301                     setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
302                     Processor currentProcessor = getProcessor(currentCuri.nextProcessor());
303                     currentProcessorName = currentProcessor.getName();
304                     continueCheck();
305 //                    long memBefore = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
306                     currentProcessor.process(currentCuri);
307 //                    long memAfter = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
308 //                    System.out.println((memAfter-memBefore)+"K in "+currentProcessorName);
309                 }
310             }
311             setStep(STEP_DONE_WITH_PROCESSORS);
312             currentProcessorName = "";
313         } catch (RuntimeExceptionWrapper e) {
314             // Workaround to get cause from BDB
315             if(e.getCause() == null) {
316                 e.initCause(e.getCause());
317             }
318             recoverableProblem(e);
319         } catch (AssertionError ae) {
320             // This risks leaving crawl in fatally inconsistent state, 
321             // but is often reasonable for per-Processor assertion problems 
322             recoverableProblem(ae);
323         } catch (RuntimeException e) {
324             recoverableProblem(e);
325         } catch (StackOverflowError err) {
326             recoverableProblem(err);
327         } catch (Error err) {
328             // OutOfMemory and any others
329             seriousError(err); 
330         }
331     }
332 
333 
334     /***
335      * Handling for exceptions and errors that are possibly recoverable.
336      * 
337      * @param e
338      */
339     private void recoverableProblem(Throwable e) {
340         Object previousStep = step;
341         setStep(STEP_HANDLING_RUNTIME_EXCEPTION);
342         e.printStackTrace(System.err);
343         currentCuri.setFetchStatus(S_RUNTIME_EXCEPTION);
344         // store exception temporarily for logging
345         currentCuri.addAnnotation("err="+e.getClass().getName());
346         currentCuri.putObject(A_RUNTIME_EXCEPTION, e);
347         String message = "Problem " + e + 
348                 " occured when trying to process '"
349                 + currentCuri.toString()
350                 + "' at step " + previousStep 
351                 + " in " + currentProcessorName +"\n";
352         logger.log(Level.SEVERE, message.toString(), e);
353     }
354 
355     private Processor getProcessor(Processor processor) {
356         if(!(processor instanceof InstancePerThread)) {
357             // just use the shared Processor
358              return processor;
359         }
360         // must use local copy of processor
361         Processor localProcessor = (Processor) localProcessors.get(
362                     processor.getClass().getName());
363         if (localProcessor == null) {
364             localProcessor = processor.spawn(this.getSerialNumber());
365             localProcessors.put(processor.getClass().getName(),localProcessor);
366         }
367         return localProcessor;
368     }
369 
370     /***
371      * @return Return toe thread serial number.
372      */
373     public int getSerialNumber() {
374         return this.serialNumber;
375     }
376 
377     /***
378      * Used to get current threads HttpRecorder instance.
379      * Implementation of the HttpRecorderMarker interface.
380      * @return Returns instance of HttpRecorder carried by this thread.
381      * @see org.archive.util.HttpRecorderMarker#getHttpRecorder()
382      */
383     public HttpRecorder getHttpRecorder() {
384         return this.httpRecorder;
385     }
386     
387     /*** Get the CrawlController acossiated with this thread.
388      *
389      * @return Returns the CrawlController.
390      */
391     public CrawlController getController() {
392         return controller;
393     }
394 
395     /***
396      * Terminates a thread.
397      *
398      * <p> Calling this method will ensure that the current thread will stop
399      * processing as soon as possible (note: this may be never). Meant to
400      * 'short circuit' hung threads.
401      *
402      * <p> Current crawl uri will have its fetch status set accordingly and
403      * will be immediately returned to the frontier.
404      *
405      * <p> As noted before, this does not ensure that the thread will stop
406      * running (ever). But once evoked it will not try and communicate with
407      * other parts of crawler and will terminate as soon as control is
408      * established.
409      */
410     protected void kill(){
411         this.interrupt();
412         synchronized(this) {
413             if (currentCuri!=null) {
414                 currentCuri.setFetchStatus(S_PROCESSING_THREAD_KILLED);
415                 controller.getFrontier().finished(currentCuri);
416              }
417         }
418     }
419 
420 	/***
421 	 * @return Current step (For debugging/reporting, give abstract step
422      * where this thread is).
423 	 */
424 	public Object getStep() {
425 		return step;
426 	}
427 
428     /***
429      * Is this thread validly processing a URI, not paused, waiting for 
430      * a URI, or interrupted?
431      * @return whether thread is actively processing a URI
432      */
433     public boolean isActive() {
434         // if alive and not waiting in/for frontier.next(), we're 'active'
435         return this.isAlive() && (currentCuri != null) && !isInterrupted();
436     }
437     
438     /***
439      * Request that this thread retire (exit cleanly) at the earliest
440      * opportunity.
441      */
442     public void retire() {
443         shouldRetire = true;
444     }
445 
446     /***
447      * Whether this thread should cleanly retire at the earliest 
448      * opportunity. 
449      * 
450      * @return True if should retire.
451      */
452     public boolean shouldRetire() {
453         return shouldRetire;
454     }
455 
456     //
457     // Reporter implementation
458     // 
459     
460     /***
461      * Compiles and returns a report on its status.
462      * @param name Report name.
463      * @param pw Where to print.
464      */
465     public void reportTo(String name, PrintWriter pw) {
466         // name is ignored for now: only one kind of report
467         
468         pw.print("[");
469         pw.println(getName());
470 
471         // Make a local copy of the currentCuri reference in case it gets
472         // nulled while we're using it.  We're doing this because
473         // alternative is synchronizing and we don't want to do this --
474         // it causes hang ups as controller waits on a lock for this thread,
475         // something it gets easily enough on old threading model but something
476         // it can wait interminably for on NPTL threading model.
477         // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
478         CrawlURI c = currentCuri;
479         if(c != null) {
480             pw.print(" ");
481             c.singleLineReportTo(pw);
482             pw.print("    ");
483             pw.print(c.getFetchAttempts());
484             pw.print(" attempts");
485             pw.println();
486             pw.print("    ");
487             pw.print("in processor: ");
488             pw.print(currentProcessorName);
489         } else {
490             pw.print(" -no CrawlURI- ");
491         }
492         pw.println();
493 
494         long now = System.currentTimeMillis();
495         long time = 0;
496 
497         pw.print("    ");
498         if(lastFinishTime > lastStartTime) {
499             // That means we finished something after we last started something
500             // or in other words we are not working on anything.
501             pw.print("WAITING for ");
502             time = now - lastFinishTime;
503         } else if(lastStartTime > 0) {
504             // We are working on something
505             pw.print("ACTIVE for ");
506             time = now-lastStartTime;
507         }
508         pw.print(ArchiveUtils.formatMillisecondsToConventional(time));
509         pw.println();
510 
511         pw.print("    ");
512         pw.print("step: ");
513         pw.print(step);
514         pw.print(" for ");
515         pw.print(ArchiveUtils.formatMillisecondsToConventional(System.currentTimeMillis()-atStepSince));
516         pw.println();
517 
518         StackTraceElement[] ste = this.getStackTrace();
519         for(int i=0;i<ste.length;i++) {
520             pw.print("    ");
521             pw.print(ste[i].toString());
522             pw.println();
523         }
524         pw.print("]");
525         pw.println();
526         
527         pw.flush();
528     }
529 
530     /***
531      * @param w PrintWriter to write to.
532      */
533     public void singleLineReportTo(PrintWriter w)
534     {
535         w.print("#");
536         w.print(this.serialNumber);
537 
538         // Make a local copy of the currentCuri reference in case it gets
539         // nulled while we're using it.  We're doing this because
540         // alternative is synchronizing and we don't want to do this --
541         // it causes hang ups as controller waits on a lock for this thread,
542         // something it gets easily enough on old threading model but something
543         // it can wait interminably for on NPTL threading model.
544         // See [ 994946 ] Pause/Terminate ignored on 2.6 kernel 1.5 JVM.
545         CrawlURI c = currentCuri;
546         if(c != null) {
547             w.print(" ");
548             w.print(currentProcessorName);
549             w.print(" ");
550             w.print(c.toString());
551             w.print(" (");
552             w.print(c.getFetchAttempts());
553             w.print(") ");
554         } else {
555             w.print(" [no CrawlURI] ");
556         }
557         
558         long now = System.currentTimeMillis();
559         long time = 0;
560 
561         if(lastFinishTime > lastStartTime) {
562             // That means we finished something after we last started something
563             // or in other words we are not working on anything.
564             w.print("WAITING for ");
565             time = now - lastFinishTime;
566         } else if(lastStartTime > 0) {
567             // We are working on something
568             w.print("ACTIVE for ");
569             time = now-lastStartTime;
570         }
571         w.print(ArchiveUtils.formatMillisecondsToConventional(time));
572         w.print(" at ");
573         w.print(step);
574         w.print(" for ");
575         w.print(ArchiveUtils.formatMillisecondsToConventional(now-atStepSince));
576         w.print("\n");
577         w.flush();
578     }
579 
580     /* (non-Javadoc)
581      * @see org.archive.util.Reporter#singleLineLegend()
582      */
583     public String singleLineLegend() {
584         return "#serialNumber processorName currentUri (fetchAttempts) threadState threadStep";
585     }
586     
587     /* (non-Javadoc)
588      * @see org.archive.util.Reporter#getReports()
589      */
590     public String[] getReports() {
591         // for now none but the default
592         return new String[] {};
593     }
594 
595     public void reportTo(PrintWriter writer) {
596         reportTo(null, writer);
597     }
598 
599     /* (non-Javadoc)
600      * @see org.archive.util.Reporter#singleLineReport()
601      */
602     public String singleLineReport() {
603         return ArchiveUtils.singleLineReport(this);
604     }
605 
606     public void progressStatisticsLine(PrintWriter writer) {
607         writer.print(getController().getStatistics()
608             .getProgressStatisticsLine());
609         writer.print("\n");
610     }
611 
612     public void progressStatisticsLegend(PrintWriter writer) {
613         writer.print(getController().getStatistics()
614             .progressStatisticsLegend());
615         writer.print("\n");
616     }
617     
618     public String getCurrentProcessorName() {
619         return currentProcessorName;
620     }
621 }