View Javadoc

1   /* BdbFrontier
2    * 
3    * $Id: BdbFrontier.java 6815 2010-04-12 21:32:49Z gojomo $
4   * 
5    * Created on Sep 24, 2004
6    *
7    *  Copyright (C) 2004 Internet Archive.
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   *
25    */
26  package org.archive.crawler.frontier;
27  
28  import java.io.File;
29  import java.io.FileNotFoundException;
30  import java.io.IOException;
31  import java.io.Serializable;
32  import java.util.ArrayList;
33  import java.util.Collections;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Queue;
37  import java.util.TreeSet;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.logging.Level;
40  import java.util.logging.Logger;
41  
42  import javax.management.AttributeNotFoundException;
43  
44  import org.apache.commons.collections.Closure;
45  import org.archive.crawler.datamodel.CrawlURI;
46  import org.archive.crawler.datamodel.UriUniqFilter;
47  import org.archive.crawler.framework.CrawlController;
48  import org.archive.crawler.framework.FrontierMarker;
49  import org.archive.crawler.framework.exceptions.FatalConfigurationException;
50  import org.archive.crawler.settings.SimpleType;
51  import org.archive.crawler.settings.Type;
52  import org.archive.crawler.util.BdbUriUniqFilter;
53  import org.archive.crawler.util.BloomUriUniqFilter;
54  import org.archive.crawler.util.CheckpointUtils;
55  import org.archive.crawler.util.DiskFPMergeUriUniqFilter;
56  import org.archive.crawler.util.MemFPMergeUriUniqFilter;
57  import org.archive.queue.StoredQueue;
58  import org.archive.util.ArchiveUtils;
59  import org.archive.util.Supplier;
60  
61  import com.sleepycat.je.Database;
62  import com.sleepycat.je.DatabaseException;
63  
/**
 * A Frontier using several BerkeleyDB JE Databases to hold its record of
 * known hosts (queues), and pending URIs. 
 *
 * @author Gordon Mohr
 */
public class BdbFrontier extends WorkQueueFrontier implements Serializable {
    // be robust against trivial implementation changes
    private static final long serialVersionUID = ArchiveUtils
        .classnameBasedUID(BdbFrontier.class, 1);

    private static final Logger logger =
        Logger.getLogger(BdbFrontier.class.getName());

    /** all URIs scheduled to be crawled */
    protected transient BdbMultipleWorkQueues pendingUris;

    /** all URI-already-included options available to be chosen */
    private String[] AVAILABLE_INCLUDED_OPTIONS = new String[] {
            BdbUriUniqFilter.class.getName(),
            BloomUriUniqFilter.class.getName(),
            MemFPMergeUriUniqFilter.class.getName(),
            DiskFPMergeUriUniqFilter.class.getName()};
    
    /** URI-already-included to use (by class name) */
    public final static String ATTR_INCLUDED = "uri-included-structure";
    
    private final static String DEFAULT_INCLUDED =
        BdbUriUniqFilter.class.getName();
    
    /** whether to dump all still-pending URIs to crawl.log when crawl ends */
    public final static String ATTR_DUMP_PENDING_AT_CLOSE = 
        "dump-pending-at-close";
    private final static Boolean DEFAULT_DUMP_PENDING_AT_CLOSE = 
        Boolean.FALSE;
99  
100     
    /**
     * Constructor.
     * @param name Name for this Frontier.
     */
    public BdbFrontier(String name) {
        this(name, "BdbFrontier. "
                + "A Frontier using BerkeleyDB Java Edition databases for "
                + "persistence to disk.");
    }
110 
    /**
     * Create the BdbFrontier, registering its configurable settings
     * (already-seen structure choice, dump-pending-at-close flag).
     * 
     * @param name Name for this Frontier
     * @param description human-readable description shown in the UI
     */
    public BdbFrontier(String name, String description) {
        super(name, description);
        Type t = addElementToDefinition(new SimpleType(ATTR_INCLUDED,
                "Structure to use for tracking already-seen URIs. Non-default " +
                "options may require additional configuration via system " +
                "properties.", DEFAULT_INCLUDED, AVAILABLE_INCLUDED_OPTIONS));
        t.setExpertSetting(true);
        t = addElementToDefinition(new SimpleType(ATTR_DUMP_PENDING_AT_CLOSE,
                "Whether to dump all URIs waiting in queues to crawl.log " +
                "when a crawl ends. May add a significant delay to " +
                "crawl termination. Dumped lines will have a zero (0) " +
                "status.", DEFAULT_DUMP_PENDING_AT_CLOSE));
        t.setExpertSetting(true);
    }
131 
    /**
     * Create the single object (within which is one BDB database)
     * inside which all the other queues live. 
     * 
     * On checkpoint-recovery the flag passed through tells the store to
     * reattach to existing data rather than start fresh.
     * 
     * @return the created BdbMultipleWorkQueues
     * @throws DatabaseException on BDB environment/database failure
     */
    private BdbMultipleWorkQueues createMultipleWorkQueues()
    throws DatabaseException {
        return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
            this.controller.getBdbEnvironment().getClassCatalog(),
            this.controller.isCheckpointRecover());
    }
