View Javadoc

1   /* Created on 2006-okt-03
2   *
3   * Copyright (C) 2006 National Library of Sweden.
4   *
5   * This program is free software; you can redistribute it and/or
6   * modify it under the terms of the GNU Lesser General Public License
7   * as published by the Free Software Foundation; either version 2
8   * of the License, or (at your option) any later version.
9   *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18  */
19  
20  package org.archive.crawler.writer;
21  
22  import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.File;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.OutputStream;
29  import java.net.InetAddress;
30  import java.security.MessageDigest;
31  import java.security.NoSuchAlgorithmException;
32  import java.util.logging.Level;
33  import java.util.logging.Logger;
34  
35  import javax.management.AttributeNotFoundException;
36  import javax.management.MBeanException;
37  import javax.management.ReflectionException;
38  
39  import org.archive.crawler.datamodel.CoreAttributeConstants;
40  import org.archive.crawler.datamodel.CrawlHost;
41  import org.archive.crawler.datamodel.CrawlURI;
42  import org.archive.crawler.framework.Processor;
43  import org.archive.crawler.settings.SimpleType;
44  import org.archive.crawler.settings.Type;
45  import org.archive.io.ReplayInputStream;
46  import org.archive.crawler.writer.Kw3Constants;
47  
48  /***
49   * Processor module that writes the results of successful fetches to
50   * files on disk. These files are MIME-files of the type used by the
51   * Swedish National Library's Kulturarw3 web harvesting [http://www.kb.se/kw3/].
52   *  
53   * Each URI gets written to its own file and has a path consisting of:
54   * <ul>
55   *  <li> A dir named with the first two chars of the website's md5. </li>
56   *  <li> A dir named after the website. </li>
57   *  <li> 'current' - a dir indicating that this is the directory being written
58   *                   to by the ongoing crawl. </li>
59   *  <li> A file on the format <md5 of url>.<fetchtime in seconds> </li>
60   * </ul>
61   * Example: '/53/www.kb.se/current/6879ad79c0ccf886ee8ca55d80e5d6a1.1169211837'
62   * 
63   * The MIME-file itself consists of three parts:
64   * <ul>
65   *  <li> 1. ArchiveInfo - Metadata about the file and its content. </li>
66   *  <li> 2. Header - The HTTP response header. </li>
67   *  <li> 3. Content - The HTTP response content, plus content-type. </li>
68   * </ul>
69   * 
70   * @author oskar
71   */
72  public class Kw3WriterProcessor extends Processor implements
73        CoreAttributeConstants, Kw3Constants {
74  
75    private static final long serialVersionUID = 7171448068924684594L;
76    
77    private static String COLON = ":";
78    private static String WS = " ";
79    private static String LF = "\n";
80    
81    /***
82     * Logger.
83     */
84    private static final Logger logger =
85        Logger.getLogger(Kw3WriterProcessor.class.getName());
86        
87    /***
88     * Key to use asking settings for arc path value.
89     */
90    public static final String ATTR_PATH ="path";
91    
92    /***
93     * Default path.
94     */
95    private static final String DEFAULT_PATH = "arcs";
96    
97    /***
98     * Key to use asking settings for max size value.
99     */
100   public static final String ATTR_MAX_SIZE_BYTES = "max-size-bytes";
101   
102   /***
103    * Default max file size.
104    */
105   public static final int  DEFAULT_MAX_FILE_SIZE = 10000000;
106   
107   /***
108    * Key to use asking settings if chmod should be execuated .
109    */
110   public static final String ATTR_CHMOD = "chmod";
111   
112   /***
113    * Key to use asking settings for the new chmod value.
114    */
115   public static final String ATTR_CHMOD_VALUE = "chmod-value";
116   
117   /***
118    * Default value for permissions.
119    */
120   public static final String  DEFAULT_CHMOD_VALUE = "777";
121   
122   /***
123    * Key for the maximum ARC bytes to write attribute.
124    */
125   public static final String ATTR_MAX_BYTES_WRITTEN = "total-bytes-to-write";
126   
127   /***
128    * Key for the collection attribute.
129    */
130   public static final String ATTR_COLLECTION = "collection";
131   
132   /***
133    * Default value for collection.
134    */
135   public static final String  DEFAULT_COLLECTION_VALUE = "kw3";
136   
137   /***
138    * Key for the harvester attribute.
139    */
140   public static final String ATTR_HARVESTER = "harvester";
141   
142   /***
143    * Default value for harvester.
144    */
145   public static final String  DEFAULT_HARVESTER_VALUE = "heritrix";
146  
147   private static String BOUNDARY_START = "KulturArw3_";
148   
149   /*
150    * Private members for settings
151    */
152   private File arcsDir;
153   
154   private boolean chmod;
155   
156   private String chmodValue;
157   
158   private int maxSize;
159   
160   private String collection;
161   
162   private String harvester;
163   
164   
165   /***
166    * @param name Name of this processor.
167    */
168   public Kw3WriterProcessor(String name) {
169       super(name, "Kw3Writer processor. " +
170           "A writer that writes files in the MIME format of The " +
171           "Swedish National Library.  See this class's javadoc for" +
172           "format exposition.");
173       Type e; 
174       e = addElementToDefinition(new SimpleType(ATTR_PATH,
175               "Top-level directory for archive files.", DEFAULT_PATH));
176       e.setOverrideable(false);
177       e = addElementToDefinition(new SimpleType(ATTR_COLLECTION,
178               "Name of collection.", DEFAULT_COLLECTION_VALUE));
179       e.setOverrideable(false);
180       e = addElementToDefinition(new SimpleType(ATTR_HARVESTER,
181               "Name of the harvester that is used for the web harvesting.",
182               DEFAULT_HARVESTER_VALUE));
183       e.setOverrideable(false);
184       e = addElementToDefinition(new SimpleType(ATTR_MAX_SIZE_BYTES, 
185               "Max size of each file", new Integer(DEFAULT_MAX_FILE_SIZE)));
186       e.setOverrideable(false);
187       e = addElementToDefinition(new SimpleType(ATTR_CHMOD, 
188               "Should permissions be changed for the newly created dirs",
189               new Boolean(true)));
190       e.setOverrideable(false);
191       e = addElementToDefinition(new SimpleType(ATTR_CHMOD_VALUE, 
192               "What should the permissions be set to." +
193               " Given as three octal digits, as to the UNIX 'chmod' command." +
194               " Ex. 777 for all permissions to everyone.",
195               DEFAULT_CHMOD_VALUE));
196       e.setOverrideable(false);
197   }
198 
199   protected void initialTasks () {
200       try {
201           String arcsDirPath = (String) getAttribute(ATTR_PATH);
202           this.arcsDir = new File(arcsDirPath);
203           if (!this.arcsDir.isAbsolute()) 
204               this.arcsDir = new File(getController().getDisk(), arcsDirPath);
205           
206           this.collection = (String) getAttribute(ATTR_COLLECTION);
207           this.harvester = (String) getAttribute(ATTR_HARVESTER);
208           this.chmod = (Boolean) getAttribute(ATTR_CHMOD);
209           this.chmodValue = (String) getAttribute(ATTR_CHMOD_VALUE);            
210           this.maxSize = (Integer) getAttribute(ATTR_MAX_SIZE_BYTES);          
211       } catch (AttributeNotFoundException e) {
212           logger.log(Level.WARNING, "attribute error", e);
213       } catch (MBeanException e) {
214           logger.log(Level.WARNING, "attribute error", e);
215       } catch (ReflectionException e) {
216           logger.log(Level.WARNING, "attribute error", e);
217       }     
218   }      
219   
220   protected void innerProcess(CrawlURI curi) {
221       // Only successful fetches are written.
222       if (!curi.isSuccess()) 
223           return;
224       // Only http and https schemes are supported.
225       String scheme = curi.getUURI().getScheme().toLowerCase();
226       if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))
227           return;        
228       
229       // Write the MIME-file
230       try {
231           writeMimeFile(curi);
232       } catch (IOException e) {
233           logger.log(Level.WARNING, "i/o error", e);
234       }      
235   }
236   
237   /*
238    * The actual writing of the Kulturarw3 MIME-file.
239    * 
240    * The MIME-file consists of three parts:
241    * 1. ArchiveInfo - Metadata about the file and its content.
242    * 2. Header - The HTTP response header.
243    * 3. Content - The HTTP response content, plus content-type.
244    * 
245    * For more on this format, see '?'.
246    */
247   protected void writeMimeFile(CrawlURI curi) throws IOException {
248       ReplayInputStream ris = null;
249       OutputStream out = null;
250                 
251       try {
252           String boundary = BOUNDARY_START + stringToMD5(curi.toString());
253           ris = curi.getHttpRecorder().getRecordedInput().
254               getReplayInputStream();
255           out = initOutputStream(curi);
256           
257           // Part 1: Archive info
258           writeArchiveInfoPart(boundary, curi, ris, out);
259 
260           // Part 2: Header info + HTTP header
261           writeHeaderPart(boundary, ris, out);
262 
263           // Part 3: Content info + HTTP content
264           writeContentPart(boundary, curi, ris, out);
265 
266           // And finally the terminator string
267           String terminator = "\n--" + boundary + "--\n";
268           out.write(terminator.getBytes());
269       } finally {
270           if (ris != null)
271               ris.close();
272           if (out != null)
273               out.close();
274       }
275   }
276   
277   /*
278    * Get the OutputStream for the file to write to.
279    * 
280    * It has a path consisting of:
281    * 1. A dir named with the first two chars of the website's md5.
282    * 2. A dir named after the website.
283    * 3. 'current' - a dir indicating that this is the directory being written
284    *                to by the ongoing crawl. 
285    * 4. A file on the format <md5 of url>.<fetchtime in seconds>
286    * 
287    * Example: '/53/www.kb.se/current/6879ad79c0ccf886ee8ca55d80e5d6a1.1169211837'            
288    */
289   protected OutputStream initOutputStream(CrawlURI curi) throws IOException {
290       String uri = curi.toString();
291       int port = curi.getUURI().getPort();
292       String host = (port == 80 || port <= 0) ?
293               curi.getUURI().getHost() : curi.getUURI().getHost() + ":" + port;
294       long fetchTime = curi.getLong(A_FETCH_BEGAN_TIME) / 1000;
295              
296       String md5 = stringToMD5(host);
297       File dir = new File(this.arcsDir, md5.substring(0, 2) + "/" + host +
298               "/current");
299       if (!dir.exists()) {
300           dir.mkdirs();
301           if (this.chmod)
302               chmods(dir, this.arcsDir);
303       }
304       md5 = stringToMD5(uri);
305       File arcFile = new File(dir, md5 + "." + fetchTime);
306       return new FastBufferedOutputStream(new FileOutputStream(arcFile));       
307   }
308   
309   protected void writeArchiveInfoPart(String boundary, CrawlURI curi,
310           ReplayInputStream ris, OutputStream out)
311           throws IOException {
312       // Get things we need to write in this part
313       String uri = curi.toString();
314       String ip = getHostAddress(curi);
315       long headerLength = ris.getHeaderSize();
316       long contentLength = ris.getContentSize();
317       long archiveTime = System.currentTimeMillis() / 1000; // Fetchtime in seconds
318       int statusCode = curi.getFetchStatus();
319       String headerMd5 = null;
320       Object contentMd5 = null;       
321       
322       // Get headerMd5
323       ByteArrayOutputStream baos = new ByteArrayOutputStream();
324       ris.readHeaderTo(baos);
325       headerMd5 = stringToMD5(baos.toString());              
326       
327       // Get contentMd5
328       contentMd5 = curi.getContentDigest();
329       if (contentMd5 != null)
330           contentMd5 = getHexString((byte[]) contentMd5);
331       
332       StringBuffer buffer = new StringBuffer();
333       buffer.append("MIME-version: 1.1" + LF);
334       buffer.append("Content-Type: multipart/mixed; boundary=" + boundary + LF);
335       buffer.append("HTTP-Part: ArchiveInfo" + LF);
336       buffer.append(COLLECTION_KEY + COLON + WS + this.collection + LF);
337       buffer.append(HARVESTER_KEY + COLON + WS + this.harvester + LF);
338       buffer.append(URL_KEY + COLON + WS + uri + LF);
339       buffer.append(IP_ADDRESS_KEY + COLON + WS + ip + LF);
340       buffer.append(HEADER_LENGTH_KEY + COLON + WS + headerLength + LF);
341       buffer.append(HEADER_MD5_KEY + COLON + WS + headerMd5 + LF);
342       buffer.append(CONTENT_LENGTH_KEY + COLON + WS + contentLength + LF);
343       buffer.append(CONTENT_MD5_KEY + COLON + WS + contentMd5 + LF);
344       buffer.append(ARCHIVE_TIME_KEY + COLON + WS+ archiveTime + LF);
345       buffer.append(STATUS_CODE_KEY + COLON + WS + statusCode + LF + LF);       
346       out.write(buffer.toString().getBytes());       
347   }
348   
349   protected void writeHeaderPart(String boundary, ReplayInputStream ris,
350           OutputStream out) 
351           throws IOException {
352       StringBuffer buffer = new StringBuffer();
353       buffer.append("--" + boundary + LF);
354       buffer.append("Content-Type: text/plain; charset=\"US-ascii\"" + LF);
355       buffer.append("HTTP-Part: Header" + LF + LF );
356       out.write(buffer.toString().getBytes());
357       ris.readHeaderTo(out);       
358   }
359   
360   protected void writeContentPart(String boundary, CrawlURI curi,
361           ReplayInputStream ris, OutputStream out) 
362           throws IOException {
363       // Get things we need to write in this part
364       String uri = curi.toString();
365       String contentType = curi.getContentType();
366       long contentLength = ris.getContentSize();      
367       // Only write content if there is some
368       if (contentLength == 0)   return;
369              
370       StringBuffer buffer = new StringBuffer();
371       buffer.append("--" + boundary + LF);
372       buffer.append("Content-Type: " + contentType + LF);
373       buffer.append("HTTP-Part: Content" + LF + LF);
374       out.write(buffer.toString().getBytes());
375       
376       if (contentLength > this.maxSize) {
377           ris.readContentTo(out, this.maxSize);
378           logger.info(" Truncated url: " + uri + ", Size: " + contentLength +
379                   ", Content-type: " + contentType);
380       } else {
381           ris.readContentTo(out);
382       }
383   }
384 
385   // --- Private helper functions --- //
386   /*
387    * Get a MD5 checksum based on a String. 
388    */ 
389   private String stringToMD5(String str) {
390       try {
391           byte b[] = str.getBytes();
392           MessageDigest md = MessageDigest.getInstance("MD5");
393           md.update(b);
394           byte[] digest = md.digest();
395           return getHexString(digest);
396       } catch (NoSuchAlgorithmException e) {
397           logger.log(Level.WARNING, "md5 error", e);
398       } 
399       return null;
400   }
401 
402   /* 
403    * Fast convert a byte array to a hex string with possible leading zero.
404    */
405   private String getHexString(byte[] b) {
406       StringBuffer sb = new StringBuffer();
407       for (int i = 0; i < b.length; i++) {
408           String tmp = Integer.toHexString(b[i] & 0xff);
409           if (tmp.length() < 2)
410               sb.append("0" + tmp);
411           else
412               sb.append(tmp);
413       }
414       return sb.toString();
415   }
416 
417   /* 
418    * Chmods for all newly created directories.
419    */
420   private void chmods(File dir, File arcsDir) {
421       String topdir = arcsDir.getAbsolutePath();
422       chmod(dir, this.chmodValue);
423       File parent = dir.getParentFile();
424       while (!parent.getAbsolutePath().equalsIgnoreCase((topdir))) {
425           chmod(parent, this.chmodValue);
426           parent = parent.getParentFile();
427       }
428       
429   }
430 
431   /* 
432    * Chmod for a specific file or directory.
433    */
434   private void chmod(File file, String permissions) {
435       Process proc = null;
436       try {
437           proc = Runtime.getRuntime().exec("chmod " + permissions + " " +
438                   file.getAbsolutePath());
439           proc.waitFor();
440           proc.getInputStream().close();
441           proc.getOutputStream().close();
442           proc.getErrorStream().close();
443       } catch (IOException e) {
444           logger.log(Level.WARNING, "chmod failed", e);
445       } catch (InterruptedException e) {
446           logger.log(Level.WARNING, "chmod failed", e);
447       }
448   }
449 
450   private String getHostAddress(CrawlURI curi) {
451       CrawlHost h = getController().getServerCache().getHostFor(curi);
452       if (h == null) {
453           throw new NullPointerException("Crawlhost is null for " + curi + " " +
454                   curi.getVia());
455       }
456       InetAddress a = h.getIP();
457       if (a == null) {
458           throw new NullPointerException("Address is null for " + curi + " " +
459              curi.getVia() + ". Address " +
460                  ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ?
461                      "was never looked up." :
462                      (System.currentTimeMillis() - h.getIpFetched()) + " ms ago."));
463       }
464       return h.getIP().getHostAddress();
465   }
466 }