1   /* $Id: ArchiveReader.java 6847 2010-04-26 21:49:27Z szznax $
2    *
3    * Created on August 21st, 2006
4    *
5    * Copyright (C) 2006 Internet Archive.
6    *
7    * This file is part of the Heritrix web crawler (crawler.archive.org).
8    *
9    * Heritrix is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   *
14   * Heritrix is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.io;
24  
25  import it.unimi.dsi.fastutil.io.RepositionableStream;
26  
27  import java.io.BufferedInputStream;
28  import java.io.BufferedWriter;
29  import java.io.EOFException;
30  import java.io.File;
31  import java.io.FileOutputStream;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.OutputStreamWriter;
35  import java.util.ArrayList;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  import org.apache.commons.cli.Option;
42  import org.apache.commons.cli.Options;
43  import org.archive.util.MimetypeUtils;
44  
45  
46  /***
47   * Reader for an Archive file of Archive {@link ArchiveRecord}s.
48   * @author stack
49   * @version $Date: 2010-04-26 21:49:27 +0000 (Mon, 26 Apr 2010) $ $Version$
50   */
51  public abstract class ArchiveReader implements ArchiveFileConstants {    
52      /***
53       * Is this Archive file compressed?
54       */
55      private boolean compressed = false;
56      
57      /***
58       * Should we digest as we read?
59       */
60      private boolean digest = true;
61      
62      /***
63       * Should the parse be strict?
64       */
65      private boolean strict = false;
66      
    /***
     * Archive file input stream.
     *
     * Keep it around so we can close it when done.
     *
     * <p>Must support the {@link RepositionableStream} interface.  The
     * field itself is private; subclasses read and assign it through
     * {@link #getIn()} and {@link #setIn(InputStream)}.
     */
75      private InputStream in = null;
76      
77      /***
78       * Maximum amount of recoverable exceptions in a row.
79       * If more than this amount in a row, we'll let out the exception rather
80       * than go back in for yet another retry.
81       */
82      public static final int MAX_ALLOWED_RECOVERABLES = 10;
83      
84  
85      /***
86       * The Record currently being read.
87       *
88       * Keep this ongoing reference so we'll close the record even if the caller
89       * doesn't.
90       */
91      private ArchiveRecord currentRecord = null;
92      
93      /***
94       * Descriptive string for the Archive file we're going against:
95       * full path, url, etc. -- depends on context in which file was made.
96       */
97      private String identifier = null;
98      
99      /***
100      * Archive file version.
101      */
102     private String version = null;
103     
104     
105     protected ArchiveReader() {
106         super();
107     }
108     
109     /***
110      * Convenience method used by subclass constructors.
111      * @param i Identifier for Archive file this reader goes against.
112      */
113     protected void initialize(final String i) {
114         setReaderIdentifier(i);
115     }
116     
117     /***
118      * Convenience method for constructors.
119      * 
120      * @param f File to read.
121      * @param offset Offset at which to start reading.
122      * @return InputStream to read from.
123      * @throws IOException If failed open or fail to get a memory
124      * mapped byte buffer on file.
125      */
126     protected InputStream getInputStream(final File f, final long offset)
127     throws IOException {
128         return new RandomAccessBufferedInputStream(
129             new RandomAccessInputStream(f, offset));
130     }
131 
132     public boolean isCompressed() {
133         return this.compressed;
134     }
135 
136     /***
137      * Get record at passed <code>offset</code>.
138      * 
139      * @param offset Byte index into file at which a record starts.
140      * @return An Archive Record reference.
141      * @throws IOException
142      */
143     public ArchiveRecord get(long offset) throws IOException {
144         cleanupCurrentRecord();
145         RepositionableStream ps = (RepositionableStream)this.in;
146         long currentOffset = ps.position();
147         if (currentOffset != offset) {
148             currentOffset = offset;
149             ps.position(offset);
150         }
151         return createArchiveRecord(this.in, currentOffset);
152     }
153     
154     /***
155      * @return Return Archive Record created against current offset.
156      * @throws IOException
157      */
158     public ArchiveRecord get() throws IOException {
159         return createArchiveRecord(this.in,
160             ((RepositionableStream)this.in).position());
161     }
162 
163     public void close() throws IOException {
164         if (this.in != null) {
165             this.in.close();
166             this.in = null;
167         }
168     }
169     
170     /***
171      * Rewinds stream to start of the Archive file.
172      * @throws IOException if stream is not resettable.
173      */
174     protected void rewind() throws IOException {
175         cleanupCurrentRecord();
176         if (this.in instanceof RepositionableStream) {
177             try {
178                 ((RepositionableStream)this.in).position(0);
179             } catch (IOException e) {
180                 throw new RuntimeException(e);
181             }
182        } else {
183            throw new IOException("Stream is not resettable.");
184        }
185     }
186     
187     /***
188      * Cleanout the current record if there is one.
189      * @throws IOException
190      */
191     protected void cleanupCurrentRecord() throws IOException {
192         if (this.currentRecord != null) {
193             this.currentRecord.close();
194             gotoEOR(this.currentRecord);
195             this.currentRecord = null;
196         }
197     }
198     
    /***
     * Return an Archive Record homed on <code>offset</code> into
     * <code>is</code>.
     *
     * <p>Called by {@link #get(long)} and {@link #get()} with this
     * reader's own input stream.
     *
     * @param is Stream to read Record from.
     * @param offset Offset to find Record at.
     * @return ArchiveRecord instance.
     * @throws IOException If the record cannot be read.
     */
    protected abstract ArchiveRecord createArchiveRecord(InputStream is,
    	long offset)
    throws IOException;
210     
    /***
     * Skip over any trailing new lines at end of the record so we're lined up
     * ready to read the next.
     *
     * <p>Invoked from {@link #cleanupCurrentRecord()} after the record has
     * been closed.
     *
     * @param record Record whose end we should move past.
     * @throws IOException If the skip fails.
     */
    protected abstract void gotoEOR(ArchiveRecord record) throws IOException;
218     
    /***
     * @return File extension for this kind of Archive file (presumably
     * without the leading dot -- confirm against implementations).
     */
    public abstract String getFileExtension();

    /***
     * @return Dotted file extension for this kind of Archive file, e.g.
     * '.arc' or '.warc'.  Used when stripping extensions from file names.
     */
    public abstract String getDotFileExtension();
221 
222     /***
223      * @return Version of this Archive file.
224      */
225     public String getVersion() {
226     	return this.version;
227     }
228 
229     /***
230      * Validate the Archive file.
231      *
232      * This method iterates over the file throwing exception if it fails
233      * to successfully parse any record.
234      *
235      * <p>Assumes the stream is at the start of the file.
236      * @return List of all read Archive Headers.
237      *
238      * @throws IOException
239      */
240     public List<ArchiveRecordHeader> validate() throws IOException {
241         return validate(-1);
242     }
243 
244     /***
245      * Validate the Archive file.
246      *
247      * This method iterates over the file throwing exception if it fails
248      * to successfully parse.
249      *
250      * <p>We start validation from wherever we are in the stream.
251      *
252      * @param numRecords Number of records expected.  Pass -1 if number is
253      * unknown.
254      *
255      * @return List of all read metadatas. As we validate records, we add
256      * a reference to the read metadata.
257      *
258      * @throws IOException
259      */
260     public List<ArchiveRecordHeader> validate(int numRecords) throws IOException {
261         List<ArchiveRecordHeader> hdrList = new ArrayList<ArchiveRecordHeader>();
262         int recordCount = 0;
263         setStrict(true);
264         for (Iterator<ArchiveRecord> i = iterator(); i.hasNext();) {
265             recordCount++;
266             ArchiveRecord r = i.next();
267             if (r.getHeader().getLength() <= 0
268                 && r.getHeader().getMimetype().
269                     equals(MimetypeUtils.NO_TYPE_MIMETYPE)) {
270                 throw new IOException("record content is empty.");
271             }
272             r.close();
273             hdrList.add(r.getHeader());
274         }
275         if (numRecords != -1) {
276             if (recordCount != numRecords) {
277                 throw new IOException("Count of records, " 
278                         + Integer.toString(recordCount) 
279                         + " is less than expected " 
280                         + Integer.toString(numRecords));
281             }
282         }
283         return hdrList;
284     }
285 
286     /***
287      * Test Archive file is valid.
288      * Assumes the stream is at the start of the file.  Be aware that this
289      * method makes a pass over the whole file. 
290      * @return True if file can be successfully parsed.
291      */
292     public boolean isValid() {
293         boolean valid = false;
294         try {
295             validate();
296             valid = true;
297         } catch(Exception e) {
298             // File is not valid if exception thrown parsing.
299             valid = false;
300         }
301     
302         return valid;
303     }
304 
305     /***
306      * @return Returns the strict.
307      */
308     public boolean isStrict() {
309         return this.strict;
310     }
311 
312     /***
313      * @param s The strict to set.
314      */
315     public void setStrict(boolean s) {
316         this.strict = s;
317     }
318 
319     /***
320      * @param d True if we're to digest.
321      */
322     public void setDigest(boolean d) {
323         this.digest = d;
324     }
325 
326     /***
327      * @return True if we're digesting as we read.
328      */
329     public boolean isDigest() {
330         return this.digest;
331     }
332  
333     protected Logger getLogger() {
334         return Logger.getLogger(this.getClass().getName());
335     }
336     
337     protected InputStream getInputStream() {
338         return this.in;
339     }
340     
341     /***
342      * Returns an ArchiveRecord iterator.
343      * Of note, on IOException, especially if ZipException reading compressed
344      * ARCs, rather than fail the iteration, try moving to the next record.
345      * If {@link ArchiveReader#strict} is not set, this will usually succeed.
346      * @return An iterator over ARC records.
347      */
348     public Iterator<ArchiveRecord> iterator() {
349         // Eat up any record outstanding.
350         try {
351             cleanupCurrentRecord();
352         } catch (IOException e) {
353             throw new RuntimeException(e);
354         }
355         
356         // Now reset stream to the start of the arc file.
357         try {
358             rewind();
359         } catch (IOException e) {
360             throw new RuntimeException(e);
361         }
362         return new ArchiveRecordIterator();
363     }
364 
	/***
	 * @param compressed True if this Archive file is compressed.
	 */
	protected void setCompressed(boolean compressed) {
		this.compressed = compressed;
	}
368 
369     /***
370      * @return The current ARC record or null if none.
371      * After construction has the arcfile header record.
372      * @see #get()
373      */
374 	protected ArchiveRecord getCurrentRecord() {
375 		return this.currentRecord;
376 	}
377 
378 	protected ArchiveRecord currentRecord(final ArchiveRecord currentRecord) {
379 		this.currentRecord = currentRecord;
380         return currentRecord;
381 	}
382 
383 	protected InputStream getIn() {
384 		return in;
385 	}
386 
	/***
	 * @param in Stream to read the Archive file from.  Methods such as
	 * {@link #get(long)} cast it to {@link RepositionableStream}.
	 */
	protected void setIn(InputStream in) {
		this.in = in;
	}
390 
	/***
	 * @param version Archive file version to record.
	 */
	protected void setVersion(String version) {
		this.version = version;
	}
394 
395 	public String getReaderIdentifier() {
396 		return this.identifier;
397 	}
398 
399 	protected void setReaderIdentifier(final String i) {
400 		this.identifier = i;
401 	}
402 	
403     /***
404      * Log on stderr.
405      * Logging should go via the logging system.  This method
406      * bypasses the logging system going direct to stderr.
407      * Should not generally be used.  Its used for rare messages
408      * that come of cmdline usage of ARCReader ERRORs and WARNINGs.
409      * Override if using ARCReader in a context where no stderr or
410      * where you'd like to redirect stderr to other than System.err.
411      * @param level Level to log message at.
412      * @param message Message to log.
413      */
414     public void logStdErr(Level level, String message) {
415         System.err.println(level.toString() + " " + message);
416     }
417     
418     /***
419      * Add buffering to RandomAccessInputStream.
420      */
421     protected class RandomAccessBufferedInputStream
422     extends BufferedInputStream implements RepositionableStream {
423 
424         public RandomAccessBufferedInputStream(RandomAccessInputStream is)
425         		throws IOException {
426             super(is);
427         }
428 
429         public RandomAccessBufferedInputStream(RandomAccessInputStream is, int size)
430         		throws IOException {
431             super(is, size);
432         }
433 
434         public long position() throws IOException {
435             // Current position is the underlying files position
436             // minus the amount thats in the buffer yet to be read.
437             return ((RandomAccessInputStream)this.in).position() -
438             	(this.count - this.pos);
439         }
440 
441         public void position(long position) throws IOException {
442             // Force refill of buffer whenever there's been a seek.
443             this.pos = 0;
444             this.count = 0;
445             ((RandomAccessInputStream)this.in).position(position);
446         }
447         
448         public int available() throws IOException {
449             // Avoid overflow on large datastreams
450             long amount = (long)in.available() + (long)(count - pos);
451             return (amount >= Integer.MAX_VALUE)? Integer.MAX_VALUE: (int)amount;
452         }
453     }
454     
    /***
     * Inner ArchiveRecord Iterator class.
     * Throws RuntimeExceptions in {@link #hasNext()} and {@link #next()} if
     * trouble pulling record from underlying stream.
     *
     * <p>The iterator holds no position of its own: it works against the
     * enclosing reader's input stream and current record, so it reflects
     * (and changes) the reader's state.  {@link #remove()} is unsupported.
     * @author stack
     */
    protected class ArchiveRecordIterator implements Iterator<ArchiveRecord> {
        // Logger named after this inner class.  Note that next() and
        // exceptionNext() log through the enclosing reader's getLogger()
        // instead.
        private final Logger logger =
            Logger.getLogger(this.getClass().getName());
        /***
         * Cleans up any outstanding record and then reports whether the
         * stream has bytes available.
         *
         * @return True if we have more records to read.  Returns false,
         * when not strict, if cleanup hit a premature end of file.
         * @exception RuntimeException Can throw an IOException wrapped in a
         * RuntimeException if a problem reading underlying stream (Corrupted
         * gzip, etc.).
         */
        public boolean hasNext() {
            // Call close on any extant record.  This will scoot us past
            // any content not yet read.
            try {
                cleanupCurrentRecord();
            } catch (IOException e) {
                // Strict mode: any cleanup failure ends the iteration.
                if (isStrict()) {
                    throw new RuntimeException(e);
                }
                // The current record reference is still set here because
                // cleanupCurrentRecord only clears it on success.
                if (e instanceof EOFException) {
                    logger.warning("Premature EOF cleaning up " + 
                        currentRecord.getHeader().toString() + ": " +
                        e.getMessage());
                    return false;
                }
                // If not strict, try going again.  We might be able to skip
                // over the bad record.
                logger.warning("Trying skip of failed record cleanup of " +
                    currentRecord.getHeader().toString() + ": " +
                    e.getMessage());
            }
            return innerHasNext();
        }
        
        /***
         * @return True if the reader's input stream reports bytes available.
         * @exception RuntimeException Wraps any IOException, with the
         * stream offset (or -1 if it could not be read) in the message.
         */
        protected boolean innerHasNext() {
            long offset = -1;
            try {
                offset = ((RepositionableStream)getInputStream()).position();
                return getInputStream().available() > 0;
            } catch (IOException e) {
                throw new RuntimeException("Offset " + offset, e);
            }
        }

        /***
         * Tries to move to next record if we get
         * {@link RecoverableIOException}. If not <code>strict</code>
         * tries to move to next record if we get an
         * {@link IOException}.
         * @return Next object.
         * @exception RuntimeException Throws a runtime exception,
         * usually a wrapping of an IOException, if trouble getting
         * a record (Throws exception rather than return null).
         */
        public ArchiveRecord next() {
            // Remember where we started so failures can report the offset.
            long offset = -1;
            try {
                offset = ((RepositionableStream)getInputStream()).position();
                return exceptionNext();
            } catch (IOException e) {
                if (!isStrict()) {
                    // Retry though an IOE.  Maybe we will succeed reading
                    // subsequent record.
                    try {
                        if (hasNext()) {
                            getLogger().warning("Bad Record. Trying skip " +
                                "(Current offset " +  offset + "): " +
                                e.getMessage());
                            return exceptionNext();
                        }
                        // Else we are at last record.  Iterator#next is
                        // expecting value. We do not have one. Throw exception.
                        throw new RuntimeException("Retried but no next " + 
                            "record (Offset " + offset + ")", e);
                    } catch (IOException e1) {
                        // The single retry failed too; give up.
                        throw new RuntimeException("After retry (Offset " +
                                offset + ")", e1);
                    }
                }
                // Strict mode: no retry.
                throw new RuntimeException("(Offset " + offset + ")", e);
            }
        }
        
        /***
         * A next that throws exceptions and has handling of
         * recoverable exceptions moving us to next record. Can call
         * hasNext which itself may throw exceptions.
         *
         * <p>Makes up to {@link ArchiveReader#MAX_ALLOWED_RECOVERABLES}
         * attempts.  The same RuntimeException is thrown when a
         * recoverable failure is followed by no further records.
         * @return Next record.
         * @throws IOException Any non-recoverable IOException from reading
         * the record.
         * @throws RuntimeException Thrown when we've reached maximum
         * retries.
         */
        protected ArchiveRecord exceptionNext()
        throws IOException, RuntimeException {
            ArchiveRecord result = null;
            // Last recoverable failure; reset on every attempt so a
            // success leaves it null.
            IOException ioe = null;
            for (int i = MAX_ALLOWED_RECOVERABLES; i > 0 &&
                    result == null; i--) {
                ioe = null;
                try {
                    result = innerNext();
                } catch (RecoverableIOException e) {
                    ioe = e;
                    getLogger().warning(e.getMessage());
                    if (hasNext()) {
                        continue;
                    }
                    // No records left.  Throw exception rather than
                    // return null.  The caller is expecting to get
                    // back a record since they've just called
                    // hasNext.
                    break;
                }
            }
            if (ioe != null) {
                // Then we did MAX_ALLOWED_RECOVERABLES retries.  Throw
                // the recoverable ioe wrapped in a RuntimeException so
                // it goes out pass checks for IOE.
                throw new RuntimeException("Retried " +
                    MAX_ALLOWED_RECOVERABLES + " times in a row", ioe);
            }
            return result;
        }
        
        /***
         * @return Record read at the stream's current position via the
         * enclosing reader's {@link ArchiveReader#get(long)}.
         * @throws IOException If the record cannot be read.
         */
        protected ArchiveRecord innerNext() throws IOException {
            return get(((RepositionableStream)getInputStream()).position());
        }
        
        /***
         * Not supported.
         * @exception UnsupportedOperationException Always.
         */
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
592     
593     protected static String stripExtension(final String name,
594     		final String ext) {
595         return (!name.endsWith(ext))? name:
596             name.substring(0, name.length() - ext.length());
597     }
598     
599     /***
600      * @return short name of Archive file.
601      */
602     public String getFileName() {
603         return (new File(getReaderIdentifier())).getName();
604     }
605 
606     /***
607      * @return short name of Archive file.
608      */
609     public String getStrippedFileName() {
610         return getStrippedFileName(getFileName(),
611     		getDotFileExtension());
612     }
613     
614     /***
615      * @param name Name of ARCFile.
616      * @param dotFileExtension '.arc' or '.warc', etc.
617      * @return short name of Archive file.
618      */
619     public static String getStrippedFileName(String name,
620     		final String dotFileExtension) {
621     	name = stripExtension(name,
622     		ArchiveFileConstants.DOT_COMPRESSED_FILE_EXTENSION);
623     	return stripExtension(name, dotFileExtension);
624     }
625     
626     /***
627      * @param value Value to test.
628      * @return True if value is 'true', else false.
629      */
630     protected static boolean getTrueOrFalse(final String value) {
631     	if (value == null || value.length() <= 0) {
632     		return false;
633     	}
634         return Boolean.TRUE.toString().equals(value.toLowerCase());
635     }
636     
637     /***
638      * @param format Format to use outputting.
639      * @throws IOException
640      * @throws java.text.ParseException
641      * @return True if handled.
642      */
643     protected boolean output(final String format)
644     throws IOException, java.text.ParseException {
645     	boolean result = true;
646         // long start = System.currentTimeMillis();
647     	
648         // Write output as pseudo-CDX file.  See
649         // http://www.archive.org/web/researcher/cdx_legend.php
650         // and http://www.archive.org/web/researcher/example_cdx.php.
651         // Hash is hard-coded straight SHA-1 hash of content.
652         if (format.equals(DUMP)) {
653         	// No point digesting dumping.
654         	setDigest(false);
655             dump(false);
656         } else if (format.equals(GZIP_DUMP)) {
657         	// No point digesting dumping.
658         	setDigest(false);
659             dump(true);
660         } else if (format.equals(CDX)) {
661         	cdxOutput(false);   
662         } else if (format.equals(CDX_FILE)) {
663             cdxOutput(true);
664         } else {
665         	result = false;
666         }	
667         return result;
668     }
669     
670     protected void cdxOutput(boolean toFile)
671     throws IOException {
672         BufferedWriter cdxWriter = null;
673         if (toFile) {
674             String cdxFilename = stripExtension(getReaderIdentifier(),
675                 DOT_COMPRESSED_FILE_EXTENSION);
676             cdxFilename = stripExtension(cdxFilename, getDotFileExtension());
677             cdxFilename += ('.' + CDX);
678             cdxWriter = new BufferedWriter(
679                 new OutputStreamWriter(new FileOutputStream(cdxFilename),"UTF-8"));
680         }
681         
682         String header = "CDX b e a m s c " + ((isCompressed()) ? "V" : "v")
683             + " n g";
684         if (toFile) {
685             cdxWriter.write(header);
686             cdxWriter.newLine();
687         } else {
688             System.out.println(header);
689         }
690         
691         String strippedFileName = getStrippedFileName();
692         try {
693             for (Iterator<ArchiveRecord> ii = iterator(); ii.hasNext();) {
694             	ArchiveRecord r = ii.next();
695                 if (toFile) {
696                     cdxWriter.write(r.outputCdx(strippedFileName));
697                     cdxWriter.newLine();
698                 } else {
699                     System.out.println(r.outputCdx(strippedFileName));
700                 }
701             }
702         } finally {
703             if (toFile) {
704                 cdxWriter.close();
705             }
706         }
707     }
708     
709     /***
710      * Output passed record using passed format specifier.
711      * @param format What format to use outputting.
712      * @throws IOException
713      * @return True if handled.
714      */
715     public boolean outputRecord(final String format)
716     throws IOException {
717     	boolean result = true;
718         if (format.equals(CDX)) {
719             System.out.println(get().outputCdx(getStrippedFileName()));
720         } else if(format.equals(ArchiveFileConstants.DUMP)) {
721             // No point digesting if dumping content.
722             setDigest(false);
723             get().dump();
724         } else {
725         	result = false;
726         }
727         return result;
728     }
729 
    /***
     * Dump this file on STDOUT
     * @param compress True if dumped output is compressed.
     * @throws IOException
     * @throws java.text.ParseException
     */
    public abstract void dump(final boolean compress)
    throws IOException, java.text.ParseException;
738     
    /***
     * @param f Local file for the returned reader (presumably the file it
     * reads and then deletes on close -- confirm against implementations).
     * @return an ArchiveReader that will delete a local file on close.  Used
     * when we bring Archive files local and need to clean up afterward.
     */
    public abstract ArchiveReader getDeleteFileOnCloseReader(final File f);
744     
745     /***
746      * Output passed record using passed format specifier.
747      * @param r ARCReader instance to output.
748      * @param format What format to use outputting.
749      * @throws IOException
750      */
751     protected static void outputRecord(final ArchiveReader r,
752         final String format)
753     throws IOException {
754         if (!r.outputRecord(format)) {
755             throw new IOException("Unsupported format" +
756                 " (or unsupported on a single record): " + format);
757         }
758     }
759     
760     /***
761      * @return Base Options object filled out with help, digest, strict, etc.
762      * options.
763      */
764     protected static Options getOptions() {
765         Options options = new Options();
766         options.addOption(new Option("h","help", false,
767             "Prints this message and exits."));
768         options.addOption(new Option("o","offset", true,
769             "Outputs record at this offset into file."));
770         options.addOption(new Option("d","digest", true,
771             "Pass true|false. Expensive. Default: true (SHA-1)."));
772         options.addOption(new Option("s","strict", false,
773             "Strict mode. Fails parse if incorrectly formatted file."));
774         options.addOption(new Option("f","format", true,
775             "Output options: 'cdx', cdxfile', 'dump', 'gzipdump'," +
776             "'or 'nohead'. Default: 'cdx'."));
777         return options;
778     }
779 }