1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.archive.crawler.writer;
21
22 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.File;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.OutputStream;
29 import java.net.InetAddress;
30 import java.security.MessageDigest;
31 import java.security.NoSuchAlgorithmException;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
34
35 import javax.management.AttributeNotFoundException;
36 import javax.management.MBeanException;
37 import javax.management.ReflectionException;
38
39 import org.archive.crawler.datamodel.CoreAttributeConstants;
40 import org.archive.crawler.datamodel.CrawlHost;
41 import org.archive.crawler.datamodel.CrawlURI;
42 import org.archive.crawler.framework.Processor;
43 import org.archive.crawler.settings.SimpleType;
44 import org.archive.crawler.settings.Type;
45 import org.archive.io.ReplayInputStream;
46 import org.archive.crawler.writer.Kw3Constants;
47
48 /***
49 * Processor module that writes the results of successful fetches to
50 * files on disk. These files are MIME-files of the type used by the
51 * Swedish National Library's Kulturarw3 web harvesting [http://www.kb.se/kw3/].
52 *
53 * Each URI gets written to its own file and has a path consisting of:
54 * <ul>
55 * <li> A dir named with the first two chars of the website's md5. </li>
56 * <li> A dir named after the website. </li>
57 * <li> 'current' - a dir indicating that this is the directory being written
58 * to by the ongoing crawl. </li>
59 * <li> A file on the format <md5 of url>.<fetchtime in seconds> </li>
60 * </ul>
61 * Example: '/53/www.kb.se/current/6879ad79c0ccf886ee8ca55d80e5d6a1.1169211837'
62 *
63 * The MIME-file itself consists of three parts:
64 * <ul>
65 * <li> 1. ArchiveInfo - Metadata about the file and its content. </li>
66 * <li> 2. Header - The HTTP response header. </li>
67 * <li> 3. Content - The HTTP response content, plus content-type. </li>
68 * </ul>
69 *
70 * @author oskar
71 */
72 public class Kw3WriterProcessor extends Processor implements
73 CoreAttributeConstants, Kw3Constants {
74
75 private static final long serialVersionUID = 7171448068924684594L;
76
77 private static String COLON = ":";
78 private static String WS = " ";
79 private static String LF = "\n";
80
81 /***
82 * Logger.
83 */
84 private static final Logger logger =
85 Logger.getLogger(Kw3WriterProcessor.class.getName());
86
87 /***
88 * Key to use asking settings for arc path value.
89 */
90 public static final String ATTR_PATH ="path";
91
92 /***
93 * Default path.
94 */
95 private static final String DEFAULT_PATH = "arcs";
96
97 /***
98 * Key to use asking settings for max size value.
99 */
100 public static final String ATTR_MAX_SIZE_BYTES = "max-size-bytes";
101
102 /***
103 * Default max file size.
104 */
105 public static final int DEFAULT_MAX_FILE_SIZE = 10000000;
106
107 /***
108 * Key to use asking settings if chmod should be execuated .
109 */
110 public static final String ATTR_CHMOD = "chmod";
111
112 /***
113 * Key to use asking settings for the new chmod value.
114 */
115 public static final String ATTR_CHMOD_VALUE = "chmod-value";
116
117 /***
118 * Default value for permissions.
119 */
120 public static final String DEFAULT_CHMOD_VALUE = "777";
121
122 /***
123 * Key for the maximum ARC bytes to write attribute.
124 */
125 public static final String ATTR_MAX_BYTES_WRITTEN = "total-bytes-to-write";
126
127 /***
128 * Key for the collection attribute.
129 */
130 public static final String ATTR_COLLECTION = "collection";
131
132 /***
133 * Default value for collection.
134 */
135 public static final String DEFAULT_COLLECTION_VALUE = "kw3";
136
137 /***
138 * Key for the harvester attribute.
139 */
140 public static final String ATTR_HARVESTER = "harvester";
141
142 /***
143 * Default value for harvester.
144 */
145 public static final String DEFAULT_HARVESTER_VALUE = "heritrix";
146
147 private static String BOUNDARY_START = "KulturArw3_";
148
149
150
151
152 private File arcsDir;
153
154 private boolean chmod;
155
156 private String chmodValue;
157
158 private int maxSize;
159
160 private String collection;
161
162 private String harvester;
163
164
165 /***
166 * @param name Name of this processor.
167 */
168 public Kw3WriterProcessor(String name) {
169 super(name, "Kw3Writer processor. " +
170 "A writer that writes files in the MIME format of The " +
171 "Swedish National Library. See this class's javadoc for" +
172 "format exposition.");
173 Type e;
174 e = addElementToDefinition(new SimpleType(ATTR_PATH,
175 "Top-level directory for archive files.", DEFAULT_PATH));
176 e.setOverrideable(false);
177 e = addElementToDefinition(new SimpleType(ATTR_COLLECTION,
178 "Name of collection.", DEFAULT_COLLECTION_VALUE));
179 e.setOverrideable(false);
180 e = addElementToDefinition(new SimpleType(ATTR_HARVESTER,
181 "Name of the harvester that is used for the web harvesting.",
182 DEFAULT_HARVESTER_VALUE));
183 e.setOverrideable(false);
184 e = addElementToDefinition(new SimpleType(ATTR_MAX_SIZE_BYTES,
185 "Max size of each file", new Integer(DEFAULT_MAX_FILE_SIZE)));
186 e.setOverrideable(false);
187 e = addElementToDefinition(new SimpleType(ATTR_CHMOD,
188 "Should permissions be changed for the newly created dirs",
189 new Boolean(true)));
190 e.setOverrideable(false);
191 e = addElementToDefinition(new SimpleType(ATTR_CHMOD_VALUE,
192 "What should the permissions be set to." +
193 " Given as three octal digits, as to the UNIX 'chmod' command." +
194 " Ex. 777 for all permissions to everyone.",
195 DEFAULT_CHMOD_VALUE));
196 e.setOverrideable(false);
197 }
198
199 protected void initialTasks () {
200 try {
201 String arcsDirPath = (String) getAttribute(ATTR_PATH);
202 this.arcsDir = new File(arcsDirPath);
203 if (!this.arcsDir.isAbsolute())
204 this.arcsDir = new File(getController().getDisk(), arcsDirPath);
205
206 this.collection = (String) getAttribute(ATTR_COLLECTION);
207 this.harvester = (String) getAttribute(ATTR_HARVESTER);
208 this.chmod = (Boolean) getAttribute(ATTR_CHMOD);
209 this.chmodValue = (String) getAttribute(ATTR_CHMOD_VALUE);
210 this.maxSize = (Integer) getAttribute(ATTR_MAX_SIZE_BYTES);
211 } catch (AttributeNotFoundException e) {
212 logger.log(Level.WARNING, "attribute error", e);
213 } catch (MBeanException e) {
214 logger.log(Level.WARNING, "attribute error", e);
215 } catch (ReflectionException e) {
216 logger.log(Level.WARNING, "attribute error", e);
217 }
218 }
219
220 protected void innerProcess(CrawlURI curi) {
221
222 if (!curi.isSuccess())
223 return;
224
225 String scheme = curi.getUURI().getScheme().toLowerCase();
226 if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))
227 return;
228
229
230 try {
231 writeMimeFile(curi);
232 } catch (IOException e) {
233 logger.log(Level.WARNING, "i/o error", e);
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
247 protected void writeMimeFile(CrawlURI curi) throws IOException {
248 ReplayInputStream ris = null;
249 OutputStream out = null;
250
251 try {
252 String boundary = BOUNDARY_START + stringToMD5(curi.toString());
253 ris = curi.getHttpRecorder().getRecordedInput().
254 getReplayInputStream();
255 out = initOutputStream(curi);
256
257
258 writeArchiveInfoPart(boundary, curi, ris, out);
259
260
261 writeHeaderPart(boundary, ris, out);
262
263
264 writeContentPart(boundary, curi, ris, out);
265
266
267 String terminator = "\n--" + boundary + "--\n";
268 out.write(terminator.getBytes());
269 } finally {
270 if (ris != null)
271 ris.close();
272 if (out != null)
273 out.close();
274 }
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288
289 protected OutputStream initOutputStream(CrawlURI curi) throws IOException {
290 String uri = curi.toString();
291 int port = curi.getUURI().getPort();
292 String host = (port == 80 || port <= 0) ?
293 curi.getUURI().getHost() : curi.getUURI().getHost() + ":" + port;
294 long fetchTime = curi.getLong(A_FETCH_BEGAN_TIME) / 1000;
295
296 String md5 = stringToMD5(host);
297 File dir = new File(this.arcsDir, md5.substring(0, 2) + "/" + host +
298 "/current");
299 if (!dir.exists()) {
300 dir.mkdirs();
301 if (this.chmod)
302 chmods(dir, this.arcsDir);
303 }
304 md5 = stringToMD5(uri);
305 File arcFile = new File(dir, md5 + "." + fetchTime);
306 return new FastBufferedOutputStream(new FileOutputStream(arcFile));
307 }
308
309 protected void writeArchiveInfoPart(String boundary, CrawlURI curi,
310 ReplayInputStream ris, OutputStream out)
311 throws IOException {
312
313 String uri = curi.toString();
314 String ip = getHostAddress(curi);
315 long headerLength = ris.getHeaderSize();
316 long contentLength = ris.getContentSize();
317 long archiveTime = System.currentTimeMillis() / 1000;
318 int statusCode = curi.getFetchStatus();
319 String headerMd5 = null;
320 Object contentMd5 = null;
321
322
323 ByteArrayOutputStream baos = new ByteArrayOutputStream();
324 ris.readHeaderTo(baos);
325 headerMd5 = stringToMD5(baos.toString());
326
327
328 contentMd5 = curi.getContentDigest();
329 if (contentMd5 != null)
330 contentMd5 = getHexString((byte[]) contentMd5);
331
332 StringBuffer buffer = new StringBuffer();
333 buffer.append("MIME-version: 1.1" + LF);
334 buffer.append("Content-Type: multipart/mixed; boundary=" + boundary + LF);
335 buffer.append("HTTP-Part: ArchiveInfo" + LF);
336 buffer.append(COLLECTION_KEY + COLON + WS + this.collection + LF);
337 buffer.append(HARVESTER_KEY + COLON + WS + this.harvester + LF);
338 buffer.append(URL_KEY + COLON + WS + uri + LF);
339 buffer.append(IP_ADDRESS_KEY + COLON + WS + ip + LF);
340 buffer.append(HEADER_LENGTH_KEY + COLON + WS + headerLength + LF);
341 buffer.append(HEADER_MD5_KEY + COLON + WS + headerMd5 + LF);
342 buffer.append(CONTENT_LENGTH_KEY + COLON + WS + contentLength + LF);
343 buffer.append(CONTENT_MD5_KEY + COLON + WS + contentMd5 + LF);
344 buffer.append(ARCHIVE_TIME_KEY + COLON + WS+ archiveTime + LF);
345 buffer.append(STATUS_CODE_KEY + COLON + WS + statusCode + LF + LF);
346 out.write(buffer.toString().getBytes());
347 }
348
349 protected void writeHeaderPart(String boundary, ReplayInputStream ris,
350 OutputStream out)
351 throws IOException {
352 StringBuffer buffer = new StringBuffer();
353 buffer.append("--" + boundary + LF);
354 buffer.append("Content-Type: text/plain; charset=\"US-ascii\"" + LF);
355 buffer.append("HTTP-Part: Header" + LF + LF );
356 out.write(buffer.toString().getBytes());
357 ris.readHeaderTo(out);
358 }
359
360 protected void writeContentPart(String boundary, CrawlURI curi,
361 ReplayInputStream ris, OutputStream out)
362 throws IOException {
363
364 String uri = curi.toString();
365 String contentType = curi.getContentType();
366 long contentLength = ris.getContentSize();
367
368 if (contentLength == 0) return;
369
370 StringBuffer buffer = new StringBuffer();
371 buffer.append("--" + boundary + LF);
372 buffer.append("Content-Type: " + contentType + LF);
373 buffer.append("HTTP-Part: Content" + LF + LF);
374 out.write(buffer.toString().getBytes());
375
376 if (contentLength > this.maxSize) {
377 ris.readContentTo(out, this.maxSize);
378 logger.info(" Truncated url: " + uri + ", Size: " + contentLength +
379 ", Content-type: " + contentType);
380 } else {
381 ris.readContentTo(out);
382 }
383 }
384
385
386
387
388
389 private String stringToMD5(String str) {
390 try {
391 byte b[] = str.getBytes();
392 MessageDigest md = MessageDigest.getInstance("MD5");
393 md.update(b);
394 byte[] digest = md.digest();
395 return getHexString(digest);
396 } catch (NoSuchAlgorithmException e) {
397 logger.log(Level.WARNING, "md5 error", e);
398 }
399 return null;
400 }
401
402
403
404
405 private String getHexString(byte[] b) {
406 StringBuffer sb = new StringBuffer();
407 for (int i = 0; i < b.length; i++) {
408 String tmp = Integer.toHexString(b[i] & 0xff);
409 if (tmp.length() < 2)
410 sb.append("0" + tmp);
411 else
412 sb.append(tmp);
413 }
414 return sb.toString();
415 }
416
417
418
419
420 private void chmods(File dir, File arcsDir) {
421 String topdir = arcsDir.getAbsolutePath();
422 chmod(dir, this.chmodValue);
423 File parent = dir.getParentFile();
424 while (!parent.getAbsolutePath().equalsIgnoreCase((topdir))) {
425 chmod(parent, this.chmodValue);
426 parent = parent.getParentFile();
427 }
428
429 }
430
431
432
433
434 private void chmod(File file, String permissions) {
435 Process proc = null;
436 try {
437 proc = Runtime.getRuntime().exec("chmod " + permissions + " " +
438 file.getAbsolutePath());
439 proc.waitFor();
440 proc.getInputStream().close();
441 proc.getOutputStream().close();
442 proc.getErrorStream().close();
443 } catch (IOException e) {
444 logger.log(Level.WARNING, "chmod failed", e);
445 } catch (InterruptedException e) {
446 logger.log(Level.WARNING, "chmod failed", e);
447 }
448 }
449
450 private String getHostAddress(CrawlURI curi) {
451 CrawlHost h = getController().getServerCache().getHostFor(curi);
452 if (h == null) {
453 throw new NullPointerException("Crawlhost is null for " + curi + " " +
454 curi.getVia());
455 }
456 InetAddress a = h.getIP();
457 if (a == null) {
458 throw new NullPointerException("Address is null for " + curi + " " +
459 curi.getVia() + ". Address " +
460 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ?
461 "was never looked up." :
462 (System.currentTimeMillis() - h.getIpFetched()) + " ms ago."));
463 }
464 return h.getIP().getHostAddress();
465 }
466 }