145 
146     
147     @Override
148     protected void initQueuesOfQueues() {
149         if(this.controller.isCheckpointRecover()) {
150             // do not setup here; take/init from deserialized frontier
151             return; 
152         }
153         // small risk of OutOfMemoryError: if 'hold-queues' is false,
154         // readyClassQueues may grow in size without bound
155         readyClassQueues = new LinkedBlockingQueue<String>();
156 
157         try {
158             Database inactiveQueuesDb = this.controller.getBdbEnvironment()
159                     .openDatabase(null, "inactiveQueues",
160                             StoredQueue.databaseConfig());
161             inactiveQueues = new StoredQueue<String>(inactiveQueuesDb,
162                     String.class, null);
163             Database retiredQueuesDb = this.controller.getBdbEnvironment()
164                     .openDatabase(null, "retiredQueues",
165                             StoredQueue.databaseConfig());
166             retiredQueues = new StoredQueue<String>(retiredQueuesDb,
167                     String.class, null);
168         } catch (DatabaseException e) {
169             throw new RuntimeException(e);
170         }
171         
172         // small risk of OutOfMemoryError: in large crawls with many 
173         // unresponsive queues, an unbounded number of snoozed queues 
174         // may exist
175         snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
176     }
177 
178     protected Queue<String> reinit(Queue<String> q, String name) {
179         try {
180             // restore the innner Database/StoredSortedMap of the queue
181             Database db = this.controller.getBdbEnvironment()
182                 .openDatabase(null, name, StoredQueue.databaseConfig());
183             
184             StoredQueue<String> queue;
185             if(q instanceof StoredQueue) {
186                 queue = (StoredQueue<String>) q;
187                 queue.hookupDatabase(db, String.class, null);
188             } else {
189                 // recovery of older checkpoint; copy to StoredQueue
190                 queue = new StoredQueue<String>(db,String.class,
191                         this.controller.getBdbEnvironment().getClassCatalog()); 
192                 queue.addAll(q);
193             }
194             return queue;
195         } catch (DatabaseException e) {
196             throw new RuntimeException(e);
197         }
198     }
199     
200     /***
201      * Create a UriUniqFilter that will serve as record 
202      * of already seen URIs.
203      *
204      * @return A UURISet that will serve as a record of already seen URIs
205      * @throws IOException
206      */
207     protected UriUniqFilter createAlreadyIncluded() throws IOException {
208         UriUniqFilter uuf;
209         String c = null;
210         try {
211             c = (String)getAttribute(null, ATTR_INCLUDED);
212         } catch (AttributeNotFoundException e) {
213             // Do default action if attribute not in order.
214         }
215         // TODO: avoid all this special-casing; enable some common
216         // constructor interface usable for all alt implemenations
217         if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {
218             uuf = this.controller.isCheckpointRecover()?
219                     deserializeAlreadySeen(BloomUriUniqFilter.class,
220                         this.controller.getCheckpointRecover().getDirectory()):
221                     new BloomUriUniqFilter();
222         } else if (c!=null && c.equals(MemFPMergeUriUniqFilter.class.getName())) {
223             // TODO: add checkpointing for MemFPMergeUriUniqFilter
224             uuf = new MemFPMergeUriUniqFilter();
225         } else if (c!=null && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {
226             // TODO: add checkpointing for DiskFPMergeUriUniqFilter
227             uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());
228         } else {
229             // Assume its BdbUriUniqFilter.
230             uuf = this.controller.isCheckpointRecover()?
231                 deserializeAlreadySeen(BdbUriUniqFilter.class,
232                     this.controller.getCheckpointRecover().getDirectory()):
233                 new BdbUriUniqFilter(this.controller.getBdbEnvironment());
234             if (this.controller.isCheckpointRecover()) {
235                 // If recover, need to call reopen of the db.
236                 try {
237                     ((BdbUriUniqFilter)uuf).
238                         reopen(this.controller.getBdbEnvironment());
239                 } catch (DatabaseException e) {
240                     throw new IOException(e.getMessage());
241                 }
242             }   
243         }
244         uuf.setDestination(this);
245         return uuf;
246     }
247     
248     protected UriUniqFilter deserializeAlreadySeen(
249             final Class<? extends UriUniqFilter> cls,
250             final File dir)
251     throws FileNotFoundException, IOException {
252         UriUniqFilter uuf = null;
253         try {
254             logger.fine("Started deserializing " + cls.getName() +
255                 " of checkpoint recover.");
256             uuf = CheckpointUtils.readObjectFromFile(cls, dir);
257             logger.fine("Finished deserializing bdbje as part " +
258                 "of checkpoint recover.");
259         } catch (ClassNotFoundException e) {
260             throw new IOException("Failed to deserialize "  +
261                 cls.getName() + ": " + e.getMessage());
262         }
263         return uuf;
264     }
265 
266     /***
267      * Return the work queue for the given CrawlURI's classKey. URIs
268      * are ordered and politeness-delayed within their 'class'.
269      * 
270      * @param curi CrawlURI to base queue on
271      * @return the found or created BdbWorkQueue
272      */
273     protected WorkQueue getQueueFor(final CrawlURI curi) {
274         final String classKey = curi.getClassKey();
275         synchronized (allQueues) {
276             WorkQueue wq = allQueues.getOrUse(
277                 classKey,
278                 new Supplier<WorkQueue>() {
279                     public WorkQueue get() {
280                         String qKey = new String(classKey); // ensure private minimal key
281                         WorkQueue q = new BdbWorkQueue(qKey, BdbFrontier.this);
282                         q.setTotalBudget(((Long)getUncheckedAttribute(
283                                 curi,ATTR_QUEUE_TOTAL_BUDGET)).longValue()); 
284                         return q;
285                     }});
286             return wq;
287         }
288     }
289     
290     /***
291      * Return the work queue for the given classKey, or null
292      * if no such queue exists.
293      * 
294      * @param classKey key to look for
295      * @return the found WorkQueue
296      */
297     protected WorkQueue getQueueFor(String classKey) {
298         WorkQueue wq; 
299         synchronized (allQueues) {
300             wq = (WorkQueue)allQueues.get(classKey);
301         }
302         return wq;
303     }
304 
    /**
     * Get a marker for iterating pending URIs whose keys match the
     * given regular expression.
     *
     * @param regexpr regular expression to match
     * @param inCacheOnly ignored by this implementation
     * @return a FrontierMarker positioned at the start of matches
     */
    public FrontierMarker getInitialMarker(String regexpr,
            boolean inCacheOnly) {
        return pendingUris.getInitialMarker(regexpr);
    }
