1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.crawler.framework;
26
27 import java.io.DataInputStream;
28 import java.io.DataOutputStream;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.FileNotFoundException;
32 import java.io.FileOutputStream;
33 import java.io.IOException;
34 import java.io.ObjectInputStream;
35 import java.io.StringWriter;
36 import java.net.InetAddress;
37 import java.net.UnknownHostException;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.logging.Logger;
44
45 import javax.management.AttributeNotFoundException;
46 import javax.management.MBeanException;
47 import javax.management.ReflectionException;
48 import javax.xml.transform.SourceLocator;
49 import javax.xml.transform.Templates;
50 import javax.xml.transform.Transformer;
51 import javax.xml.transform.TransformerConfigurationException;
52 import javax.xml.transform.TransformerException;
53 import javax.xml.transform.TransformerFactory;
54 import javax.xml.transform.stream.StreamResult;
55 import javax.xml.transform.stream.StreamSource;
56
57 import org.archive.crawler.Heritrix;
58 import org.archive.crawler.datamodel.CoreAttributeConstants;
59 import org.archive.crawler.datamodel.CrawlHost;
60 import org.archive.crawler.datamodel.CrawlOrder;
61 import org.archive.crawler.datamodel.CrawlURI;
62 import org.archive.crawler.datamodel.FetchStatusCodes;
63 import org.archive.crawler.deciderules.recrawl.IdenticalDigestDecideRule;
64 import org.archive.crawler.event.CrawlStatusListener;
65 import org.archive.crawler.settings.SimpleType;
66 import org.archive.crawler.settings.StringList;
67 import org.archive.crawler.settings.Type;
68 import org.archive.crawler.settings.XMLSettingsHandler;
69 import org.archive.io.ObjectPlusFilesInputStream;
70 import org.archive.io.WriterPool;
71 import org.archive.io.WriterPoolMember;
72
/**
 * Abstract implementation of a file pool processor.
 * Subclass to implement for a particular {@link WriterPoolMember} type.
 *
 * @author Parker Thompson
 * @author stack
 */
