View Javadoc

1   /* ReplayableOutputStream
2    *
3    * $Id: RecordingOutputStream.java 6512 2009-09-23 03:02:29Z gojomo $
4    *
5    * Created on Sep 23, 2003
6    *
7    * Copyright (C) 2003 Internet Archive.
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.io;
26  
27  import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
28  
29  import java.io.FileOutputStream;
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.nio.charset.Charset;
33  import java.security.MessageDigest;
34  import java.security.NoSuchAlgorithmException;
35  import java.util.logging.Level;
36  import java.util.logging.Logger;
37  
38  
39  /***
40   * An output stream that records all writes to wrapped output
41   * stream.
42   *
43   * A RecordingOutputStream can be wrapped around any other
44   * OutputStream to record all bytes written to it.  You can
45   * then request a ReplayInputStream to read those bytes.
46   *
47   * <p>The RecordingOutputStream uses an in-memory buffer and
48   * backing disk file to allow it to record streams of
49   * arbitrary length limited only by available disk space.
50   *
51   * <p>As long as the stream recorded is smaller than the
52   * in-memory buffer, no disk access will occur.
53   *
54   * <p>Recorded content can be recovered as a ReplayInputStream
55   * (via getReplayInputStream() or, for only the content after
56   * the content-begin-mark is set, getContentReplayInputStream() )
57   * or as a ReplayCharSequence (via getReplayCharSequence()).
58   *
59   * <p>This class is also used as a straight output stream
60   * by {@link RecordingInputStream} to which it records all reads.
61   * {@link RecordingInputStream} is exploiting the file backed buffer
62   * facility of this class passing <code>null</code> for the stream
63   * to wrap.  TODO: Make a FileBackedOutputStream class that is
64   * subclassed by RecordingInputStream.
65   *
66   * @author gojomo
67   *
68   */
69  public class RecordingOutputStream extends OutputStream {
70      protected static Logger logger =
71          Logger.getLogger(RecordingOutputStream.class.getName());
72      
73      /***
74       * Size of recording.
75       *
76       * Later passed to ReplayInputStream on creation.  It uses it to know when
77       * EOS.
78       */
79      private long size = 0;
80  
81      private String backingFilename;
82      private OutputStream diskStream = null;
83  
84      /***
85       * Buffer we write recordings to.
86       *
87       * We write all recordings here first till its full.  Thereafter we
88       * write the backing file.
89       */
90      private byte[] buffer;
91  
92      /*** current virtual position in the recording */
93      private long position;
94      
95      /*** flag to disable recording */
96      private boolean recording;
97      
98      /***
99       * Reusable buffer for FastBufferedOutputStream
100      */
101     protected byte[] bufStreamBuf = 
102         new byte [ FastBufferedOutputStream.DEFAULT_BUFFER_SIZE ];
103     
104     /***
105      * True if we're to digest content.
106      */
107     private boolean shouldDigest = false;
108  
109     /***
110      * Digest instance.
111      */
112     private MessageDigest digest = null;
113 
114     /***
115      * Define for SHA1 alogarithm.
116      */
117     private static final String SHA1 = "SHA1";
118 
119     /***
120      * Maximum amount of header material to accept without the content
121      * body beginning -- if more, throw a RecorderTooMuchHeaderException.
122      * TODO: make configurable? make smaller?
123      */
124     protected static final long MAX_HEADER_MATERIAL = 1024*1024; // 1MB
125     
126     // configurable max length, max time limits
127     /*** maximum length of material to record before throwing exception */ 
128     protected long maxLength = Long.MAX_VALUE;
129     /*** maximum time to record before throwing exception */ 
130     protected long timeoutMs = Long.MAX_VALUE;
131     /*** maximum rate to record (adds delays to hit target rate) */ 
132     protected long maxRateBytesPerMs = Long.MAX_VALUE;
133     /*** time recording begins for timeout, rate calculations */ 
134     protected long startTime = Long.MAX_VALUE;
135     
136     /***
137      * When recording HTTP, where the content-body starts.
138      */
139     private long contentBeginMark;
140 
141     /***
142      * Stream to record.
143      */
144     private OutputStream out = null;
145 
146     // mark/reset support 
147     /*** furthest position reached before any reset()s */
148     private long maxPosition = 0;
149     /*** remembered position to reset() to */ 
150     private long markPosition = 0; 
151 
152     /***
153      * Create a new RecordingOutputStream.
154      *
155      * @param bufferSize Buffer size to use.
156      * @param backingFilename Name of backing file to use.
157      */
158     public RecordingOutputStream(int bufferSize, String backingFilename) {
159         this.buffer = new byte[bufferSize];
160         this.backingFilename = backingFilename;
161         recording = true;
162     }
163 
164     /***
165      * Wrap the given stream, both recording and passing along any data written
166      * to this RecordingOutputStream.
167      *
168      * @throws IOException If failed creation of backing file.
169      */
170     public void open() throws IOException {
171         this.open(null);
172     }
173 
174     /***
175      * Wrap the given stream, both recording and passing along any data written
176      * to this RecordingOutputStream.
177      *
178      * @param wrappedStream Stream to wrap.  May be null for case where we
179      * want to write to a file backed stream only.
180      *
181      * @throws IOException If failed creation of backing file.
182      */
183     public void open(OutputStream wrappedStream) throws IOException {
184         if(isOpen()) {
185             // error; should not be opening/wrapping in an unclosed 
186             // stream remains open
187             throw new IOException("ROS already open for "
188                     +Thread.currentThread().getName());
189         }
190         this.out = wrappedStream;
191         this.position = 0;
192         this.markPosition = 0;
193         this.maxPosition = 0; 
194         this.size = 0;
195         this.contentBeginMark = -1;
196         // ensure recording turned on
197         this.recording = true;
198         // Always begins false; must use startDigest() to begin
199         this.shouldDigest = false;
200         if (this.diskStream != null) {
201             closeDiskStream();
202         }
203         if (this.diskStream == null) {
204             // TODO: Fix so we only make file when its actually needed.
205             FileOutputStream fis = new FileOutputStream(this.backingFilename);
206             
207             this.diskStream = new RecyclingFastBufferedOutputStream(fis, bufStreamBuf);
208         }
209         startTime = System.currentTimeMillis();
210     }
211 
212     public void write(int b) throws IOException {
213         if(position<maxPosition) {
214             // revisiting previous content; do nothing but advance position
215             position++;
216             return; 
217         }
218         if(recording) {
219             record(b);
220         }
221         if (this.out != null) {
222             this.out.write(b);
223         }
224         checkLimits();
225     }
226 
227     public void write(byte[] b, int off, int len) throws IOException {
228         if(position < maxPosition) {
229             if(position+len<=maxPosition) {
230                 // revisiting; do nothing but advance position
231                 position += len;
232                 return;
233             }
234             // consume part of the array doing nothing but advancing position
235             long consumeRange = maxPosition - position; 
236             position += consumeRange;
237             off += consumeRange;
238             len -= consumeRange; 
239         }
240         if(recording) {
241             record(b, off, len);
242         }
243         if (this.out != null) {
244             this.out.write(b, off, len);
245         }
246         checkLimits();
247     }
248     
249     /***
250      * Check any enforced limits. 
251      */
252     protected void checkLimits() throws RecorderIOException {
253         // too much material before finding end of headers? 
254         if (contentBeginMark<0) {
255             // no mark yet
256             if(position>MAX_HEADER_MATERIAL) {
257                 throw new RecorderTooMuchHeaderException();
258             }
259         }
260         // overlong?
261         if(position>maxLength) {
262             throw new RecorderLengthExceededException(); 
263         }
264         // taking too long? 
265         long duration = System.currentTimeMillis() - startTime; 
266         duration = Math.max(duration,1); // !divzero
267         if(duration>timeoutMs) {
268             throw new RecorderTimeoutException(); 
269         }
270         // need to throttle reading to hit max configured rate? 
271         if(position/duration > maxRateBytesPerMs) {
272             long desiredDuration = position / maxRateBytesPerMs;
273             try {
274                 Thread.sleep(desiredDuration-duration);
275             } catch (InterruptedException e) {
276                 logger.log(Level.WARNING,
277                         "bandwidth throttling sleep interrupted", e);
278             } 
279         }
280     }
281 
282     /***
283      * Record the given byte for later recovery
284      *
285      * @param b Int to record.
286      *
287      * @exception IOException Failed write to backing file.
288      */
289     private void record(int b) throws IOException {
290         if (this.shouldDigest) {
291             this.digest.update((byte)b);
292         }
293         if (this.position >= this.buffer.length) {
294             // TODO: Its possible to call write w/o having first opened a
295             // stream.  Protect ourselves against this.
296             assert this.diskStream != null: "Diskstream is null";
297             this.diskStream.write(b);
298         } else {
299             this.buffer[(int) this.position] = (byte) b;
300         }
301         this.position++;
302     }
303 
304     /***
305      * Record the given byte-array range for recovery later
306      *
307      * @param b Buffer to record.
308      * @param off Offset into buffer at which to start recording.
309      * @param len Length of buffer to record.
310      *
311      * @exception IOException Failed write to backing file.
312      */
313     private void record(byte[] b, int off, int len) throws IOException {
314         if(this.shouldDigest) {
315             assert this.digest != null: "Digest is null.";
316             this.digest.update(b, off, len);
317         }
318         tailRecord(b, off, len);
319     }
320 
321     /***
322      * Record without digesting.
323      * 
324      * @param b Buffer to record.
325      * @param off Offset into buffer at which to start recording.
326      * @param len Length of buffer to record.
327      *
328      * @exception IOException Failed write to backing file.
329      */
330     private void tailRecord(byte[] b, int off, int len) throws IOException {
331         if(this.position >= this.buffer.length){
332             // TODO: Its possible to call write w/o having first opened a
333             // stream.  Lets protect ourselves against this.
334             if (this.diskStream == null) {
335                 throw new IOException("diskstream is null");
336             }
337             this.diskStream.write(b, off, len);
338             this.position += len;
339         } else {
340             assert this.buffer != null: "Buffer is null";
341             int toCopy = (int)Math.min(this.buffer.length - this.position, len);
342             assert b != null: "Passed buffer is null";
343             System.arraycopy(b, off, this.buffer, (int)this.position, toCopy);
344             this.position += toCopy;
345             // TODO verify these are +1 -1 right
346             if (toCopy < len) {
347                 tailRecord(b, off + toCopy, len - toCopy);
348             }
349         }
350     }
351 
352     public void close() throws IOException {
353         if(contentBeginMark<0) {
354             // if unset, consider 0 posn as content-start
355             // (so that a -1 never survives to replay step)
356             contentBeginMark = 0;
357         }
358         if (this.out != null) {
359             this.out.close();
360             this.out = null;
361         }
362         closeRecorder();
363     }
364     
365     protected synchronized void closeDiskStream()
366     throws IOException {
367         if (this.diskStream != null) {
368             this.diskStream.close();
369             this.diskStream = null;
370         }
371     }
372 
373     public void closeRecorder() throws IOException {
374         recording = false;
375         closeDiskStream(); // if any
376         // This setting of size is important.  Its passed to ReplayInputStream
377         // on creation.  It uses it to know EOS.
378         if (this.size == 0) {
379             this.size = this.position;
380         }
381     }
382 
383     /* (non-Javadoc)
384      * @see java.io.OutputStream#flush()
385      */
386     public void flush() throws IOException {
387         if (this.out != null) {
388             this.out.flush();
389         }
390         if (this.diskStream != null) {
391             this.diskStream.flush();
392         }
393     }
394 
395     public ReplayInputStream getReplayInputStream() throws IOException {
396         return getReplayInputStream(0);
397     }
398     
399     public ReplayInputStream getReplayInputStream(long skip) throws IOException {
400         // If this method is being called, then assumption must be that the
401         // stream is closed. If it ain't, then the stream gotten won't work
402         // -- the size will zero so any attempt at a read will get back EOF.
403         assert this.out == null: "Stream is still open.";
404         ReplayInputStream replay = new ReplayInputStream(this.buffer, 
405                 this.size, this.contentBeginMark, this.backingFilename);
406         replay.skip(skip);
407         return replay; 
408     }
409 
410     /***
411      * Return a replay stream, cued up to begining of content
412      *
413      * @throws IOException
414      * @return An RIS.
415      */
416     public ReplayInputStream getContentReplayInputStream() throws IOException {
417         return getReplayInputStream(this.contentBeginMark);
418     }
419 
420     public long getSize() {
421         return this.size;
422     }
423 
424     /***
425      * Remember the current position as the start of the "response
426      * body". Useful when recording HTTP traffic as a way to start
427      * replays after the headers.
428      */
429     public void markContentBegin() {
430         this.contentBeginMark = this.position;
431         startDigest();
432     }
433 
434     /***
435      * Return stored content-begin-mark (which is also end-of-headers)
436      */
437     public long getContentBegin() {
438         return this.contentBeginMark;
439     }
440     
441     /***
442      * Starts digesting recorded data, if a MessageDigest has been
443      * set.
444      */
445     public void startDigest() {
446         if (this.digest != null) {
447             this.digest.reset();
448             this.shouldDigest = true;
449         }
450     }
451 
452     /***
453      * Convenience method for setting SHA1 digest.
454      * @see #setDigest(String)
455      */
456     public void setSha1Digest() {
457         setDigest(SHA1);
458     }
459     
460 
461     /***
462      * Sets a digest function which may be applied to recorded data.
463      * The difference between calling this method and {@link #setDigest(MessageDigest)}
464      * is that this method tries to reuse MethodDigest instance if already allocated
465      * and of appropriate algorithm.
466      * @param algorithm Message digest algorithm to use.
467      * @see #setDigest(MessageDigest)
468      */
469     public void setDigest(String algorithm) {
470         try {
471             // Reuse extant digest if its sha1 algorithm.
472             if (this.digest == null ||
473                     !this.digest.getAlgorithm().equals(algorithm)) {
474                 setDigest(MessageDigest.getInstance(algorithm));
475             }
476         } catch (NoSuchAlgorithmException e) {
477             e.printStackTrace();
478         }
479     }
480 
481     /***
482      * Sets a digest function which may be applied to recorded data.
483      *
484      * As usually only a subset of the recorded data should
485      * be fed to the digest, you must also call startDigest()
486      * to begin digesting.
487      *
488      * @param md Message digest function to use.
489      */
490     public void setDigest(MessageDigest md) {
491         this.digest = md;
492     }
493 
494     /***
495      * Return the digest value for any recorded, digested data. Call
496      * only after all data has been recorded; otherwise, the running
497      * digest state is ruined.
498      *
499      * @return the digest final value
500      */
501     public byte[] getDigestValue() {
502         if(this.digest == null) {
503             return null;
504         }
505         return this.digest.digest();
506     }
507 
508     public ReplayCharSequence getReplayCharSequence() throws IOException {
509         return getReplayCharSequence(null);
510     }
511 
512     public ReplayCharSequence getReplayCharSequence(String characterEncoding) 
513     throws IOException {
514         return getReplayCharSequence(characterEncoding, this.contentBeginMark);
515     }
516 
517     private static final String canonicalLatin1 = Charset.forName("iso8859-1").name();
518 
519     /***
520      * @param characterEncoding Encoding of recorded stream.
521      * @return A ReplayCharSequence  Will return null if an IOException.  Call
522      * close on returned RCS when done.
523      * @throws IOException
524      */
525     public ReplayCharSequence getReplayCharSequence(String characterEncoding, 
526             long startOffset) throws IOException {
527         if (characterEncoding == null) {
528             // in absence of other info, assume iso8859-1 on all JVMs
529             characterEncoding = canonicalLatin1;
530         }
531         try {
532             // ensure name is canonical 
533             characterEncoding = Charset.forName(characterEncoding).name();
534         } catch (IllegalArgumentException iae) {
535             // revert to single-byte for unknown encodings
536             characterEncoding = canonicalLatin1;
537         }
538         // TODO: handled transfer-encoding: chunked content-bodies properly
539         if (canonicalLatin1.equals(characterEncoding)) {
540             return new Latin1ByteReplayCharSequence(
541                     this.buffer, 
542                     this.size, 
543                     startOffset,
544                     this.backingFilename);
545         } else {
546             // multibyte 
547             if(this.size <= this.buffer.length) {
548                 // raw data is all in memory; do in memory
549                 return new GenericReplayCharSequence(
550                         this.buffer, 
551                         this.size, 
552                         startOffset,
553                         characterEncoding);
554                 
555             } else {
556                 // raw data overflows to disk; use temp file
557                 ReplayInputStream ris = getReplayInputStream(startOffset);
558                 ReplayCharSequence rcs = new GenericReplayCharSequence(
559                         ris, 
560                         this.backingFilename,
561                         characterEncoding);
562                 ris.close(); 
563                 return rcs;
564             }
565             
566         }
567         
568     }
569 
570     public long getResponseContentLength() {
571         return this.size - this.contentBeginMark;
572     }
573 
574     /***
575      * @return True if this ROS is open.
576      */
577     public boolean isOpen() {
578         return this.out != null;
579     }
580     
581     /***
582      * When used alongside a mark-supporting RecordingInputStream, remember
583      * a position reachable by a future reset().
584      */
585     public void mark() {
586         // remember this position for subsequent reset()
587         this.markPosition = position; 
588     }
589     
590     /***
591      * When used alongside a mark-supporting RecordingInputStream, reset 
592      * the position to that saved by previous mark(). Until the position 
593      * again reached "new" material, none of the bytes pushed to this 
594      * stream will be digested or recorded. 
595      */
596     public void reset() {
597         // take note of furthest-position-reached to avoid double-recording
598         maxPosition = Math.max(maxPosition, position); 
599         // reset to previous position
600         position = markPosition;
601     }
602     
603     /***
604      * Set limits on length, time, and rate to enforce.
605      * 
606      * @param length
607      * @param milliseconds
608      * @param rateKBps
609      */
610     public void setLimits(long length, long milliseconds, long rateKBps) {
611         maxLength = (length>0) ? length : Long.MAX_VALUE;
612         timeoutMs = (milliseconds>0) ? milliseconds : Long.MAX_VALUE;
613         maxRateBytesPerMs = (rateKBps>0) ? rateKBps*1024/1000 : Long.MAX_VALUE;
614     }
615     
616     /***
617      * Reset limits to effectively-unlimited defaults
618      */
619     public void resetLimits() {
620         maxLength = Long.MAX_VALUE;
621         timeoutMs = Long.MAX_VALUE;
622         maxRateBytesPerMs = Long.MAX_VALUE;
623     }
624     
625     /***
626      * Return number of bytes that could be recorded without hitting 
627      * length limit
628      * 
629      * @return long byte count
630      */
631     public long getRemainingLength() {
632         return maxLength - position; 
633     }
634 }