1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.crawler.framework;
26
27 import java.io.File;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.Serializable;
31 import java.text.DecimalFormat;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Timer;
35 import java.util.TimerTask;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
38
39 import org.archive.crawler.datamodel.Checkpoint;
40 import org.archive.util.ArchiveUtils;
41
42 /***
43 * Runs checkpointing.
44 * Also keeps history of crawl checkpoints Generally used by CrawlController
45 * only but also has static utility methods classes that need to participate in
46 * a checkpoint can use.
47 *
48 * @author gojomo
49 * @author stack
50 */
51 public class Checkpointer implements Serializable {
52 private static final long serialVersionUID = 7610078446694353173L;
53
54 private final static Logger LOGGER =
55 Logger.getLogger(Checkpointer.class.getName());
56
57 private static final String DEFAULT_PREFIX = "";
58
59 /***
60 * String to prefix any new checkpoint names.
61 */
62 private String checkpointPrefix = DEFAULT_PREFIX;
63
64 /***
65 * Next overall series checkpoint number.
66 */
67 private int nextCheckpoint = 1;
68
69 /***
70 * All checkpoint names in chain prior to now. May not all still
71 * exist on disk.
72 */
73 private List predecessorCheckpoints = new LinkedList();
74
75 /***
76 * If a checkpoint has begun, its directory under
77 * <code>checkpointDirectory</code>.
78 */
79 private transient File checkpointInProgressDir = null;
80
81 /***
82 * If the checkpoint in progress has encountered fatal errors.
83 */
84 private transient boolean checkpointErrors = false;
85
86 /***
87 * checkpointThread is set if a checkpoint is currently running.
88 */
89 private transient Thread checkpointThread = null;
90
91 private transient CrawlController controller;
92
93 /***
94 * Setup in constructor or on a call to revovery.
95 */
96 private transient Timer timerThread = null;
97
98 public static final DecimalFormat INDEX_FORMAT = new DecimalFormat("00000");
99
100 /***
101 * Create a new CheckpointContext with the given store directory
102 * @param cc CrawlController instance thats hosting this Checkpointer.
103 * @param checkpointDir Where to store checkpoint.
104 */
105 public Checkpointer(final CrawlController cc, final File checkpointDir) {
106 this(cc, DEFAULT_PREFIX);
107 }
108
109 /***
110 * Create a new CheckpointContext with the given store directory
111 *
112 * @param cc CrawlController instance thats hosting this Checkpointer.
113 * @param prefix Prefix for checkpoint label.
114 */
115 public Checkpointer(final CrawlController cc, final String prefix) {
116 super();
117 initialize(cc, prefix);
118
119 }
120
121 protected void initialize(final CrawlController cc, final String prefix) {
122 this.controller = cc;
123 this.checkpointPrefix = prefix;
124
125 int period = Integer.parseInt(System.getProperties().getProperty(
126 this.getClass().getName() + ".period", "-1"));
127 if (period <= 0) {
128 return;
129 }
130
131 long periodMs = period * (1000 * 60 * 60);
132 TimerTask tt = new TimerTask() {
133 private CrawlController cController = cc;
134 public void run() {
135 if (isCheckpointing()) {
136 LOGGER.info("CheckpointTimerThread skipping checkpoint, " +
137 "already checkpointing: State: " +
138 this.cController.getState());
139 return;
140 }
141 LOGGER.info("TimerThread request checkpoint");
142 this.cController.requestCrawlCheckpoint();
143 }
144 };
145 this.timerThread = new Timer(true);
146 this.timerThread.schedule(tt, periodMs, periodMs);
147 LOGGER.info("Installed Checkpoint TimerThread to checkpoint every " +
148 period + " hour(s).");
149 }
150
151 void cleanup() {
152 if (this.timerThread != null) {
153 LOGGER.info("Cleanedup Checkpoint TimerThread.");
154 this.timerThread.cancel();
155 }
156 }
157
158 /***
159 * @return Returns the nextCheckpoint index.
160 */
161 public int getNextCheckpoint() {
162 return this.nextCheckpoint;
163 }
164
165 /***
166 * Run a checkpoint of the crawler.
167 */
168 public void checkpoint() {
169 String name = "Checkpoint-" + getNextCheckpointName();
170 this.checkpointThread = new CheckpointingThread(name);
171 this.checkpointThread.setDaemon(true);
172 this.checkpointThread.start();
173 }
174
175 /***
176 * Thread to run the checkpointing.
177 * @author stack
178 */
179 public class CheckpointingThread extends Thread {
180 public CheckpointingThread(final String name) {
181 super(name);
182 }
183
184 public CrawlController getController() {
185 return Checkpointer.this.controller;
186 }
187
188 public void run() {
189 LOGGER.info("Started");
190
191
192 final boolean alreadyPaused = getController().isPaused() ||
193 getController().isPausing();
194 try {
195 getController().requestCrawlPause();
196
197 setCheckpointErrors(false);
198 if (!waitOnPaused()) {
199 checkpointFailed("Failed wait for complete pause.");
200 } else {
201 createCheckpointInProgressDirectory();
202 this.getController().checkpoint();
203 }
204 } catch (Exception e) {
205 checkpointFailed(e);
206 } finally {
207 if (!isCheckpointErrors()) {
208 writeValidity();
209 }
210 Checkpointer.this.nextCheckpoint++;
211 clearCheckpointInProgressDirectory();
212 LOGGER.info("Finished");
213 getController().completePause();
214 if (!alreadyPaused) {
215 getController().requestCrawlResume();
216 }
217 }
218 }
219
220 private synchronized boolean waitOnPaused() {
221
222
223 while(!getController().isPaused() && !getController().isRunning()) {
224 try {
225 wait(1000 * 3);
226 } catch (InterruptedException e) {
227
228 }
229 }
230 return getController().isPaused();
231 }
232 }
233
234 protected File createCheckpointInProgressDirectory() {
235 this.checkpointInProgressDir =
236 new File(Checkpointer.this.controller.getCheckpointsDisk(),
237 getNextCheckpointName());
238 this.checkpointInProgressDir.mkdirs();
239 return this.checkpointInProgressDir;
240 }
241
242 protected void clearCheckpointInProgressDirectory() {
243 this.checkpointInProgressDir = null;
244 }
245
246 protected CrawlController getController() {
247 return this.controller;
248 }
249
250 /***
251 * @return next checkpoint name (zero-padding string).
252 */
253 public String getNextCheckpointName() {
254 return formatCheckpointName(this.checkpointPrefix, this.nextCheckpoint);
255 }
256
257 public static String formatCheckpointName(final String prefix,
258 final int index) {
259 return prefix + INDEX_FORMAT.format(index);
260 }
261
262 protected void writeValidity() {
263 File valid = new File(this.checkpointInProgressDir,
264 Checkpoint.VALIDITY_STAMP_FILENAME);
265 try {
266 FileOutputStream fos = new FileOutputStream(valid);
267 fos.write(ArchiveUtils.get14DigitDate().getBytes());
268 fos.close();
269 } catch (IOException e) {
270 valid.delete();
271 }
272 }
273
274 /***
275 * @return Checkpoint directory. Name of the directory is the name of this
276 * current checkpoint. Null if no checkpoint in progress.
277 */
278 public File getCheckpointInProgressDirectory() {
279 return this.checkpointInProgressDir;
280 }
281
282 /***
283 * @return True if a checkpoint is in progress.
284 */
285 public boolean isCheckpointing() {
286 return this.checkpointThread != null && this.checkpointThread.isAlive();
287 }
288
289 /***
290 * Note that a checkpoint failed
291 *
292 * @param e Exception checkpoint failed on.
293 */
294 protected void checkpointFailed(Exception e) {
295 LOGGER.log(Level.WARNING, " Checkpoint failed", e);
296 checkpointFailed();
297 }
298
299 protected void checkpointFailed(final String message) {
300 LOGGER.warning(message);
301 checkpointFailed();
302 }
303
304 protected void checkpointFailed() {
305 this.checkpointErrors = true;
306 }
307
308 /***
309 * @return True if current/last checkpoint failed.
310 */
311 public boolean isCheckpointFailed() {
312 return this.checkpointErrors;
313 }
314
315 /***
316 * @return Return whether this context is at a new crawl, never-
317 * checkpointed state.
318 */
319 public boolean isAtBeginning() {
320 return nextCheckpoint == 1;
321 }
322
323 /***
324 * Call when recovering from a checkpoint.
325 * Call this after instance has been revivifyied post-serialization to
326 * amend counters and directories that effect where checkpoints get stored
327 * from here on out.
328 * @param cc CrawlController instance.
329 */
330 public void recover(final CrawlController cc) {
331
332
333
334 initialize(cc, 'r' + this.checkpointPrefix);
335 }
336
337 /***
338 * @return Returns the predecessorCheckpoints.
339 */
340 public List getPredecessorCheckpoints() {
341 return this.predecessorCheckpoints;
342 }
343
344 protected boolean isCheckpointErrors() {
345 return this.checkpointErrors;
346 }
347
348 protected void setCheckpointErrors(boolean checkpointErrors) {
349 this.checkpointErrors = checkpointErrors;
350 }
351 }