79 public abstract class WriterPoolProcessor extends Processor
80 implements CoreAttributeConstants, CrawlStatusListener, FetchStatusCodes {
81 private static final long serialVersionUID = 1L;
82 private final Logger logger = Logger.getLogger(this.getClass().getName());
83
84 /***
85 * Key to use asking settings for file compression value.
86 */
87 public static final String ATTR_COMPRESS = "compress";
88
89 /***
90 * Default as to whether we do compression of files.
91 */
92 public static final boolean DEFAULT_COMPRESS = true;
93
94 /***
95 * Key to use asking settings for file prefix value.
96 */
97 public static final String ATTR_PREFIX = "prefix";
98
99 /***
100 * Key to use asking settings for arc path value.
101 */
102 public static final String ATTR_PATH ="path";
103
104 /***
105 * Key to use asking settings for file suffix value.
106 */
107 public static final String ATTR_SUFFIX = "suffix";
108
109 /***
110 * Key to use asking settings for file max size value.
111 */
112 public static final String ATTR_MAX_SIZE_BYTES = "max-size-bytes";
113
114 /***
115 * Key to get maximum pool size.
116 *
117 * This key is for maximum files active in the pool.
118 */
119 public static final String ATTR_POOL_MAX_ACTIVE = "pool-max-active";
120
121 /***
122 * Key to get maximum wait on pool object before we give up and
123 * throw IOException.
124 */
125 public static final String ATTR_POOL_MAX_WAIT = "pool-max-wait";
126
127 /***
128 * Key for the maximum bytes to write attribute.
129 */
130 public static final String ATTR_MAX_BYTES_WRITTEN =
131 "total-bytes-to-write";
132
133 /***
134 * Key for whether to skip writing records of content-digest repeats
135 */
136 public static final String ATTR_SKIP_IDENTICAL_DIGESTS =
137 "skip-identical-digests";
138
139 /***
140 * CrawlURI annotation indicating no record was written
141 */
142 protected static final String ANNOTATION_UNWRITTEN = "unwritten";
143
144 /***
145 * Default maximum file size.
146 */
147 public abstract long getDefaultMaxFileSize();
148
149 /***
150 * Default path list.
151 *
152 * TODO: Confirm this one gets picked up.
153 */
154 private static final String [] DEFAULT_PATH = {"crawl-store"};
155
156 /***
157 * Reference to pool.
158 */
159 transient private WriterPool pool = null;
160
161 /***
162 * Total number of bytes written to disc.
163 */
164 private long totalBytesWritten = 0;
165
166 /***
167 * Calculate metadata once only.
168 */
169 transient private List<String> cachedMetadata = null;
170
171
/**
 * Creates the processor with a stock description.
 *
 * @param name Name of this processor.
 */
public WriterPoolProcessor(String name) {
    // Delegate to the two-argument constructor with the default description.
    this(name, "Pool of files processor");
}
178
179 /***
180 * @param name Name of this processor.
181 * @param description Description for this processor.
182 */
183 public WriterPoolProcessor(final String name,
184 final String description) {
185 super(name, description);
186 Type e = addElementToDefinition(
187 new SimpleType(ATTR_COMPRESS, "Compress files when " +
188 "writing to disk.", new Boolean(DEFAULT_COMPRESS)));
189 e.setOverrideable(false);
190 e = addElementToDefinition(
191 new SimpleType(ATTR_PREFIX,
192 "File prefix. " +
193 "The text supplied here will be used as a prefix naming " +
194 "writer files. For example if the prefix is 'IAH', " +
195 "then file names will look like " +
196 "IAH-20040808101010-0001-HOSTNAME.arc.gz " +
197 "...if writing ARCs (The prefix will be " +
198 "separated from the date by a hyphen).",
199 WriterPoolMember.DEFAULT_PREFIX));
200 e = addElementToDefinition(
201 new SimpleType(ATTR_SUFFIX, "Suffix to tag onto " +
202 "files. '${HOSTNAME_ADMINPORT}' in the suffix " +
203 "will be replaced with the local hostname and " +
204 "web UI port. '${HOSTNAME}' in the suffix will be " +
205 "replaced with the local hostname. If empty, no "+
206 "suffix will be added.",
207 WriterPoolMember.DEFAULT_SUFFIX));
208 e.setOverrideable(false);
209 e = addElementToDefinition(
210 new SimpleType(ATTR_MAX_SIZE_BYTES, "Max size of each file",
211 new Long(getDefaultMaxFileSize())));
212 e.setOverrideable(false);
213 e = addElementToDefinition(
214 new StringList(ATTR_PATH, "Where to files. " +
215 "Supply absolute or relative path. If relative, files " +
216 "will be written relative to " +
217 "the " + CrawlOrder.ATTR_DISK_PATH + "setting." +
218 " If more than one path specified, we'll round-robin" +
219 " dropping files to each. This setting is safe" +
220 " to change midcrawl (You can remove and add new dirs" +
221 " as the crawler progresses).", getDefaultPath()));
222 e.setOverrideable(false);
223 e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_ACTIVE,
224 "Maximum active files in pool. " +
225 "This setting cannot be varied over the life of a crawl.",
226 new Integer(WriterPool.DEFAULT_MAX_ACTIVE)));
227 e.setOverrideable(false);
228 e = addElementToDefinition(new SimpleType(ATTR_POOL_MAX_WAIT,
229 "Maximum time to wait on pool element" +
230 " (milliseconds). This setting cannot be varied over the life" +
231 " of a crawl.",
232 new Integer(WriterPool.DEFAULT_MAXIMUM_WAIT)));
233 e.setOverrideable(false);
234 e = addElementToDefinition(new SimpleType(ATTR_MAX_BYTES_WRITTEN,
235 "Total file bytes to write to disk." +
236 " Once the size of all files on disk has exceeded this " +
237 "limit, this processor will stop the crawler. " +
238 "A value of zero means no upper limit.", new Long(0)));
239 e.setOverrideable(false);
240 e.setExpertSetting(true);
241 e = addElementToDefinition(new SimpleType(ATTR_SKIP_IDENTICAL_DIGESTS,
242 "Whether to skip the writing of a record when URI " +
243 "history information is available and indicates the " +
244 "prior fetch had an identical content digest. " +
245 "Default is false.", new Boolean(false)));
246 e.setOverrideable(true);
247 e.setExpertSetting(true);
248 }
249
250 protected String [] getDefaultPath() {
251 return DEFAULT_PATH;
252 }
253
254 public synchronized void initialTasks() {
255
256 getSettingsHandler().getOrder().getController().
257 addCrawlStatusListener(this);
258 setupPool(new AtomicInteger());
259
260 if (getSettingsHandler().getOrder().getController().
261 isCheckpointRecover()) {
262 checkpointRecover();
263 }
264 }
265
266 protected AtomicInteger getSerialNo() {
267 return ((WriterPool)getPool()).getSerialNo();
268 }
269
270 /***
271 * Set up pool of files.
272 */
273 protected abstract void setupPool(final AtomicInteger serialNo);
274
275 /***
276 * Writes a CrawlURI and its associated data to store file.
277 *
278 * Currently this method understands the following uri types: dns, http,
279 * and https.
280 *
281 * @param curi CrawlURI to process.
282 */
283 protected abstract void innerProcess(CrawlURI curi);
284
285 protected void checkBytesWritten() {
286 long max = getMaxToWrite();
287 if (max <= 0) {
288 return;
289 }
290 if (max <= this.totalBytesWritten) {
291 getController().requestCrawlStop("Finished - Maximum bytes (" +
292 Long.toString(max) + ") written");
293 }
294 }
295
296 /***
297 * Whether the given CrawlURI should be written to archive files.
298 * Annotates CrawlURI with a reason for any negative answer.
299 *
300 * @param curi CrawlURI
301 * @return true if URI should be written; false otherwise
302 */
303 protected boolean shouldWrite(CrawlURI curi) {
304
305 if(((Boolean)getUncheckedAttribute(curi, ATTR_SKIP_IDENTICAL_DIGESTS))
306 && IdenticalDigestDecideRule.hasIdenticalDigest(curi)) {
307 curi.addAnnotation(ANNOTATION_UNWRITTEN + ":identicalDigest");
308 return false;
309 }
310 String scheme = curi.getUURI().getScheme().toLowerCase();
311
312 boolean retVal;
313 if (scheme.equals("dns")) {
314 retVal = curi.getFetchStatus() == S_DNS_SUCCESS;
315 } else if (scheme.equals("http") || scheme.equals("https")) {
316 retVal = curi.getFetchStatus() > 0 && curi.isHttpTransaction();
317 } else if (scheme.equals("ftp")) {
318 retVal = curi.getFetchStatus() > 0;
319 } else {
320
321 curi.addAnnotation(ANNOTATION_UNWRITTEN + ":scheme");
322 return false;
323 }
324 if (retVal == false) {
325
326 curi.addAnnotation(ANNOTATION_UNWRITTEN + ":status");
327 return false;
328 }
329 return true;
330 }
331
332 /***
333 * Return IP address of given URI suitable for recording (as in a
334 * classic ARC 5-field header line).
335 *
336 * @param curi CrawlURI
337 * @return String of IP address
338 */
339 protected String getHostAddress(CrawlURI curi) {
340
341 if(curi.getUURI().getScheme().toLowerCase().equals("dns")) {
342 return curi.getString(A_DNS_SERVER_IP_LABEL);
343 }
344
345 CrawlHost h = getController().getServerCache().getHostFor(curi);
346 if (h == null) {
347 throw new NullPointerException("Crawlhost is null for " +
348 curi + " " + curi.getVia());
349 }
350 InetAddress a = h.getIP();
351 if (a == null) {
352 throw new NullPointerException("Address is null for " +
353 curi + " " + curi.getVia() + ". Address " +
354 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
355 "was never looked up.":
356 (System.currentTimeMillis() - h.getIpFetched()) +
357 " ms ago."));
358 }
359 return h.getIP().getHostAddress();
360 }
361
362 /***
363 * Version of getAttributes that catches and logs exceptions
364 * and returns null if failure to fetch the attribute.
365 * @param name Attribute name.
366 * @return Attribute or null.
367 */
368 public Object getAttributeUnchecked(String name) {
369 Object result = null;
370 try {
371 result = super.getAttribute(name);
372 } catch (AttributeNotFoundException e) {
373 logger.warning(e.getLocalizedMessage());
374 } catch (MBeanException e) {
375 logger.warning(e.getLocalizedMessage());
376 } catch (ReflectionException e) {
377 logger.warning(e.getLocalizedMessage());
378 }
379 return result;
380 }
381
382 /***
383 * Max size we want files to be (bytes).
384 *
385 * Default is ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE. Note that ARC
386 * files will usually be bigger than maxSize; they'll be maxSize + length
387 * to next boundary.
388 * @return ARC maximum size.
389 */
390 public long getMaxSize() {
391 Object obj = getAttributeUnchecked(ATTR_MAX_SIZE_BYTES);
392 return (obj == null)? getDefaultMaxFileSize(): ((Long)obj).longValue();
393 }
394
395 public String getPrefix() {
396 Object obj = getAttributeUnchecked(ATTR_PREFIX);
397 return (obj == null)? WriterPoolMember.DEFAULT_PREFIX: (String)obj;
398 }
399
400 @SuppressWarnings("unchecked")
401 public List<File> getOutputDirs() {
402 Object obj = getAttributeUnchecked(ATTR_PATH);
403 List list = (obj == null)? Arrays.asList(DEFAULT_PATH): (StringList)obj;
404 ArrayList<File> results = new ArrayList<File>();
405 for (Iterator i = list.iterator(); i.hasNext();) {
406 String path = (String)i.next();
407 File f = new File(path);
408 if (!f.isAbsolute()) {
409 f = new File(getController().getDisk(), path);
410 }
411 if (!f.exists()) {
412 try {
413 f.mkdirs();
414 } catch (Exception e) {
415 e.printStackTrace();
416 continue;
417 }
418 }
419 results.add(f);
420 }
421 return results;
422 }
423
424 public boolean isCompressed() {
425 Object obj = getAttributeUnchecked(ATTR_COMPRESS);
426 return (obj == null)? DEFAULT_COMPRESS:
427 ((Boolean)obj).booleanValue();
428 }
429
430 /***
431 * @return Returns the poolMaximumActive.
432 */
433 public int getPoolMaximumActive() {
434 Object obj = getAttributeUnchecked(ATTR_POOL_MAX_ACTIVE);
435 return (obj == null)? WriterPool.DEFAULT_MAX_ACTIVE:
436 ((Integer)obj).intValue();
437 }
438
439 /***
440 * @return Returns the poolMaximumWait.
441 */
442 public int getPoolMaximumWait() {
443 Object obj = getAttributeUnchecked(ATTR_POOL_MAX_WAIT);
444 return (obj == null)? WriterPool.DEFAULT_MAXIMUM_WAIT:
445 ((Integer)obj).intValue();
446 }
447
448 private String getHostname() {
449 String hostname = "localhost.localdomain";
450 try {
451 hostname = InetAddress.getLocalHost().getCanonicalHostName();
452 } catch (UnknownHostException ue) {
453 logger.severe("Failed getHostAddress for this host: " + ue);
454 }
455
456 return hostname;
457 }
458
459 private int getPort() {
460 if (Heritrix.getHttpServer() != null) {
461 return Heritrix.getHttpServer().getPort();
462 } else {
463 return 0;
464 }
465 }
466
467 public String getSuffix() {
468 Object obj = getAttributeUnchecked(ATTR_SUFFIX);
469 String sfx = (obj == null)?
470 WriterPoolMember.DEFAULT_SUFFIX: (String)obj;
471 sfx = sfx.trim();
472 if (sfx.contains(WriterPoolMember.HOSTNAME_ADMINPORT_VARIABLE)
473 || sfx.contains(WriterPoolMember.HOSTNAME_VARIABLE)) {
474 String hostname = getHostname();
475 sfx = sfx.replace(WriterPoolMember.HOSTNAME_ADMINPORT_VARIABLE, hostname + "-" + getPort());
476 sfx = sfx.replace(WriterPoolMember.HOSTNAME_VARIABLE, hostname);
477 }
478 return sfx;
479 }
480
481 public long getMaxToWrite() {
482 Object obj = getAttributeUnchecked(ATTR_MAX_BYTES_WRITTEN);
483 return (obj == null)? 0: ((Long)obj).longValue();
484 }
485
486 public void crawlEnding(String sExitMessage) {
487 }
488
489 public void crawlEnded(String sExitMessage) {
490
491 this.pool.close();
492 }
493
494
495
496
497 public void crawlStarted(String message) {
498
499 }
500
501 protected String getCheckpointStateFile() {
502 return this.getClass().getName() + ".state";
503 }
504
505 public void crawlCheckpoint(File checkpointDir) throws IOException {
506 int serial = getSerialNo().get();
507 if (this.pool.getNumActive() > 0) {
508
509
510
511
512
513 serial = getSerialNo().incrementAndGet();
514 }
515 saveCheckpointSerialNumber(checkpointDir, serial);
516
517 try {
518 this.pool.close();
519 } finally {
520
521 setupPool(new AtomicInteger(serial));
522 }
523 }
524
525 public void crawlPausing(String statusMessage) {
526
527 }
528
529 public void crawlPaused(String statusMessage) {
530
531 }
532
533 public void crawlResuming(String statusMessage) {
534
535 }
536
537 private void readObject(ObjectInputStream stream)
538 throws IOException, ClassNotFoundException {
539 stream.defaultReadObject();
540 ObjectPlusFilesInputStream coistream =
541 (ObjectPlusFilesInputStream)stream;
542 coistream.registerFinishTask( new Runnable() {
543 public void run() {
544 setupPool(new AtomicInteger());
545 }
546 });
547 }
548
549 protected WriterPool getPool() {
550 return pool;
551 }
552
553 protected void setPool(WriterPool pool) {
554 this.pool = pool;
555 }
556
557 protected long getTotalBytesWritten() {
558 return totalBytesWritten;
559 }
560
561 protected void setTotalBytesWritten(long totalBytesWritten) {
562 this.totalBytesWritten = totalBytesWritten;
563 }
564
565 /***
566 * Called out of {@link #initialTasks()} when recovering a checkpoint.
567 * Restore state.
568 */
569 protected void checkpointRecover() {
570 int serialNo = loadCheckpointSerialNumber();
571 if (serialNo != -1) {
572 getSerialNo().set(serialNo);
573 }
574 }
575
576 /***
577 * @return Serial number from checkpoint state file or if unreadable, -1
578 * (Client should check for -1).
579 */
580 protected int loadCheckpointSerialNumber() {
581 int result = -1;
582
583
584
585 File stateFile = new File(getSettingsHandler().getOrder()
586 .getController().getCheckpointRecover().getDirectory(),
587 getCheckpointStateFile());
588 if (!stateFile.exists()) {
589 logger.info(stateFile.getAbsolutePath()
590 + " doesn't exist so cannot restore Writer serial number.");
591 } else {
592 DataInputStream dis = null;
593 try {
594 dis = new DataInputStream(new FileInputStream(stateFile));
595 result = dis.readShort();
596 } catch (FileNotFoundException e) {
597 e.printStackTrace();
598 } catch (IOException e) {
599 e.printStackTrace();
600 } finally {
601 try {
602 if (dis != null) {
603 dis.close();
604 }
605 } catch (IOException e) {
606 e.printStackTrace();
607 }
608 }
609 }
610 return result;
611 }
612
613 protected void saveCheckpointSerialNumber(final File checkpointDir,
614 final int serialNo)
615 throws IOException {
616
617 File f = new File(checkpointDir, getCheckpointStateFile());
618 DataOutputStream dos = new DataOutputStream(new FileOutputStream(f));
619 try {
620 dos.writeShort(serialNo);
621 } finally {
622 dos.close();
623 }
624 }
625
626 /***
627 * Return list of metadatas to add to first arc file metadata record.
628 *
629 * Default is to stylesheet the order file. To specify stylesheet,
630 * override {@link #getFirstrecordStylesheet()}.
631 *
632 * Get xml files from settingshandler. Currently order file is the
633 * only xml file. We're NOT adding seeds to meta data.
634 *
635 * @return List of strings and/or files to add to arc file as metadata or
636 * null.
637 */
638 public synchronized List<String> getMetadata() {
639 if (this.cachedMetadata != null) {
640 return this.cachedMetadata;
641 }
642 return cacheMetadata();
643 }
644
645 protected synchronized List<String> cacheMetadata() {
646
647
648 if (getFirstrecordStylesheet() == null ||
649 getFirstrecordStylesheet().length() == 0) {
650 this.cachedMetadata = new ArrayList<String>(1);
651 this.cachedMetadata.add("");
652 return this.cachedMetadata;
653 }
654
655 List<String> result = null;
656 if (!XMLSettingsHandler.class.isInstance(getSettingsHandler())) {
657 logger.warning("Expected xml settings handler (No warcinfo).");
658
659 return result;
660 }
661
662 XMLSettingsHandler xsh = (XMLSettingsHandler)getSettingsHandler();
663 File orderFile = xsh.getOrderFile();
664 if (!orderFile.exists() || !orderFile.canRead()) {
665 logger.severe("File " + orderFile.getAbsolutePath() +
666 " is does not exist or is not readable.");
667 } else {
668 result = new ArrayList<String>(1);
669 result.add(getFirstrecordBody(orderFile));
670 }
671 this.cachedMetadata = result;
672 return this.cachedMetadata;
673 }
674
675 /***
676 * @preturn Full path to stylesheet (Its read off the CLASSPATH
677 * as resource).
678 */
679 protected String getFirstrecordStylesheet() {
680 return null;
681 }
682
683 /***
684 * Write the arc metadata body content.
685 *
686 * Its based on the order xml file but into this base we'll add other info
687 * such as machine ip.
688 *
689 * @param orderFile Order file.
690
691 *
692 * @return String that holds the arc metaheader body.
693 */
694 protected String getFirstrecordBody(File orderFile) {
695 String result = null;
696 TransformerFactory factory = TransformerFactory.newInstance();
697 Templates templates = null;
698 Transformer xformer = null;
699 try {
700 templates = factory.newTemplates(new StreamSource(
701 this.getClass().getResourceAsStream(getFirstrecordStylesheet())));
702 xformer = templates.newTransformer();
703
704 xformer.setParameter("software", "Heritrix " +
705 Heritrix.getVersion() + " http://crawler.archive.org");
706 xformer.setParameter("ip",
707 InetAddress.getLocalHost().getHostAddress());
708 xformer.setParameter("hostname",
709 InetAddress.getLocalHost().getCanonicalHostName());
710 StreamSource source = new StreamSource(
711 new FileInputStream(orderFile));
712 StringWriter writer = new StringWriter();
713 StreamResult target = new StreamResult(writer);
714 xformer.transform(source, target);
715 result= writer.toString();
716 } catch (TransformerConfigurationException e) {
717 logger.severe("Failed transform " + e);
718 } catch (FileNotFoundException e) {
719 logger.severe("Failed transform, file not found " + e);
720 } catch (UnknownHostException e) {
721 logger.severe("Failed transform, unknown host " + e);
722 } catch(TransformerException e) {
723 SourceLocator locator = e.getLocator();
724 int col = locator.getColumnNumber();
725 int line = locator.getLineNumber();
726 String publicId = locator.getPublicId();
727 String systemId = locator.getSystemId();
728 logger.severe("Transform error " + e + ", col " + col + ", line " +
729 line + ", publicId " + publicId + ", systemId " + systemId);
730 }
731
732 return result;
733 }
734 }