View Javadoc

1   /* GzippedInputStream
2   *
3   * $Id: GzippedInputStream.java 4995 2007-03-12 23:48:36Z stack-sf $
4   *
5   * Created on July 5, 2004
6   *
7   * Copyright (C) 2004 Internet Archive.
8   *
9   * This file is part of the Heritrix web crawler (crawler.archive.org).
10  *
11  * Heritrix is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser Public License as published by
13  * the Free Software Foundation; either version 2.1 of the License, or
14  * any later version.
15  *
16  * Heritrix is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU Lesser Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser Public License
22  * along with Heritrix; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24  */
25  package org.archive.io;
26  
27  import it.unimi.dsi.fastutil.io.RepositionableStream;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.EOFException;
31  import java.io.IOException;
32  import java.io.InputStream;
33  import java.util.Iterator;
34  import java.util.logging.Logger;
35  import java.util.zip.Deflater;
36  import java.util.zip.GZIPInputStream;
37  import java.util.zip.GZIPOutputStream;
38  import java.util.zip.Inflater;
39  import java.util.zip.ZipException;
40  
41  
42  /***
43   * Subclass of GZIPInputStream that can handle a stream made of multiple
44   * concatenated GZIP members/records.
45   * 
46   * This class is needed because GZIPInputStream only finds the first GZIP
47   * member in the file even if the file is made up of multiple GZIP members.
48   * 
49   * <p>Takes an InputStream stream that implements
50   * {@link RepositionableStream} interface so it can backup over-reads done
51   * by the zlib Inflater class.
52   * 
53   * <p>Use the {@link #iterator()} method to get a gzip member iterator.
54   * Calls to {@link Iterator#next()} returns the next gzip member in the
55   * stream.  Cast return from {@link Iterator#next()} to InputStream.
56   * 
57   * <p>Use {@link #gzipMemberSeek(long)} to position stream before reading
58   * a gzip member if doing random accessing of gzip members.  Pass it offset
59   * at which gzip member starts.
60   * 
61   * <p>If you need to know position at which a gzip member starts, call
62   * {@link #position()} just after a call to {@link Iterator#hasNext()}
63   * and before you call {@link Iterator#next()}.
64   * 
65   * @author stack
66   */
67  public class GzippedInputStream
68  extends GZIPInputStream
69  implements RepositionableStream {
70      /***
71       * Tail on gzip members (The CRC).
72       */
73      private static final int GZIP_TRAILER_LENGTH = 8;
74      
75      /***
76       * Utility class used probing for gzip members in stream.
77       * We need this instance to get at the readByte method.
78       */
79      private final GzipHeader gzipHeader = new GzipHeader();
80      
81      /***
82       * Buffer size used skipping over gzip members.
83       */
84      private static final int LINUX_PAGE_SIZE = 4 * 1024;
85      
86      private final long initialOffset;
87      
88      public GzippedInputStream(InputStream is) throws IOException {
89          // Have buffer match linux page size.
90          this(is, LINUX_PAGE_SIZE);
91      }
92      
93      /***
94       * @param is An InputStream that implements RespositionableStream and
95       * returns <code>true</code> when we call
96       * {@link InputStream#markSupported()} (Latter is needed so can setup
97       * an {@link Iterator} against the Gzip stream).
98       * @param size Size of blocks to use reading.
99       * @throws IOException
100      */
101     public GzippedInputStream(final InputStream is, final int size)
102     throws IOException {
103         super(checkStream(is), size);
104         if (!is.markSupported()) {
105         	throw new IllegalArgumentException("GzippedInputStream requires " +
106         		"a markable stream");
107         }
108         if (!(is instanceof RepositionableStream)) {
109         	throw new IllegalArgumentException("GzippedInputStream requires " +
110     		"a stream that implements RepositionableStream");
111         }
112         // We need to calculate the absolute offset of the current
113         // GZIP Member.  Its almost always going to be zero but not
114         // always (We may have been passed a stream that is already part
115         // ways through a stream of GZIP Members).  So, getting
116         // absolute offset is not exactly straight-forward. The super
117         // class, GZIPInputStream on construction reads in the GZIP Header
118         // which is a pain because I then do not know the absolute offset
119         // at which the GZIP record began.  So, the call above to checkStream()
120         // marked the stream before passing it to the super calls.  Then
121         // below we get current postion at just past the GZIP Header, call
122         // reset so we go back to the absolute start of the GZIP Member in
123         // the file, record the offset for later should we need to start
124         // over again in this file -- i.e. we're asked to get an iterator
125         // from Record zero on -- then we move the file position to just
126         // after the GZIP Header again so we're again aligned for inflation
127         // of the current record.
128         long afterGZIPHeader = ((RepositionableStream)is).position();
129         is.reset();
130         this.initialOffset = ((RepositionableStream)is).position();
131         ((RepositionableStream)is).position(afterGZIPHeader);
132     }
133     
134     protected static InputStream checkStream(final InputStream is)
135     throws IOException {
136         if (is instanceof RepositionableStream) {
137         	// See note above in constructor on why the mark here.
138         	// Also minimal gzip header is 10.  IA GZIP Headers are 20 bytes.
139         	// Multiply by 4 in case extra info in the header.
140         	is.mark(GzipHeader.MINIMAL_GZIP_HEADER_LENGTH * 4);
141         	return is;
142         }
143         throw new IOException("Passed stream does not" +
144             " implement PositionableStream");
145     }
146     
147     /***
148      * Exhaust current GZIP member content.
149      * Call this method when you think you're on the end of the
150      * GZIP member.  It will clean out any dross.
151      * @param ignore Character to ignore counting characters (Usually
152      * trailing new lines).
153      * @return Count of characters skipped over.
154      * @throws IOException
155      */
156     public long gotoEOR(int ignore) throws IOException {
157         long bytesSkipped = 0;
158         if (this.inf.getTotalIn() <= 0) {
159             return bytesSkipped;
160         }
161         if (!this.inf.finished()) {
162             int read = 0;
163             while ((read = read()) != -1) {
164                 if ((byte)read == (byte)ignore) {
165                     continue;
166                 }
167                 bytesSkipped = gotoEOR() + 1;
168                 break;
169             }
170         }
171         return bytesSkipped;
172     }
173     
174     /***
175      * Exhaust current GZIP member content.
176      * Call this method when you think you're on the end of the
177      * GZIP member.  It will clean out any dross.
178      * @return Count of characters skipped over.
179      * @throws IOException
180      */
181     public long gotoEOR() throws IOException {
182         long bytesSkipped = 0;
183         if (this.inf.getTotalIn() <= 0) {
184             return bytesSkipped;
185         }
186         while(!this.inf.finished()) {
187             bytesSkipped += skip(Long.MAX_VALUE);
188         }
189         return bytesSkipped;
190     }
191     
192     /***
193      * Returns a GZIP Member Iterator.
194      * Has limitations. Can only get one Iterator per instance of this class;
195      * you must get new instance if you want to get Iterator again.
196      * @return Iterator over GZIP Members.
197      */
198     public Iterator iterator() {
199         final Logger logger = Logger.getLogger(this.getClass().getName());
200         
201         try {
202             // We know its a RepositionableStream else we'd have failed
203         	// construction.  On iterator construction, set file back to
204         	// initial position so we're ready to read GZIP Members
205         	// (May not always work dependent on how the
206         	// RepositionableStream was implemented).
207             ((RepositionableStream)this.in).position(this.initialOffset);
208         } catch (IOException e) {
209             throw new RuntimeException(e);
210         }
211         return new Iterator() {
212             private GzippedInputStream compressedStream =
213                 GzippedInputStream.this;
214             
215             public boolean hasNext() {
216                 try {
217                     gotoEOR();
218                 } catch (IOException e) {
219                     if ((e instanceof ZipException) ||
220                         (e.getMessage() != null &&
221                          e.getMessage().startsWith("Corrupt GZIP trailer"))) {
222                         // Try skipping end of bad record; try moving to next.
223                         logger.info("Skipping exception " + e.getMessage());
224                     } else {
225                         throw new RuntimeException(e);
226                     }
227                 }
228                 return moveToNextGzipMember();
229             }
230             
231             /***
232              * @return An InputStream onto a GZIP Member.
233              */
234             public Object next() {
235                 try {
236                     gzipMemberSeek();
237                 } catch (IOException e) {
238                     throw new RuntimeException("Failed move to EOR or " +
239                         "failed header read: " + e.getMessage());
240                 }
241                 return this.compressedStream;
242             }
243             
244             public void remove() {
245                 throw new UnsupportedOperationException();
246             }
247         };   
248     }
249     
250     /***
251      * @return True if we found another record in the stream.
252      */
253     protected boolean moveToNextGzipMember() {
254         boolean result = false;
255         // Move to the next gzip member, if there is one, positioning
256         // ourselves by backing up the stream so we reread any inflater
257         // remaining bytes. Then add 8 bytes to get us past the GZIP
258         // CRC trailer block that ends all gzip members.
259         try {
260             RepositionableStream ps = (RepositionableStream)getInputStream();
261             // 8 is sizeof gzip CRC block thats on tail of gzipped
262             // record. If remaining is < 8 then experience indicates
263             // we're seeking past the gzip header -- don't backup the
264             // stream.
265             if (getInflater().getRemaining() > GZIP_TRAILER_LENGTH) {
266                 ps.position(position() - getInflater().getRemaining() +
267                     GZIP_TRAILER_LENGTH);
268             }
269             for (int read = -1, headerRead = 0; true; headerRead = 0) {
270                 // Give a hint to underlying stream that we're going to want to
271                 // do some backing up.
272                 getInputStream().mark(3);
273                 if ((read = getInputStream().read()) == -1) {
274                     break;
275                 }
276                 if(compareBytes(read, GZIPInputStream.GZIP_MAGIC)) {
277                     headerRead++;
278                     if ((read = getInputStream().read()) == -1) {
279                     	break;
280                     }
281                     if(compareBytes(read, GZIPInputStream.GZIP_MAGIC >> 8)) {
282                         headerRead++;
283                         if ((read = getInputStream().read()) == -1) {
284                         	break;
285                         }
286                         if (compareBytes(read, Deflater.DEFLATED)) {
287                             headerRead++;
288                             // Found gzip header. Backup the stream the
289                             // bytes we just found and set result true.
290                             getInputStream().reset();
291                             result = true;
292                             break;
293                         }
294                     }
295                     // Didn't find gzip header.  Reset stream but one byte
296                     // futher on then redo header tests.
297                     ps.position(ps.position() - headerRead);
298                 }
299             }
300         } catch (IOException e) {
301             throw new RuntimeException("Failed i/o: " + e.getMessage());
302         }
303         return result;
304     }
305     
306     protected boolean compareBytes(final int a, final int b) {
307     	return ((byte)(a & 0xff)) == ((byte)(b & 0xff));
308     }
309   
310     protected Inflater getInflater() {
311         return this.inf;
312     }
313     
314     protected InputStream getInputStream() {
315         return this.in;
316     }
317     
318     protected GzipHeader getGzipHeader() {
319         return this.gzipHeader;
320     }
321     
322     /***
323      * Move to next gzip member in the file.
324      */
325     protected void resetInflater() {
326         this.eos = false;
327         this.inf.reset();
328     }
329     
330     /***
331      * Read in the gzip header.
332      * @throws IOException
333      */
334     protected void readHeader() throws IOException {
335         new GzipHeader(this.in);
336         // Reset the crc for subsequent reads.
337         this.crc.reset();
338     }
339 
340     /***
341      * Seek to passed offset.
342      * 
343      * After positioning the stream, it resets the inflater.
344      * Assumption is that public use of this method is only
345      * to position stream at start of a gzip member.
346      * 
347      * @param position Absolute position of a gzip member start.
348      * @throws IOException
349      */
350     public void position(long position) throws IOException {
351         ((RepositionableStream)this.in).position(position);
352         resetInflater();
353     }
354 
355     public long position() throws IOException {
356        return  ((RepositionableStream)this.in).position();
357     }
358     
359     /***
360      * Seek to a gzip member.
361      * 
362      * Moves stream to new position, resets inflater and reads in the gzip
363      * header ready for subsequent calls to read.
364      * 
365      * @param position Absolute position of a gzip member start.
366      * @throws IOException
367      */
368     public void gzipMemberSeek(long position) throws IOException {
369         position(position);
370         readHeader();
371     }
372     
373     public void gzipMemberSeek() throws IOException {
374         gzipMemberSeek(position());
375     }
376     
377     /***
378      * Gzip passed bytes.
379      * Use only when bytes is small.
380      * @param bytes What to gzip.
381      * @return A gzip member of bytes.
382      * @throws IOException
383      */
384     public static byte [] gzip(byte [] bytes) throws IOException {
385         ByteArrayOutputStream baos = new ByteArrayOutputStream();
386         GZIPOutputStream gzipOS = new GZIPOutputStream(baos);
387         gzipOS.write(bytes, 0, bytes.length);
388         gzipOS.close();
389         return baos.toByteArray();
390     }
391     
392     /***
393      * Tests passed stream is GZIP stream by reading in the HEAD.
394      * Does reposition of stream when done.
395      * @param rs An InputStream that is Repositionable.
396      * @return True if compressed stream.
397      * @throws IOException
398      */
399     public static boolean isCompressedRepositionableStream(
400             final RepositionableStream rs)
401     throws IOException {
402         boolean result = false;
403         long p = rs.position();
404         try {
405             result = isCompressedStream((InputStream)rs);
406         } finally {
407             rs.position(p);
408         }
409         return result; 
410     }
411     
412     /***
413      * Tests passed stream is gzip stream by reading in the HEAD.
414      * Does not reposition stream when done.
415      * @param is An InputStream.
416      * @return True if compressed stream.
417      * @throws IOException
418      */
419     public static boolean isCompressedStream(final InputStream is)
420     throws IOException {
421         try {
422             new GzipHeader(is);
423         } catch (NoGzipMagicException e) {
424             return false;
425         }
426         return true;
427     }
428 }