1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.io;
26
27 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
28
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.nio.charset.Charset;
33 import java.security.MessageDigest;
34 import java.security.NoSuchAlgorithmException;
35 import java.util.logging.Level;
36 import java.util.logging.Logger;
37
38
39 /***
40 * An output stream that records all writes to wrapped output
41 * stream.
42 *
43 * A RecordingOutputStream can be wrapped around any other
44 * OutputStream to record all bytes written to it. You can
45 * then request a ReplayInputStream to read those bytes.
46 *
47 * <p>The RecordingOutputStream uses an in-memory buffer and
48 * backing disk file to allow it to record streams of
49 * arbitrary length limited only by available disk space.
50 *
51 * <p>As long as the stream recorded is smaller than the
52 * in-memory buffer, no disk access will occur.
53 *
54 * <p>Recorded content can be recovered as a ReplayInputStream
55 * (via getReplayInputStream() or, for only the content after
56 * the content-begin-mark is set, getContentReplayInputStream() )
57 * or as a ReplayCharSequence (via getReplayCharSequence()).
58 *
59 * <p>This class is also used as a straight output stream
60 * by {@link RecordingInputStream} to which it records all reads.
61 * {@link RecordingInputStream} is exploiting the file backed buffer
62 * facility of this class passing <code>null</code> for the stream
63 * to wrap. TODO: Make a FileBackedOutputStream class that is
64 * subclassed by RecordingInputStream.
65 *
66 * @author gojomo
67 *
68 */
69 public class RecordingOutputStream extends OutputStream {
70 protected static Logger logger =
71 Logger.getLogger(RecordingOutputStream.class.getName());
72
73 /***
74 * Size of recording.
75 *
76 * Later passed to ReplayInputStream on creation. It uses it to know when
77 * EOS.
78 */
79 private long size = 0;
80
81 private String backingFilename;
82 private OutputStream diskStream = null;
83
84 /***
85 * Buffer we write recordings to.
86 *
87 * We write all recordings here first till its full. Thereafter we
88 * write the backing file.
89 */
90 private byte[] buffer;
91
92 /*** current virtual position in the recording */
93 private long position;
94
95 /*** flag to disable recording */
96 private boolean recording;
97
98 /***
99 * Reusable buffer for FastBufferedOutputStream
100 */
101 protected byte[] bufStreamBuf =
102 new byte [ FastBufferedOutputStream.DEFAULT_BUFFER_SIZE ];
103
104 /***
105 * True if we're to digest content.
106 */
107 private boolean shouldDigest = false;
108
109 /***
110 * Digest instance.
111 */
112 private MessageDigest digest = null;
113
114 /***
115 * Define for SHA1 alogarithm.
116 */
117 private static final String SHA1 = "SHA1";
118
119 /***
120 * Maximum amount of header material to accept without the content
121 * body beginning -- if more, throw a RecorderTooMuchHeaderException.
122 * TODO: make configurable? make smaller?
123 */
124 protected static final long MAX_HEADER_MATERIAL = 1024*1024;
125
126
127 /*** maximum length of material to record before throwing exception */
128 protected long maxLength = Long.MAX_VALUE;
129 /*** maximum time to record before throwing exception */
130 protected long timeoutMs = Long.MAX_VALUE;
131 /*** maximum rate to record (adds delays to hit target rate) */
132 protected long maxRateBytesPerMs = Long.MAX_VALUE;
133 /*** time recording begins for timeout, rate calculations */
134 protected long startTime = Long.MAX_VALUE;
135
136 /***
137 * When recording HTTP, where the content-body starts.
138 */
139 private long contentBeginMark;
140
141 /***
142 * Stream to record.
143 */
144 private OutputStream out = null;
145
146
147 /*** furthest position reached before any reset()s */
148 private long maxPosition = 0;
149 /*** remembered position to reset() to */
150 private long markPosition = 0;
151
152 /***
153 * Create a new RecordingOutputStream.
154 *
155 * @param bufferSize Buffer size to use.
156 * @param backingFilename Name of backing file to use.
157 */
158 public RecordingOutputStream(int bufferSize, String backingFilename) {
159 this.buffer = new byte[bufferSize];
160 this.backingFilename = backingFilename;
161 recording = true;
162 }
163
164 /***
165 * Wrap the given stream, both recording and passing along any data written
166 * to this RecordingOutputStream.
167 *
168 * @throws IOException If failed creation of backing file.
169 */
170 public void open() throws IOException {
171 this.open(null);
172 }
173
174 /***
175 * Wrap the given stream, both recording and passing along any data written
176 * to this RecordingOutputStream.
177 *
178 * @param wrappedStream Stream to wrap. May be null for case where we
179 * want to write to a file backed stream only.
180 *
181 * @throws IOException If failed creation of backing file.
182 */
183 public void open(OutputStream wrappedStream) throws IOException {
184 if(isOpen()) {
185
186
187 throw new IOException("ROS already open for "
188 +Thread.currentThread().getName());
189 }
190 this.out = wrappedStream;
191 this.position = 0;
192 this.markPosition = 0;
193 this.maxPosition = 0;
194 this.size = 0;
195 this.contentBeginMark = -1;
196
197 this.recording = true;
198
199 this.shouldDigest = false;
200 if (this.diskStream != null) {
201 closeDiskStream();
202 }
203 if (this.diskStream == null) {
204
205 FileOutputStream fis = new FileOutputStream(this.backingFilename);
206
207 this.diskStream = new RecyclingFastBufferedOutputStream(fis, bufStreamBuf);
208 }
209 startTime = System.currentTimeMillis();
210 }
211
212 public void write(int b) throws IOException {
213 if(position<maxPosition) {
214
215 position++;
216 return;
217 }
218 if(recording) {
219 record(b);
220 }
221 if (this.out != null) {
222 this.out.write(b);
223 }
224 checkLimits();
225 }
226
227 public void write(byte[] b, int off, int len) throws IOException {
228 if(position < maxPosition) {
229 if(position+len<=maxPosition) {
230
231 position += len;
232 return;
233 }
234
235 long consumeRange = maxPosition - position;
236 position += consumeRange;
237 off += consumeRange;
238 len -= consumeRange;
239 }
240 if(recording) {
241 record(b, off, len);
242 }
243 if (this.out != null) {
244 this.out.write(b, off, len);
245 }
246 checkLimits();
247 }
248
249 /***
250 * Check any enforced limits.
251 */
252 protected void checkLimits() throws RecorderIOException {
253
254 if (contentBeginMark<0) {
255
256 if(position>MAX_HEADER_MATERIAL) {
257 throw new RecorderTooMuchHeaderException();
258 }
259 }
260
261 if(position>maxLength) {
262 throw new RecorderLengthExceededException();
263 }
264
265 long duration = System.currentTimeMillis() - startTime;
266 duration = Math.max(duration,1);
267 if(duration>timeoutMs) {
268 throw new RecorderTimeoutException();
269 }
270
271 if(position/duration > maxRateBytesPerMs) {
272 long desiredDuration = position / maxRateBytesPerMs;
273 try {
274 Thread.sleep(desiredDuration-duration);
275 } catch (InterruptedException e) {
276 logger.log(Level.WARNING,
277 "bandwidth throttling sleep interrupted", e);
278 }
279 }
280 }
281
282 /***
283 * Record the given byte for later recovery
284 *
285 * @param b Int to record.
286 *
287 * @exception IOException Failed write to backing file.
288 */
289 private void record(int b) throws IOException {
290 if (this.shouldDigest) {
291 this.digest.update((byte)b);
292 }
293 if (this.position >= this.buffer.length) {
294
295
296 assert this.diskStream != null: "Diskstream is null";
297 this.diskStream.write(b);
298 } else {
299 this.buffer[(int) this.position] = (byte) b;
300 }
301 this.position++;
302 }
303
304 /***
305 * Record the given byte-array range for recovery later
306 *
307 * @param b Buffer to record.
308 * @param off Offset into buffer at which to start recording.
309 * @param len Length of buffer to record.
310 *
311 * @exception IOException Failed write to backing file.
312 */
313 private void record(byte[] b, int off, int len) throws IOException {
314 if(this.shouldDigest) {
315 assert this.digest != null: "Digest is null.";
316 this.digest.update(b, off, len);
317 }
318 tailRecord(b, off, len);
319 }
320
321 /***
322 * Record without digesting.
323 *
324 * @param b Buffer to record.
325 * @param off Offset into buffer at which to start recording.
326 * @param len Length of buffer to record.
327 *
328 * @exception IOException Failed write to backing file.
329 */
330 private void tailRecord(byte[] b, int off, int len) throws IOException {
331 if(this.position >= this.buffer.length){
332
333
334 if (this.diskStream == null) {
335 throw new IOException("diskstream is null");
336 }
337 this.diskStream.write(b, off, len);
338 this.position += len;
339 } else {
340 assert this.buffer != null: "Buffer is null";
341 int toCopy = (int)Math.min(this.buffer.length - this.position, len);
342 assert b != null: "Passed buffer is null";
343 System.arraycopy(b, off, this.buffer, (int)this.position, toCopy);
344 this.position += toCopy;
345
346 if (toCopy < len) {
347 tailRecord(b, off + toCopy, len - toCopy);
348 }
349 }
350 }
351
352 public void close() throws IOException {
353 if(contentBeginMark<0) {
354
355
356 contentBeginMark = 0;
357 }
358 if (this.out != null) {
359 this.out.close();
360 this.out = null;
361 }
362 closeRecorder();
363 }
364
365 protected synchronized void closeDiskStream()
366 throws IOException {
367 if (this.diskStream != null) {
368 this.diskStream.close();
369 this.diskStream = null;
370 }
371 }
372
373 public void closeRecorder() throws IOException {
374 recording = false;
375 closeDiskStream();
376
377
378 if (this.size == 0) {
379 this.size = this.position;
380 }
381 }
382
383
384
385
386 public void flush() throws IOException {
387 if (this.out != null) {
388 this.out.flush();
389 }
390 if (this.diskStream != null) {
391 this.diskStream.flush();
392 }
393 }
394
395 public ReplayInputStream getReplayInputStream() throws IOException {
396 return getReplayInputStream(0);
397 }
398
399 public ReplayInputStream getReplayInputStream(long skip) throws IOException {
400
401
402
403 assert this.out == null: "Stream is still open.";
404 ReplayInputStream replay = new ReplayInputStream(this.buffer,
405 this.size, this.contentBeginMark, this.backingFilename);
406 replay.skip(skip);
407 return replay;
408 }
409
410 /***
411 * Return a replay stream, cued up to begining of content
412 *
413 * @throws IOException
414 * @return An RIS.
415 */
416 public ReplayInputStream getContentReplayInputStream() throws IOException {
417 return getReplayInputStream(this.contentBeginMark);
418 }
419
420 public long getSize() {
421 return this.size;
422 }
423
424 /***
425 * Remember the current position as the start of the "response
426 * body". Useful when recording HTTP traffic as a way to start
427 * replays after the headers.
428 */
429 public void markContentBegin() {
430 this.contentBeginMark = this.position;
431 startDigest();
432 }
433
434 /***
435 * Return stored content-begin-mark (which is also end-of-headers)
436 */
437 public long getContentBegin() {
438 return this.contentBeginMark;
439 }
440
441 /***
442 * Starts digesting recorded data, if a MessageDigest has been
443 * set.
444 */
445 public void startDigest() {
446 if (this.digest != null) {
447 this.digest.reset();
448 this.shouldDigest = true;
449 }
450 }
451
452 /***
453 * Convenience method for setting SHA1 digest.
454 * @see #setDigest(String)
455 */
456 public void setSha1Digest() {
457 setDigest(SHA1);
458 }
459
460
461 /***
462 * Sets a digest function which may be applied to recorded data.
463 * The difference between calling this method and {@link #setDigest(MessageDigest)}
464 * is that this method tries to reuse MethodDigest instance if already allocated
465 * and of appropriate algorithm.
466 * @param algorithm Message digest algorithm to use.
467 * @see #setDigest(MessageDigest)
468 */
469 public void setDigest(String algorithm) {
470 try {
471
472 if (this.digest == null ||
473 !this.digest.getAlgorithm().equals(algorithm)) {
474 setDigest(MessageDigest.getInstance(algorithm));
475 }
476 } catch (NoSuchAlgorithmException e) {
477 e.printStackTrace();
478 }
479 }
480
481 /***
482 * Sets a digest function which may be applied to recorded data.
483 *
484 * As usually only a subset of the recorded data should
485 * be fed to the digest, you must also call startDigest()
486 * to begin digesting.
487 *
488 * @param md Message digest function to use.
489 */
490 public void setDigest(MessageDigest md) {
491 this.digest = md;
492 }
493
494 /***
495 * Return the digest value for any recorded, digested data. Call
496 * only after all data has been recorded; otherwise, the running
497 * digest state is ruined.
498 *
499 * @return the digest final value
500 */
501 public byte[] getDigestValue() {
502 if(this.digest == null) {
503 return null;
504 }
505 return this.digest.digest();
506 }
507
508 public ReplayCharSequence getReplayCharSequence() throws IOException {
509 return getReplayCharSequence(null);
510 }
511
512 public ReplayCharSequence getReplayCharSequence(String characterEncoding)
513 throws IOException {
514 return getReplayCharSequence(characterEncoding, this.contentBeginMark);
515 }
516
517 private static final String canonicalLatin1 = Charset.forName("iso8859-1").name();
518
519 /***
520 * @param characterEncoding Encoding of recorded stream.
521 * @return A ReplayCharSequence Will return null if an IOException. Call
522 * close on returned RCS when done.
523 * @throws IOException
524 */
525 public ReplayCharSequence getReplayCharSequence(String characterEncoding,
526 long startOffset) throws IOException {
527 if (characterEncoding == null) {
528
529 characterEncoding = canonicalLatin1;
530 }
531 try {
532
533 characterEncoding = Charset.forName(characterEncoding).name();
534 } catch (IllegalArgumentException iae) {
535
536 characterEncoding = canonicalLatin1;
537 }
538
539 if (canonicalLatin1.equals(characterEncoding)) {
540 return new Latin1ByteReplayCharSequence(
541 this.buffer,
542 this.size,
543 startOffset,
544 this.backingFilename);
545 } else {
546
547 if(this.size <= this.buffer.length) {
548
549 return new GenericReplayCharSequence(
550 this.buffer,
551 this.size,
552 startOffset,
553 characterEncoding);
554
555 } else {
556
557 ReplayInputStream ris = getReplayInputStream(startOffset);
558 ReplayCharSequence rcs = new GenericReplayCharSequence(
559 ris,
560 this.backingFilename,
561 characterEncoding);
562 ris.close();
563 return rcs;
564 }
565
566 }
567
568 }
569
570 public long getResponseContentLength() {
571 return this.size - this.contentBeginMark;
572 }
573
574 /***
575 * @return True if this ROS is open.
576 */
577 public boolean isOpen() {
578 return this.out != null;
579 }
580
581 /***
582 * When used alongside a mark-supporting RecordingInputStream, remember
583 * a position reachable by a future reset().
584 */
585 public void mark() {
586
587 this.markPosition = position;
588 }
589
590 /***
591 * When used alongside a mark-supporting RecordingInputStream, reset
592 * the position to that saved by previous mark(). Until the position
593 * again reached "new" material, none of the bytes pushed to this
594 * stream will be digested or recorded.
595 */
596 public void reset() {
597
598 maxPosition = Math.max(maxPosition, position);
599
600 position = markPosition;
601 }
602
603 /***
604 * Set limits on length, time, and rate to enforce.
605 *
606 * @param length
607 * @param milliseconds
608 * @param rateKBps
609 */
610 public void setLimits(long length, long milliseconds, long rateKBps) {
611 maxLength = (length>0) ? length : Long.MAX_VALUE;
612 timeoutMs = (milliseconds>0) ? milliseconds : Long.MAX_VALUE;
613 maxRateBytesPerMs = (rateKBps>0) ? rateKBps*1024/1000 : Long.MAX_VALUE;
614 }
615
616 /***
617 * Reset limits to effectively-unlimited defaults
618 */
619 public void resetLimits() {
620 maxLength = Long.MAX_VALUE;
621 timeoutMs = Long.MAX_VALUE;
622 maxRateBytesPerMs = Long.MAX_VALUE;
623 }
624
625 /***
626 * Return number of bytes that could be recorded without hitting
627 * length limit
628 *
629 * @return long byte count
630 */
631 public long getRemainingLength() {
632 return maxLength - position;
633 }
634 }