309 
310     /***
311      * Return list of urls.
312      * @param marker
313      * @param numberOfMatches
314      * @param verbose 
315      * @return List of URIs (strings).
316      */
317     public ArrayList<String> getURIsList(FrontierMarker marker, 
318             int numberOfMatches, final boolean verbose) {
319         List curis;
320         try {
321             curis = pendingUris.getFrom(marker, numberOfMatches);
322         } catch (DatabaseException e) {
323             e.printStackTrace();
324             throw new RuntimeException(e);
325         }
326         ArrayList<String> results = new ArrayList<String>(curis.size());
327         Iterator iter = curis.iterator();
328         while(iter.hasNext()) {
329             CrawlURI curi = (CrawlURI) iter.next();
330             results.add("["+curi.getClassKey()+"] "+curi.singleLineReport());
331         }
332         return results;
333     }
334     
    /**
     * Create the BDB-backed store of all pending URIs.
     *
     * @throws IOException wrapping any DatabaseException from BDB setup,
     *     with the original exception preserved as the cause
     */
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createMultipleWorkQueues();
        } catch(DatabaseException e) {
            throw (IOException)new IOException(e.getMessage()).initCause(e);
        }
    }
342     
343     public void finalTasks() {
344     	if((Boolean)getUncheckedAttribute(null,ATTR_DUMP_PENDING_AT_CLOSE)) {
345             try {
346                 dumpAllPendingToLog();
347             } catch (DatabaseException e) {
348                 logger.log(Level.WARNING,"dump pending problem",e);
349             }
350         }
351     }
352     
353     protected void closeQueue() {
354         if (this.pendingUris != null) {
355             this.pendingUris.close();
356             this.pendingUris = null;
357         }
358     }
359         
    /** @return the BDB-backed store of all pending URIs */
    protected BdbMultipleWorkQueues getWorkQueues() {
        return pendingUris;
    }

    /** @return true: this frontier keeps work-queue data on disk */
    protected boolean workQueueDataOnDisk() {
        return true;
    }
