1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.io;
26
27 import java.io.File;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.net.SocketException;
32 import java.net.SocketTimeoutException;
33 import java.security.MessageDigest;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36
37
38 /***
39 * Stream which records all data read from it, which it acquires from a wrapped
40 * input stream.
41 *
42 * Makes use of a RecordingOutputStream for recording because of its being
43 * file backed so we can write massive amounts of data w/o worrying about
44 * overflowing memory.
45 *
46 * @author gojomo
47 *
48 */
49 public class RecordingInputStream
50 extends InputStream {
51
52 protected static Logger logger =
53 Logger.getLogger("org.archive.io.RecordingInputStream");
54
55 /***
56 * Where we are recording to.
57 */
58 private RecordingOutputStream recordingOutputStream;
59
60 /***
61 * Stream to record.
62 */
63 private InputStream in = null;
64
65 /***
66 * Reusable buffer to avoid reallocation on each readFullyUntil
67 */
68 protected byte[] drainBuffer = new byte[16*1024];
69
70 /***
71 * Create a new RecordingInputStream.
72 *
73 * @param bufferSize Size of buffer to use.
74 * @param backingFilename Name of backing file.
75 */
76 public RecordingInputStream(int bufferSize, String backingFilename)
77 {
78 this.recordingOutputStream = new RecordingOutputStream(bufferSize,
79 backingFilename);
80 }
81
82 public void open(InputStream wrappedStream) throws IOException {
83 logger.fine(Thread.currentThread().getName() + " opening " +
84 wrappedStream + ", " + Thread.currentThread().getName());
85 if(isOpen()) {
86
87
88 throw new IOException("RIS already open for "
89 +Thread.currentThread().getName());
90 }
91 this.in = wrappedStream;
92 this.recordingOutputStream.open();
93 }
94
95 public int read() throws IOException {
96 if (!isOpen()) {
97 throw new IOException("Stream closed " +
98 Thread.currentThread().getName());
99 }
100 int b = this.in.read();
101 if (b != -1) {
102 assert this.recordingOutputStream != null: "ROS is null " +
103 Thread.currentThread().getName();
104 this.recordingOutputStream.write(b);
105 }
106 return b;
107 }
108
109 public int read(byte[] b, int off, int len) throws IOException {
110 if (!isOpen()) {
111 throw new IOException("Stream closed " +
112 Thread.currentThread().getName());
113 }
114 int count = this.in.read(b,off,len);
115 if (count > 0) {
116 assert this.recordingOutputStream != null: "ROS is null " +
117 Thread.currentThread().getName();
118 this.recordingOutputStream.write(b,off,count);
119 }
120 return count;
121 }
122
123 public int read(byte[] b) throws IOException {
124 if (!isOpen()) {
125 throw new IOException("Stream closed " +
126 Thread.currentThread().getName());
127 }
128 int count = this.in.read(b);
129 if (count > 0) {
130 assert this.recordingOutputStream != null: "ROS is null " +
131 Thread.currentThread().getName();
132 this.recordingOutputStream.write(b,0,count);
133 }
134 return count;
135 }
136
137 public void close() throws IOException {
138 if (logger.isLoggable(Level.FINE)) {
139 logger.fine(Thread.currentThread().getName() + " closing " +
140 this.in + ", " + Thread.currentThread().getName());
141 }
142 if (this.in != null) {
143 this.in.close();
144 this.in = null;
145 }
146 this.recordingOutputStream.close();
147 }
148
149 public ReplayInputStream getReplayInputStream() throws IOException {
150 return this.recordingOutputStream.getReplayInputStream();
151 }
152
153 public ReplayInputStream getContentReplayInputStream() throws IOException {
154 return this.recordingOutputStream.getContentReplayInputStream();
155 }
156
157 public long readFully() throws IOException {
158 while(read(drainBuffer) != -1) {
159
160 continue;
161 }
162 return this.recordingOutputStream.getSize();
163 }
164
165 /***
166 * Read all of a stream (Or read until we timeout or have read to the max).
167 * @param softMaxLength Maximum length to read; if zero or < 0, then no
168 * limit. If met, return normally.
169 * @param hardMaxLength Maximum length to read; if zero or < 0, then no
170 * limit. If exceeded, throw RecorderLengthExceededException
171 * @param timeout Timeout in milliseconds for total read; if zero or
172 * negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
173 * RecorderTimeoutException
174 * @param maxBytesPerMs How many bytes per millisecond.
175 * @throws IOException failed read.
176 * @throws RecorderLengthExceededException
177 * @throws RecorderTimeoutException
178 * @throws InterruptedException
179 */
180 public void readFullyOrUntil(long softMaxLength)
181 throws IOException, RecorderLengthExceededException,
182 RecorderTimeoutException, InterruptedException {
183
184 if (!isOpen()) {
185
186 return;
187 }
188
189 long totalBytes = 0L;
190 long bytesRead = -1L;
191 long maxToRead = -1;
192 while (true) {
193 try {
194
195 maxToRead = (softMaxLength <= 0)
196 ? drainBuffer.length
197 : Math.min(drainBuffer.length, softMaxLength - totalBytes);
198
199 maxToRead = Math.min(maxToRead, recordingOutputStream.getRemainingLength());
200
201 maxToRead = Math.max(maxToRead, 1);
202
203 bytesRead = read(drainBuffer,0,(int)maxToRead);
204 if (bytesRead == -1) {
205 break;
206 }
207 totalBytes += bytesRead;
208
209 if (Thread.interrupted()) {
210 throw new InterruptedException("Interrupted during IO");
211 }
212 } catch (SocketTimeoutException e) {
213
214
215
216
217
218
219
220 if (logger.isLoggable(Level.FINE)) {
221 logger.log(Level.FINE, "socket timeout", e);
222 }
223
224 recordingOutputStream.checkLimits();
225 } catch (SocketException se) {
226 throw se;
227 } catch (NullPointerException e) {
228
229
230
231
232
233 throw new NullPointerException("Stream " + this.in + ", " +
234 e.getMessage() + " " + Thread.currentThread().getName());
235 }
236
237
238 if (softMaxLength > 0 && totalBytes >= softMaxLength) {
239 break;
240 }
241 }
242 }
243
244 public long getSize() {
245 return this.recordingOutputStream.getSize();
246 }
247
248 public void markContentBegin() {
249 this.recordingOutputStream.markContentBegin();
250 }
251
252 public long getContentBegin() {
253 return this.recordingOutputStream.getContentBegin();
254 }
255
256 public void startDigest() {
257 this.recordingOutputStream.startDigest();
258 }
259
260 /***
261 * Convenience method for setting SHA1 digest.
262 */
263 public void setSha1Digest() {
264 this.recordingOutputStream.setSha1Digest();
265 }
266
267 /***
268 * Sets a digest algorithm which may be applied to recorded data.
269 * As usually only a subset of the recorded data should
270 * be fed to the digest, you must also call startDigest()
271 * to begin digesting.
272 *
273 * @param algorithm
274 */
275 public void setDigest(String algorithm) {
276 this.recordingOutputStream.setDigest(algorithm);
277 }
278
279 /***
280 * Sets a digest function which may be applied to recorded data.
281 * As usually only a subset of the recorded data should
282 * be fed to the digest, you must also call startDigest()
283 * to begin digesting.
284 *
285 * @param md
286 */
287 public void setDigest(MessageDigest md) {
288 this.recordingOutputStream.setDigest(md);
289 }
290
291 /***
292 * Return the digest value for any recorded, digested data. Call
293 * only after all data has been recorded; otherwise, the running
294 * digest state is ruined.
295 *
296 * @return the digest final value
297 */
298 public byte[] getDigestValue() {
299 return this.recordingOutputStream.getDigestValue();
300 }
301
302 public ReplayCharSequence getReplayCharSequence() throws IOException {
303 return getReplayCharSequence(null);
304 }
305
306 /***
307 * @param characterEncoding Encoding of recorded stream.
308 * @return A ReplayCharSequence Will return null if an IOException. Call
309 * close on returned RCS when done.
310 * @throws IOException
311 */
312 public ReplayCharSequence getReplayCharSequence(String characterEncoding)
313 throws IOException {
314 return this.recordingOutputStream.
315 getReplayCharSequence(characterEncoding);
316 }
317
318 public long getResponseContentLength() {
319 return this.recordingOutputStream.getResponseContentLength();
320 }
321
322 public void closeRecorder() throws IOException {
323 this.recordingOutputStream.closeRecorder();
324 }
325
326 /***
327 * @param tempFile
328 * @throws IOException
329 */
330 public void copyContentBodyTo(File tempFile) throws IOException {
331 FileOutputStream fos = new FileOutputStream(tempFile);
332 ReplayInputStream ris = getContentReplayInputStream();
333 ris.readFullyTo(fos);
334 fos.close();
335 ris.close();
336 }
337
338 /***
339 * @return True if we've been opened.
340 */
341 public boolean isOpen()
342 {
343 return this.in != null;
344 }
345
346 @Override
347 public synchronized void mark(int readlimit) {
348 this.in.mark(readlimit);
349 this.recordingOutputStream.mark();
350 }
351
352 @Override
353 public boolean markSupported() {
354 return this.in.markSupported();
355 }
356
357 @Override
358 public synchronized void reset() throws IOException {
359 this.in.reset();
360 this.recordingOutputStream.reset();
361 }
362
363 /***
364 * Set limits to be enforced by internal recording-out
365 */
366 public void setLimits(long hardMax, long timeoutMs, long maxRateKBps) {
367 recordingOutputStream.setLimits(hardMax, timeoutMs, maxRateKBps);
368 }
369 }