View Javadoc

1   /* Checkpointer
2   *
3   * $Id: Checkpointer.java 4550 2006-08-29 00:19:31Z stack-sf $
4   *
5   * Created on Apr 19, 2004
6   *
7   * Copyright (C) 2004 Internet Archive.
8   *
9   * This file is part of the Heritrix web crawler (crawler.archive.org).
10  *
11  * Heritrix is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser Public License as published by
13  * the Free Software Foundation; either version 2.1 of the License, or
14  * any later version.
15  *
16  * Heritrix is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU Lesser Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser Public License
22  * along with Heritrix; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24  */
25  package org.archive.crawler.framework;
26  
27  import java.io.File;
28  import java.io.FileOutputStream;
29  import java.io.IOException;
30  import java.io.Serializable;
31  import java.text.DecimalFormat;
32  import java.util.LinkedList;
33  import java.util.List;
34  import java.util.Timer;
35  import java.util.TimerTask;
36  import java.util.logging.Level;
37  import java.util.logging.Logger;
38  
39  import org.archive.crawler.datamodel.Checkpoint;
40  import org.archive.util.ArchiveUtils;
41  
42  /***
43   * Runs checkpointing.
44   * Also keeps history of crawl checkpoints  Generally used by CrawlController
45   * only but also has static utility methods classes that need to participate in
46   * a checkpoint can use.
47   *
48   * @author gojomo
49   * @author stack
50   */
51  public class Checkpointer implements Serializable {
52      private static final long serialVersionUID = 7610078446694353173L;
53  
54      private final static Logger LOGGER =
55          Logger.getLogger(Checkpointer.class.getName());
56  
57      private static final String DEFAULT_PREFIX = "";
58      
59      /***
60       * String to prefix any new checkpoint names.
61       */
62      private  String checkpointPrefix = DEFAULT_PREFIX;
63      
64      /***
65       * Next  overall series checkpoint number.
66       */
67      private int nextCheckpoint = 1;
68  
69      /***
70       * All checkpoint names in chain prior to now. May not all still
71       * exist on disk.
72       */
73      private List predecessorCheckpoints = new LinkedList();
74  
75      /***
76       * If a checkpoint has begun, its directory under
77       * <code>checkpointDirectory</code>.
78       */
79      private transient File checkpointInProgressDir = null;
80  
81      /***
82       * If the checkpoint in progress has encountered fatal errors.
83       */
84      private transient boolean checkpointErrors = false;
85      
86      /***
87       * checkpointThread is set if a checkpoint is currently running.
88       */
89      private transient Thread checkpointThread = null;
90      
91      private transient CrawlController controller;
92      
93      /***
94       * Setup in constructor or on a call to revovery.
95       */
96      private transient Timer timerThread = null;
97      
98      public static final DecimalFormat INDEX_FORMAT = new DecimalFormat("00000");
99  
100     /***
101      * Create a new CheckpointContext with the given store directory
102      * @param cc CrawlController instance thats hosting this Checkpointer.
103      * @param checkpointDir Where to store checkpoint.
104      */
105     public Checkpointer(final CrawlController cc, final File checkpointDir) {
106         this(cc, DEFAULT_PREFIX);
107     }
108     
109     /***
110      * Create a new CheckpointContext with the given store directory
111      *
112      * @param cc CrawlController instance thats hosting this Checkpointer.
113      * @param prefix Prefix for checkpoint label.
114      */
115     public Checkpointer(final CrawlController cc, final String prefix) {
116         super();
117         initialize(cc, prefix);
118         
119     }
120     
121     protected void initialize(final CrawlController cc, final String prefix) {
122         this.controller = cc;
123         this.checkpointPrefix = prefix;
124         // Period is in hours.
125         int period = Integer.parseInt(System.getProperties().getProperty(
126             this.getClass().getName() + ".period", "-1"));
127         if (period <= 0) {
128             return;
129         }
130         // Convert period from hours to milliseconds.
131         long periodMs = period * (1000 * 60 * 60);
132         TimerTask tt = new TimerTask() {
133             private CrawlController cController = cc;
134             public void run() {
135                 if (isCheckpointing()) {
136                     LOGGER.info("CheckpointTimerThread skipping checkpoint, " +
137                         "already checkpointing: State: " +
138                         this.cController.getState());
139                     return;
140                 }
141                 LOGGER.info("TimerThread request checkpoint");
142                 this.cController.requestCrawlCheckpoint();
143             }
144         };
145         this.timerThread = new Timer(true);
146         this.timerThread.schedule(tt, periodMs, periodMs);
147         LOGGER.info("Installed Checkpoint TimerThread to checkpoint every " +
148             period + " hour(s).");
149     }
150     
151     void cleanup() {
152         if (this.timerThread != null) {
153             LOGGER.info("Cleanedup Checkpoint TimerThread.");
154             this.timerThread.cancel();
155         }
156     }
157     
158     /***
159      * @return Returns the nextCheckpoint index.
160      */
161     public int getNextCheckpoint() {
162         return this.nextCheckpoint;
163     }
164 
165     /***
166      * Run a checkpoint of the crawler.
167      */
168     public void checkpoint() {
169         String name = "Checkpoint-" + getNextCheckpointName();
170         this.checkpointThread = new CheckpointingThread(name);
171         this.checkpointThread.setDaemon(true);
172         this.checkpointThread.start();
173     }
174 
175     /***
176      * Thread to run the checkpointing.
177      * @author stack
178      */
179     public class CheckpointingThread extends Thread {
180         public CheckpointingThread(final String name) {
181             super(name);
182         }
183 
184         public CrawlController getController() {
185         	return Checkpointer.this.controller;
186         }
187         
188         public void run() {
189             LOGGER.info("Started");
190             // If crawler already paused, don't resume crawling after
191             // finishing checkpointing.
192             final boolean alreadyPaused = getController().isPaused() ||
193                 getController().isPausing();
194             try {
195                 getController().requestCrawlPause();
196                 // Clear any checkpoint errors.
197                 setCheckpointErrors(false);
198                 if (!waitOnPaused()) {
199                     checkpointFailed("Failed wait for complete pause.");
200                 } else {
201                     createCheckpointInProgressDirectory();
202                     this.getController().checkpoint();
203                 }
204             } catch (Exception e) {
205                 checkpointFailed(e);
206             } finally {
207                 if (!isCheckpointErrors()) {
208                     writeValidity();
209                 }
210                 Checkpointer.this.nextCheckpoint++;
211                 clearCheckpointInProgressDirectory();
212                 LOGGER.info("Finished");
213                 getController().completePause();
214                 if (!alreadyPaused) {
215                     getController().requestCrawlResume();
216                 }
217             }
218         }
219         
220         private synchronized boolean waitOnPaused() {
221             // If we're paused we can exit but also exit if the crawl has been
222             // resumed by the operator.
223             while(!getController().isPaused() && !getController().isRunning()) {
224                 try {
225                     wait(1000 * 3);
226                 } catch (InterruptedException e) {
227                     // May be for us.
228                 }
229             }
230             return getController().isPaused();
231         }
232     }
233     
234     protected File createCheckpointInProgressDirectory() {
235         this.checkpointInProgressDir =
236             new File(Checkpointer.this.controller.getCheckpointsDisk(),
237                 getNextCheckpointName());
238         this.checkpointInProgressDir.mkdirs();
239         return this.checkpointInProgressDir;
240     }
241     
242     protected void clearCheckpointInProgressDirectory() {
243         this.checkpointInProgressDir = null;
244     }
245     
246     protected CrawlController getController() {
247         return this.controller;
248     }
249     
250     /***
251      * @return next checkpoint name (zero-padding string).
252      */
253     public String getNextCheckpointName() {
254         return formatCheckpointName(this.checkpointPrefix, this.nextCheckpoint);
255     }
256     
257     public static String formatCheckpointName(final String prefix,
258     		final int index) {
259     	return prefix + INDEX_FORMAT.format(index);
260     }
261 
262     protected void writeValidity() {
263         File valid = new File(this.checkpointInProgressDir,
264             Checkpoint.VALIDITY_STAMP_FILENAME);
265         try {
266             FileOutputStream fos = new FileOutputStream(valid);
267             fos.write(ArchiveUtils.get14DigitDate().getBytes());
268             fos.close();
269         } catch (IOException e) {
270             valid.delete();
271         }
272     }
273 
274     /***
275      * @return Checkpoint directory. Name of the directory is the name of this
276      * current checkpoint.  Null if no checkpoint in progress.
277      */
278     public File getCheckpointInProgressDirectory() {
279         return this.checkpointInProgressDir;
280     }
281     
282     /***
283      * @return True if a checkpoint is in progress.
284      */
285     public boolean isCheckpointing() {
286         return this.checkpointThread != null && this.checkpointThread.isAlive();
287     }
288 
289     /***
290      * Note that a checkpoint failed
291      *
292      * @param e Exception checkpoint failed on.
293      */
294     protected void checkpointFailed(Exception e) {
295         LOGGER.log(Level.WARNING, " Checkpoint failed", e);
296         checkpointFailed();
297     }
298     
299     protected void checkpointFailed(final String message) {
300         LOGGER.warning(message);
301         checkpointFailed();
302     }
303     
304     protected void checkpointFailed() {
305         this.checkpointErrors = true;
306     }
307     
308     /***
309      * @return True if current/last checkpoint failed.
310      */
311     public boolean isCheckpointFailed() {
312         return this.checkpointErrors;
313     }
314 
315     /***
316      * @return Return whether this context is at a new crawl, never-
317      * checkpointed state.
318      */
319     public boolean isAtBeginning() {
320         return nextCheckpoint == 1;
321     }
322 
323     /***
324      * Call when recovering from a checkpoint.
325      * Call this after instance has been revivifyied post-serialization to
326      * amend counters and directories that effect where checkpoints get stored
327      * from here on out.
328      * @param cc CrawlController instance.
329      */
330     public void recover(final CrawlController cc) {
331         // Prepend the checkpoint name with a little 'r' so we tell apart
332         // checkpoints made from a recovery.  Allow for there being
333         // multiple 'r' prefixes.
334         initialize(cc, 'r' + this.checkpointPrefix);
335     }
336     
337     /***
338      * @return Returns the predecessorCheckpoints.
339      */
340     public List getPredecessorCheckpoints() {
341         return this.predecessorCheckpoints;
342     }
343 
344     protected boolean isCheckpointErrors() {
345         return this.checkpointErrors;
346     }
347 
348     protected void setCheckpointErrors(boolean checkpointErrors) {
349         this.checkpointErrors = checkpointErrors;
350     }
351 }