1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.io;
26
27 import it.unimi.dsi.fastutil.io.RepositionableStream;
28
29 import java.io.ByteArrayOutputStream;
30 import java.io.EOFException;
31 import java.io.IOException;
32 import java.io.InputStream;
33 import java.util.Iterator;
34 import java.util.logging.Logger;
35 import java.util.zip.Deflater;
36 import java.util.zip.GZIPInputStream;
37 import java.util.zip.GZIPOutputStream;
38 import java.util.zip.Inflater;
39 import java.util.zip.ZipException;
40
41
42 /***
43 * Subclass of GZIPInputStream that can handle a stream made of multiple
44 * concatenated GZIP members/records.
45 *
46 * This class is needed because GZIPInputStream only finds the first GZIP
47 * member in the file even if the file is made up of multiple GZIP members.
48 *
49 * <p>Takes an InputStream stream that implements
50 * {@link RepositionableStream} interface so it can backup over-reads done
51 * by the zlib Inflater class.
52 *
53 * <p>Use the {@link #iterator()} method to get a gzip member iterator.
54 * Calls to {@link Iterator#next()} returns the next gzip member in the
55 * stream. Cast return from {@link Iterator#next()} to InputStream.
56 *
57 * <p>Use {@link #gzipMemberSeek(long)} to position stream before reading
58 * a gzip member if doing random accessing of gzip members. Pass it offset
59 * at which gzip member starts.
60 *
61 * <p>If you need to know position at which a gzip member starts, call
62 * {@link #position()} just after a call to {@link Iterator#hasNext()}
63 * and before you call {@link Iterator#next()}.
64 *
65 * @author stack
66 */
67 public class GzippedInputStream
68 extends GZIPInputStream
69 implements RepositionableStream {
70 /***
71 * Tail on gzip members (The CRC).
72 */
73 private static final int GZIP_TRAILER_LENGTH = 8;
74
75 /***
76 * Utility class used probing for gzip members in stream.
77 * We need this instance to get at the readByte method.
78 */
79 private final GzipHeader gzipHeader = new GzipHeader();
80
81 /***
82 * Buffer size used skipping over gzip members.
83 */
84 private static final int LINUX_PAGE_SIZE = 4 * 1024;
85
86 private final long initialOffset;
87
88 public GzippedInputStream(InputStream is) throws IOException {
89
90 this(is, LINUX_PAGE_SIZE);
91 }
92
93 /***
94 * @param is An InputStream that implements RespositionableStream and
95 * returns <code>true</code> when we call
96 * {@link InputStream#markSupported()} (Latter is needed so can setup
97 * an {@link Iterator} against the Gzip stream).
98 * @param size Size of blocks to use reading.
99 * @throws IOException
100 */
101 public GzippedInputStream(final InputStream is, final int size)
102 throws IOException {
103 super(checkStream(is), size);
104 if (!is.markSupported()) {
105 throw new IllegalArgumentException("GzippedInputStream requires " +
106 "a markable stream");
107 }
108 if (!(is instanceof RepositionableStream)) {
109 throw new IllegalArgumentException("GzippedInputStream requires " +
110 "a stream that implements RepositionableStream");
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 long afterGZIPHeader = ((RepositionableStream)is).position();
129 is.reset();
130 this.initialOffset = ((RepositionableStream)is).position();
131 ((RepositionableStream)is).position(afterGZIPHeader);
132 }
133
134 protected static InputStream checkStream(final InputStream is)
135 throws IOException {
136 if (is instanceof RepositionableStream) {
137
138
139
140 is.mark(GzipHeader.MINIMAL_GZIP_HEADER_LENGTH * 4);
141 return is;
142 }
143 throw new IOException("Passed stream does not" +
144 " implement PositionableStream");
145 }
146
147 /***
148 * Exhaust current GZIP member content.
149 * Call this method when you think you're on the end of the
150 * GZIP member. It will clean out any dross.
151 * @param ignore Character to ignore counting characters (Usually
152 * trailing new lines).
153 * @return Count of characters skipped over.
154 * @throws IOException
155 */
156 public long gotoEOR(int ignore) throws IOException {
157 long bytesSkipped = 0;
158 if (this.inf.getTotalIn() <= 0) {
159 return bytesSkipped;
160 }
161 if (!this.inf.finished()) {
162 int read = 0;
163 while ((read = read()) != -1) {
164 if ((byte)read == (byte)ignore) {
165 continue;
166 }
167 bytesSkipped = gotoEOR() + 1;
168 break;
169 }
170 }
171 return bytesSkipped;
172 }
173
174 /***
175 * Exhaust current GZIP member content.
176 * Call this method when you think you're on the end of the
177 * GZIP member. It will clean out any dross.
178 * @return Count of characters skipped over.
179 * @throws IOException
180 */
181 public long gotoEOR() throws IOException {
182 long bytesSkipped = 0;
183 if (this.inf.getTotalIn() <= 0) {
184 return bytesSkipped;
185 }
186 while(!this.inf.finished()) {
187 bytesSkipped += skip(Long.MAX_VALUE);
188 }
189 return bytesSkipped;
190 }
191
192 /***
193 * Returns a GZIP Member Iterator.
194 * Has limitations. Can only get one Iterator per instance of this class;
195 * you must get new instance if you want to get Iterator again.
196 * @return Iterator over GZIP Members.
197 */
198 public Iterator iterator() {
199 final Logger logger = Logger.getLogger(this.getClass().getName());
200
201 try {
202
203
204
205
206
207 ((RepositionableStream)this.in).position(this.initialOffset);
208 } catch (IOException e) {
209 throw new RuntimeException(e);
210 }
211 return new Iterator() {
212 private GzippedInputStream compressedStream =
213 GzippedInputStream.this;
214
215 public boolean hasNext() {
216 try {
217 gotoEOR();
218 } catch (IOException e) {
219 if ((e instanceof ZipException) ||
220 (e.getMessage() != null &&
221 e.getMessage().startsWith("Corrupt GZIP trailer"))) {
222
223 logger.info("Skipping exception " + e.getMessage());
224 } else {
225 throw new RuntimeException(e);
226 }
227 }
228 return moveToNextGzipMember();
229 }
230
231 /***
232 * @return An InputStream onto a GZIP Member.
233 */
234 public Object next() {
235 try {
236 gzipMemberSeek();
237 } catch (IOException e) {
238 throw new RuntimeException("Failed move to EOR or " +
239 "failed header read: " + e.getMessage());
240 }
241 return this.compressedStream;
242 }
243
244 public void remove() {
245 throw new UnsupportedOperationException();
246 }
247 };
248 }
249
250 /***
251 * @return True if we found another record in the stream.
252 */
253 protected boolean moveToNextGzipMember() {
254 boolean result = false;
255
256
257
258
259 try {
260 RepositionableStream ps = (RepositionableStream)getInputStream();
261
262
263
264
265 if (getInflater().getRemaining() > GZIP_TRAILER_LENGTH) {
266 ps.position(position() - getInflater().getRemaining() +
267 GZIP_TRAILER_LENGTH);
268 }
269 for (int read = -1, headerRead = 0; true; headerRead = 0) {
270
271
272 getInputStream().mark(3);
273 if ((read = getInputStream().read()) == -1) {
274 break;
275 }
276 if(compareBytes(read, GZIPInputStream.GZIP_MAGIC)) {
277 headerRead++;
278 if ((read = getInputStream().read()) == -1) {
279 break;
280 }
281 if(compareBytes(read, GZIPInputStream.GZIP_MAGIC >> 8)) {
282 headerRead++;
283 if ((read = getInputStream().read()) == -1) {
284 break;
285 }
286 if (compareBytes(read, Deflater.DEFLATED)) {
287 headerRead++;
288
289
290 getInputStream().reset();
291 result = true;
292 break;
293 }
294 }
295
296
297 ps.position(ps.position() - headerRead);
298 }
299 }
300 } catch (IOException e) {
301 throw new RuntimeException("Failed i/o: " + e.getMessage());
302 }
303 return result;
304 }
305
306 protected boolean compareBytes(final int a, final int b) {
307 return ((byte)(a & 0xff)) == ((byte)(b & 0xff));
308 }
309
310 protected Inflater getInflater() {
311 return this.inf;
312 }
313
314 protected InputStream getInputStream() {
315 return this.in;
316 }
317
318 protected GzipHeader getGzipHeader() {
319 return this.gzipHeader;
320 }
321
322 /***
323 * Move to next gzip member in the file.
324 */
325 protected void resetInflater() {
326 this.eos = false;
327 this.inf.reset();
328 }
329
330 /***
331 * Read in the gzip header.
332 * @throws IOException
333 */
334 protected void readHeader() throws IOException {
335 new GzipHeader(this.in);
336
337 this.crc.reset();
338 }
339
340 /***
341 * Seek to passed offset.
342 *
343 * After positioning the stream, it resets the inflater.
344 * Assumption is that public use of this method is only
345 * to position stream at start of a gzip member.
346 *
347 * @param position Absolute position of a gzip member start.
348 * @throws IOException
349 */
350 public void position(long position) throws IOException {
351 ((RepositionableStream)this.in).position(position);
352 resetInflater();
353 }
354
355 public long position() throws IOException {
356 return ((RepositionableStream)this.in).position();
357 }
358
359 /***
360 * Seek to a gzip member.
361 *
362 * Moves stream to new position, resets inflater and reads in the gzip
363 * header ready for subsequent calls to read.
364 *
365 * @param position Absolute position of a gzip member start.
366 * @throws IOException
367 */
368 public void gzipMemberSeek(long position) throws IOException {
369 position(position);
370 readHeader();
371 }
372
373 public void gzipMemberSeek() throws IOException {
374 gzipMemberSeek(position());
375 }
376
377 /***
378 * Gzip passed bytes.
379 * Use only when bytes is small.
380 * @param bytes What to gzip.
381 * @return A gzip member of bytes.
382 * @throws IOException
383 */
384 public static byte [] gzip(byte [] bytes) throws IOException {
385 ByteArrayOutputStream baos = new ByteArrayOutputStream();
386 GZIPOutputStream gzipOS = new GZIPOutputStream(baos);
387 gzipOS.write(bytes, 0, bytes.length);
388 gzipOS.close();
389 return baos.toByteArray();
390 }
391
392 /***
393 * Tests passed stream is GZIP stream by reading in the HEAD.
394 * Does reposition of stream when done.
395 * @param rs An InputStream that is Repositionable.
396 * @return True if compressed stream.
397 * @throws IOException
398 */
399 public static boolean isCompressedRepositionableStream(
400 final RepositionableStream rs)
401 throws IOException {
402 boolean result = false;
403 long p = rs.position();
404 try {
405 result = isCompressedStream((InputStream)rs);
406 } finally {
407 rs.position(p);
408 }
409 return result;
410 }
411
412 /***
413 * Tests passed stream is gzip stream by reading in the HEAD.
414 * Does not reposition stream when done.
415 * @param is An InputStream.
416 * @return True if compressed stream.
417 * @throws IOException
418 */
419 public static boolean isCompressedStream(final InputStream is)
420 throws IOException {
421 try {
422 new GzipHeader(is);
423 } catch (NoGzipMagicException e) {
424 return false;
425 }
426 return true;
427 }
428 }