View Javadoc

1   /* RecordingInputStream
2    *
3    * $Id: RecordingInputStream.java 5080 2007-04-13 20:30:49Z gojomo $
4    *
5    * Created on Sep 24, 2003
6    *
7    * Copyright (C) 2003 Internet Archive.
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.io;
26  
27  import java.io.File;
28  import java.io.FileOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.net.SocketException;
32  import java.net.SocketTimeoutException;
33  import java.security.MessageDigest;
34  import java.util.logging.Level;
35  import java.util.logging.Logger;
36  
37  
38  /***
39   * Stream which records all data read from it, which it acquires from a wrapped
40   * input stream.
41   *
42   * Makes use of a RecordingOutputStream for recording because of its being
43   * file backed so we can write massive amounts of data w/o worrying about
44   * overflowing memory.
45   *
46   * @author gojomo
47   *
48   */
49  public class RecordingInputStream
50      extends InputStream {
51  
52      protected static Logger logger =
53          Logger.getLogger("org.archive.io.RecordingInputStream");
54  
55      /***
56       * Where we are recording to.
57       */
58      private RecordingOutputStream recordingOutputStream;
59  
60      /***
61       * Stream to record.
62       */
63      private InputStream in = null;
64  
65      /***
66       * Reusable buffer to avoid reallocation on each readFullyUntil
67       */
68      protected byte[] drainBuffer = new byte[16*1024];
69  
70      /***
71       * Create a new RecordingInputStream.
72       *
73       * @param bufferSize Size of buffer to use.
74       * @param backingFilename Name of backing file.
75       */
76      public RecordingInputStream(int bufferSize, String backingFilename)
77      {
78          this.recordingOutputStream = new RecordingOutputStream(bufferSize,
79              backingFilename);
80      }
81  
82      public void open(InputStream wrappedStream) throws IOException {
83          logger.fine(Thread.currentThread().getName() + " opening " +
84              wrappedStream + ", " + Thread.currentThread().getName());
85          if(isOpen()) {
86              // error; should not be opening/wrapping in an unclosed 
87              // stream remains open
88              throw new IOException("RIS already open for "
89                      +Thread.currentThread().getName());
90          }
91          this.in = wrappedStream;
92          this.recordingOutputStream.open();
93      }
94  
95      public int read() throws IOException {
96          if (!isOpen()) {
97              throw new IOException("Stream closed " +
98                  Thread.currentThread().getName());
99          }
100         int b = this.in.read();
101         if (b != -1) {
102             assert this.recordingOutputStream != null: "ROS is null " +
103                 Thread.currentThread().getName();
104             this.recordingOutputStream.write(b);
105         }
106         return b;
107     }
108 
109     public int read(byte[] b, int off, int len) throws IOException {
110         if (!isOpen()) {
111             throw new IOException("Stream closed " +
112                 Thread.currentThread().getName());
113         }
114         int count = this.in.read(b,off,len);
115         if (count > 0) {
116             assert this.recordingOutputStream != null: "ROS is null " +
117                 Thread.currentThread().getName();
118             this.recordingOutputStream.write(b,off,count);
119         }
120         return count;
121     }
122 
123     public int read(byte[] b) throws IOException {
124     	    if (!isOpen()) {
125     	    	    throw new IOException("Stream closed " +
126     			    Thread.currentThread().getName());
127     	    }
128     	    int count = this.in.read(b);
129         if (count > 0) {
130             assert this.recordingOutputStream != null: "ROS is null " +
131                 Thread.currentThread().getName();
132             this.recordingOutputStream.write(b,0,count);
133         }
134         return count;
135     }
136 
137     public void close() throws IOException {
138         if (logger.isLoggable(Level.FINE)) {
139             logger.fine(Thread.currentThread().getName() + " closing " +
140                     this.in + ", " + Thread.currentThread().getName());
141         }
142         if (this.in != null) {
143             this.in.close();
144             this.in = null;
145         }
146         this.recordingOutputStream.close();
147     }
148 
149     public ReplayInputStream getReplayInputStream() throws IOException {
150         return this.recordingOutputStream.getReplayInputStream();
151     }
152 
153     public ReplayInputStream getContentReplayInputStream() throws IOException {
154         return this.recordingOutputStream.getContentReplayInputStream();
155     }
156 
157     public long readFully() throws IOException {
158         while(read(drainBuffer) != -1) {
159             // Empty out stream.
160             continue;
161         }
162         return this.recordingOutputStream.getSize();
163     }
164 
165     /***
166      * Read all of a stream (Or read until we timeout or have read to the max).
167      * @param softMaxLength Maximum length to read; if zero or < 0, then no 
168      * limit. If met, return normally. 
169      * @param hardMaxLength Maximum length to read; if zero or < 0, then no 
170      * limit. If exceeded, throw RecorderLengthExceededException
171      * @param timeout Timeout in milliseconds for total read; if zero or
172      * negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
173      * RecorderTimeoutException
174      * @param maxBytesPerMs How many bytes per millisecond.
175      * @throws IOException failed read.
176      * @throws RecorderLengthExceededException
177      * @throws RecorderTimeoutException
178      * @throws InterruptedException
179      */
180     public void readFullyOrUntil(long softMaxLength)
181         throws IOException, RecorderLengthExceededException,
182             RecorderTimeoutException, InterruptedException {
183         // Check we're open before proceeding.
184         if (!isOpen()) {
185             // TODO: should this be a noisier exception-raising error? 
186             return;
187         } 
188 
189         long totalBytes = 0L;
190         long bytesRead = -1L;
191         long maxToRead = -1; 
192         while (true) {
193             try {
194                 // read no more than soft max
195                 maxToRead = (softMaxLength <= 0) 
196                     ? drainBuffer.length 
197                     : Math.min(drainBuffer.length, softMaxLength - totalBytes);
198                 // nor more than hard max
199                 maxToRead = Math.min(maxToRead, recordingOutputStream.getRemainingLength());
200                 // but always at least 1 (to trigger hard max exception
201                 maxToRead = Math.max(maxToRead, 1);
202                 
203                 bytesRead = read(drainBuffer,0,(int)maxToRead);
204                 if (bytesRead == -1) {
205                     break;
206                 }
207                 totalBytes += bytesRead;
208 
209                 if (Thread.interrupted()) {
210                     throw new InterruptedException("Interrupted during IO");
211                 }
212             } catch (SocketTimeoutException e) {
213                 // A socket timeout is just a transient problem, meaning
214                 // nothing was available in the configured  timeout period,
215                 // but something else might become available later.
216                 // Take this opportunity to check the overall 
217                 // timeout (below).  One reason for this timeout is 
218                 // servers that keep up the connection, 'keep-alive', even
219                 // though we asked them to not keep the connection open.
220                 if (logger.isLoggable(Level.FINE)) {
221                     logger.log(Level.FINE, "socket timeout", e); 
222                 }
223                 // check for overall timeout
224                 recordingOutputStream.checkLimits();
225             } catch (SocketException se) {
226                 throw se;
227             } catch (NullPointerException e) {
228                 // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
229                 // A crawl was showing NPE's in this part of the code but can
230                 // not reproduce.  Adding this rethrowing catch block w/
231                 // diagnostics to help should we come across the problem in the
232                 // future.
233                 throw new NullPointerException("Stream " + this.in + ", " +
234                     e.getMessage() + " " + Thread.currentThread().getName());
235             }
236             
237             // if have read 'enough', just finish
238             if (softMaxLength > 0 && totalBytes >= softMaxLength) {
239                 break; // return
240             }
241         }
242     }
243 
244     public long getSize() {
245         return this.recordingOutputStream.getSize();
246     }
247 
248     public void markContentBegin() {
249         this.recordingOutputStream.markContentBegin();
250     }
251     
252     public long getContentBegin() {
253         return this.recordingOutputStream.getContentBegin();
254     }
255     
256     public void startDigest() {
257         this.recordingOutputStream.startDigest();
258     }
259 
260     /***
261      * Convenience method for setting SHA1 digest.
262      */
263     public void setSha1Digest() {
264         this.recordingOutputStream.setSha1Digest();
265     }
266     
267     /***
268      * Sets a digest algorithm which may be applied to recorded data.
269      * As usually only a subset of the recorded data should
270      * be fed to the digest, you must also call startDigest()
271      * to begin digesting.
272      *
273      * @param algorithm
274      */
275     public void setDigest(String algorithm) {
276         this.recordingOutputStream.setDigest(algorithm);
277     }
278     
279     /***
280      * Sets a digest function which may be applied to recorded data.
281      * As usually only a subset of the recorded data should
282      * be fed to the digest, you must also call startDigest()
283      * to begin digesting.
284      *
285      * @param md
286      */
287     public void setDigest(MessageDigest md) {
288         this.recordingOutputStream.setDigest(md);
289     }
290 
291     /***
292      * Return the digest value for any recorded, digested data. Call
293      * only after all data has been recorded; otherwise, the running
294      * digest state is ruined.
295      *
296      * @return the digest final value
297      */
298     public byte[] getDigestValue() {
299         return this.recordingOutputStream.getDigestValue();
300     }
301 
302     public ReplayCharSequence getReplayCharSequence() throws IOException {
303         return getReplayCharSequence(null);
304     }
305 
306     /***
307      * @param characterEncoding Encoding of recorded stream.
308      * @return A ReplayCharSequence  Will return null if an IOException.  Call
309      * close on returned RCS when done.
310      * @throws IOException
311      */
312     public ReplayCharSequence getReplayCharSequence(String characterEncoding)
313     		throws IOException {
314         return this.recordingOutputStream.
315             getReplayCharSequence(characterEncoding);
316     }
317 
318     public long getResponseContentLength() {
319         return this.recordingOutputStream.getResponseContentLength();
320     }
321 
322     public void closeRecorder() throws IOException {
323         this.recordingOutputStream.closeRecorder();
324     }
325 
326     /***
327      * @param tempFile
328      * @throws IOException
329      */
330     public void copyContentBodyTo(File tempFile) throws IOException {
331         FileOutputStream fos = new FileOutputStream(tempFile);
332         ReplayInputStream ris = getContentReplayInputStream();
333         ris.readFullyTo(fos);
334         fos.close();
335         ris.close();
336     }
337 
338     /***
339      * @return True if we've been opened.
340      */
341     public boolean isOpen()
342     {
343         return this.in != null;
344     }
345 
346     @Override
347     public synchronized void mark(int readlimit) {
348         this.in.mark(readlimit); 
349         this.recordingOutputStream.mark(); 
350     }
351 
352     @Override
353     public boolean markSupported() {
354         return this.in.markSupported(); 
355     }
356 
357     @Override
358     public synchronized void reset() throws IOException {
359         this.in.reset();
360         this.recordingOutputStream.reset();
361     }
362 
363     /***
364      *  Set limits to be enforced by internal recording-out
365      */
366     public void setLimits(long hardMax, long timeoutMs, long maxRateKBps) {
367         recordingOutputStream.setLimits(hardMax, timeoutMs, maxRateKBps);
368     }
369 }