367     
    /**
     * Initialize this frontier, restoring state from a checkpoint first
     * when recovering. The order here matters: recovered values must be
     * copied in before super.initialize(), which skips
     * initQueuesOfQueues() during checkpoint recovery.
     *
     * @param c the CrawlController driving this crawl
     * @throws FatalConfigurationException on checkpoint-recovery failure
     * @throws IOException on other initialization failure
     */
    public void initialize(CrawlController c)
    throws FatalConfigurationException, IOException {
        this.controller = c;
        // fill in anything from a checkpoint recovery first (because
        // usual initialization will skip initQueueOfQueues in checkpoint)
        if (c.isCheckpointRecover()) {
            // If a checkpoint recover, copy old values from serialized
            // instance into this Frontier instance. Do it this way because 
            // though its possible to serialize BdbFrontier, its currently not
            // possible to set/remove frontier attribute plugging the
            // deserialized object back into the settings system.
            // The below copying over is error-prone because its easy
            // to miss a value.  Perhaps there's a better way?  Introspection?
            BdbFrontier f = null;
            try {
                f = (BdbFrontier)CheckpointUtils.
                    readObjectFromFile(this.getClass(),
                        c.getCheckpointRecover().getDirectory());
            } catch (FileNotFoundException e) {
                throw new FatalConfigurationException("Failed checkpoint " +
                    "recover: " + e.getMessage());
            } catch (IOException e) {
                throw new FatalConfigurationException("Failed checkpoint " +
                    "recover: " + e.getMessage());
            } catch (ClassNotFoundException e) {
                throw new FatalConfigurationException("Failed checkpoint " +
                    "recover: " + e.getMessage());
            }

            // copy counters and queue structures from the recovered instance
            this.nextOrdinal = f.nextOrdinal;
            this.totalProcessedBytes = f.totalProcessedBytes;
            this.liveDisregardedUriCount = f.liveDisregardedUriCount;
            this.liveFailedFetchCount = f.liveFailedFetchCount;
            this.processedBytesAfterLastEmittedURI =
                f.processedBytesAfterLastEmittedURI;
            this.liveQueuedUriCount = f.liveQueuedUriCount;
            this.liveSucceededFetchCount = f.liveSucceededFetchCount;
            this.lastMaxBandwidthKB = f.lastMaxBandwidthKB;
            this.readyClassQueues = f.readyClassQueues;
            // inactive/retired queues need their BDB databases reattached
            this.inactiveQueues = reinit(f.inactiveQueues,"inactiveQueues");
            this.retiredQueues = reinit(f.retiredQueues,"retiredQueues");
            this.snoozedClassQueues = f.snoozedClassQueues;
            this.inProcessQueues = f.inProcessQueues;
            super.initialize(c);
            // re-arm any queues that were snoozed at checkpoint time
            wakeQueues();
        } else {
            // perform usual initialization 
            super.initialize(c);
        }
    }
418 
419     
420     
    /**
     * Close the BDB-backed queue-of-queues structures at crawl end,
     * then delegate to the superclass.
     * NOTE(review): assumes inactiveQueues/retiredQueues are StoredQueue
     * instances (true after initQueuesOfQueues() or reinit()) -- confirm
     * no other assignment path exists.
     */
    @Override
    public void crawlEnded(String sExitMessage) {
        ((StoredQueue)inactiveQueues).close();
        ((StoredQueue)retiredQueues).close();
        super.crawlEnded(sExitMessage);
    }
427 
    /**
     * Serialize frontier state into the given checkpoint directory:
     * sync the pending-URIs store, write the already-seen structure,
     * then serialize this frontier itself.
     *
     * @param checkpointDir directory to write checkpoint files into
     * @throws Exception on any serialization or sync failure
     */
    public void crawlCheckpoint(File checkpointDir) throws Exception {
        super.crawlCheckpoint(checkpointDir);
        logger.fine("Started serializing already seen as part "
            + "of checkpoint. Can take some time.");
        // An explicit sync on the any deferred write dbs is needed to make the
        // db recoverable. Sync'ing the environment doesn't work.
        if (this.pendingUris != null) {
            this.pendingUris.sync();
        }
        CheckpointUtils.writeObjectToFile(this.alreadyIncluded, checkpointDir);
        logger.fine("Finished serializing already seen as part "
            + "of checkpoint.");
        // Serialize ourselves.
        CheckpointUtils.writeObjectToFile(this, checkpointDir);
    }
443     
444     /***
445      * Dump all still-enqueued URIs to the crawl.log -- without actually
446      * dequeuing. Useful for understanding what was remaining in a
447      * crawl that was ended early, for example at a time limit. 
448      * 
449      * @throws DatabaseException
450      */
451     public void dumpAllPendingToLog() throws DatabaseException {
452         Closure tolog = new Closure() {
453             public void execute(Object curi) {
454                 log((CrawlURI)curi);
455             }};
456         pendingUris.forAllPendingDo(tolog);
457     }
458 }