1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package org.archive.crawler.frontier;
27
28 import java.io.File;
29 import java.io.FileNotFoundException;
30 import java.io.IOException;
31 import java.io.Serializable;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Queue;
37 import java.util.TreeSet;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41
42 import javax.management.AttributeNotFoundException;
43
44 import org.apache.commons.collections.Closure;
45 import org.archive.crawler.datamodel.CrawlURI;
46 import org.archive.crawler.datamodel.UriUniqFilter;
47 import org.archive.crawler.framework.CrawlController;
48 import org.archive.crawler.framework.FrontierMarker;
49 import org.archive.crawler.framework.exceptions.FatalConfigurationException;
50 import org.archive.crawler.settings.SimpleType;
51 import org.archive.crawler.settings.Type;
52 import org.archive.crawler.util.BdbUriUniqFilter;
53 import org.archive.crawler.util.BloomUriUniqFilter;
54 import org.archive.crawler.util.CheckpointUtils;
55 import org.archive.crawler.util.DiskFPMergeUriUniqFilter;
56 import org.archive.crawler.util.MemFPMergeUriUniqFilter;
57 import org.archive.queue.StoredQueue;
58 import org.archive.util.ArchiveUtils;
59 import org.archive.util.Supplier;
60
61 import com.sleepycat.je.Database;
62 import com.sleepycat.je.DatabaseException;
63
64 /***
65 * A Frontier using several BerkeleyDB JE Databases to hold its record of
66 * known hosts (queues), and pending URIs.
67 *
68 * @author Gordon Mohr
69 */
70 public class BdbFrontier extends WorkQueueFrontier implements Serializable {
71
72 private static final long serialVersionUID = ArchiveUtils
73 .classnameBasedUID(BdbFrontier.class, 1);
74
75 private static final Logger logger =
76 Logger.getLogger(BdbFrontier.class.getName());
77
78 /*** all URIs scheduled to be crawled */
79 protected transient BdbMultipleWorkQueues pendingUris;
80
81 /*** all URI-already-included options available to be chosen */
82 private String[] AVAILABLE_INCLUDED_OPTIONS = new String[] {
83 BdbUriUniqFilter.class.getName(),
84 BloomUriUniqFilter.class.getName(),
85 MemFPMergeUriUniqFilter.class.getName(),
86 DiskFPMergeUriUniqFilter.class.getName()};
87
88 /*** URI-already-included to use (by class name) */
89 public final static String ATTR_INCLUDED = "uri-included-structure";
90
91 private final static String DEFAULT_INCLUDED =
92 BdbUriUniqFilter.class.getName();
93
/** Whether to dump all URIs still waiting in queues to crawl.log when a crawl ends */
95 public final static String ATTR_DUMP_PENDING_AT_CLOSE =
96 "dump-pending-at-close";
97 private final static Boolean DEFAULT_DUMP_PENDING_AT_CLOSE =
98 Boolean.FALSE;
99
100
/**
 * Creates a BdbFrontier with the given name and the stock description,
 * delegating to {@link #BdbFrontier(String, String)}.
 *
 * @param name name of this Frontier
 */
public BdbFrontier(String name) {
    this(
        name,
        "BdbFrontier. "
            + "A Frontier using BerkeleyDB Java Edition databases for "
            + "persistence to disk.");
}
110
111 /***
112 * Create the BdbFrontier
113 *
114 * @param name
115 * @param description
116 */
117 public BdbFrontier(String name, String description) {
118 super(name, description);
119 Type t = addElementToDefinition(new SimpleType(ATTR_INCLUDED,
120 "Structure to use for tracking already-seen URIs. Non-default " +
121 "options may require additional configuration via system " +
122 "properties.", DEFAULT_INCLUDED, AVAILABLE_INCLUDED_OPTIONS));
123 t.setExpertSetting(true);
124 t = addElementToDefinition(new SimpleType(ATTR_DUMP_PENDING_AT_CLOSE,
125 "Whether to dump all URIs waiting in queues to crawl.log " +
126 "when a crawl ends. May add a significant delay to " +
127 "crawl termination. Dumped lines will have a zero (0) " +
128 "status.", DEFAULT_DUMP_PENDING_AT_CLOSE));
129 t.setExpertSetting(true);
130 }
131
132 /***
133 * Create the single object (within which is one BDB database)
134 * inside which all the other queues live.
135 *
136 * @return the created BdbMultipleWorkQueues
137 * @throws DatabaseException
138 */
139 private BdbMultipleWorkQueues createMultipleWorkQueues()
140 throws DatabaseException {
141 return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
142 this.controller.getBdbEnvironment().getClassCatalog(),
143 this.controller.isCheckpointRecover());
144 }
145
146
147 @Override
148 protected void initQueuesOfQueues() {
149 if(this.controller.isCheckpointRecover()) {
150
151 return;
152 }
153
154
155 readyClassQueues = new LinkedBlockingQueue<String>();
156
157 try {
158 Database inactiveQueuesDb = this.controller.getBdbEnvironment()
159 .openDatabase(null, "inactiveQueues",
160 StoredQueue.databaseConfig());
161 inactiveQueues = new StoredQueue<String>(inactiveQueuesDb,
162 String.class, null);
163 Database retiredQueuesDb = this.controller.getBdbEnvironment()
164 .openDatabase(null, "retiredQueues",
165 StoredQueue.databaseConfig());
166 retiredQueues = new StoredQueue<String>(retiredQueuesDb,
167 String.class, null);
168 } catch (DatabaseException e) {
169 throw new RuntimeException(e);
170 }
171
172
173
174
175 snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
176 }
177
178 protected Queue<String> reinit(Queue<String> q, String name) {
179 try {
180
181 Database db = this.controller.getBdbEnvironment()
182 .openDatabase(null, name, StoredQueue.databaseConfig());
183
184 StoredQueue<String> queue;
185 if(q instanceof StoredQueue) {
186 queue = (StoredQueue<String>) q;
187 queue.hookupDatabase(db, String.class, null);
188 } else {
189
190 queue = new StoredQueue<String>(db,String.class,
191 this.controller.getBdbEnvironment().getClassCatalog());
192 queue.addAll(q);
193 }
194 return queue;
195 } catch (DatabaseException e) {
196 throw new RuntimeException(e);
197 }
198 }
199
200 /***
201 * Create a UriUniqFilter that will serve as record
202 * of already seen URIs.
203 *
204 * @return A UURISet that will serve as a record of already seen URIs
205 * @throws IOException
206 */
207 protected UriUniqFilter createAlreadyIncluded() throws IOException {
208 UriUniqFilter uuf;
209 String c = null;
210 try {
211 c = (String)getAttribute(null, ATTR_INCLUDED);
212 } catch (AttributeNotFoundException e) {
213
214 }
215
216
217 if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {
218 uuf = this.controller.isCheckpointRecover()?
219 deserializeAlreadySeen(BloomUriUniqFilter.class,
220 this.controller.getCheckpointRecover().getDirectory()):
221 new BloomUriUniqFilter();
222 } else if (c!=null && c.equals(MemFPMergeUriUniqFilter.class.getName())) {
223
224 uuf = new MemFPMergeUriUniqFilter();
225 } else if (c!=null && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {
226
227 uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());
228 } else {
229
230 uuf = this.controller.isCheckpointRecover()?
231 deserializeAlreadySeen(BdbUriUniqFilter.class,
232 this.controller.getCheckpointRecover().getDirectory()):
233 new BdbUriUniqFilter(this.controller.getBdbEnvironment());
234 if (this.controller.isCheckpointRecover()) {
235
236 try {
237 ((BdbUriUniqFilter)uuf).
238 reopen(this.controller.getBdbEnvironment());
239 } catch (DatabaseException e) {
240 throw new IOException(e.getMessage());
241 }
242 }
243 }
244 uuf.setDestination(this);
245 return uuf;
246 }
247
248 protected UriUniqFilter deserializeAlreadySeen(
249 final Class<? extends UriUniqFilter> cls,
250 final File dir)
251 throws FileNotFoundException, IOException {
252 UriUniqFilter uuf = null;
253 try {
254 logger.fine("Started deserializing " + cls.getName() +
255 " of checkpoint recover.");
256 uuf = CheckpointUtils.readObjectFromFile(cls, dir);
257 logger.fine("Finished deserializing bdbje as part " +
258 "of checkpoint recover.");
259 } catch (ClassNotFoundException e) {
260 throw new IOException("Failed to deserialize " +
261 cls.getName() + ": " + e.getMessage());
262 }
263 return uuf;
264 }
265
266 /***
267 * Return the work queue for the given CrawlURI's classKey. URIs
268 * are ordered and politeness-delayed within their 'class'.
269 *
270 * @param curi CrawlURI to base queue on
271 * @return the found or created BdbWorkQueue
272 */
273 protected WorkQueue getQueueFor(final CrawlURI curi) {
274 final String classKey = curi.getClassKey();
275 synchronized (allQueues) {
276 WorkQueue wq = allQueues.getOrUse(
277 classKey,
278 new Supplier<WorkQueue>() {
279 public WorkQueue get() {
280 String qKey = new String(classKey);
281 WorkQueue q = new BdbWorkQueue(qKey, BdbFrontier.this);
282 q.setTotalBudget(((Long)getUncheckedAttribute(
283 curi,ATTR_QUEUE_TOTAL_BUDGET)).longValue());
284 return q;
285 }});
286 return wq;
287 }
288 }
289
290 /***
291 * Return the work queue for the given classKey, or null
292 * if no such queue exists.
293 *
294 * @param classKey key to look for
295 * @return the found WorkQueue
296 */
297 protected WorkQueue getQueueFor(String classKey) {
298 WorkQueue wq;
299 synchronized (allQueues) {
300 wq = (WorkQueue)allQueues.get(classKey);
301 }
302 return wq;
303 }
304
305 public FrontierMarker getInitialMarker(String regexpr,
306 boolean inCacheOnly) {
307 return pendingUris.getInitialMarker(regexpr);
308 }
309
310 /***
311 * Return list of urls.
312 * @param marker
313 * @param numberOfMatches
314 * @param verbose
315 * @return List of URIs (strings).
316 */
317 public ArrayList<String> getURIsList(FrontierMarker marker,
318 int numberOfMatches, final boolean verbose) {
319 List curis;
320 try {
321 curis = pendingUris.getFrom(marker, numberOfMatches);
322 } catch (DatabaseException e) {
323 e.printStackTrace();
324 throw new RuntimeException(e);
325 }
326 ArrayList<String> results = new ArrayList<String>(curis.size());
327 Iterator iter = curis.iterator();
328 while(iter.hasNext()) {
329 CrawlURI curi = (CrawlURI) iter.next();
330 results.add("["+curi.getClassKey()+"] "+curi.singleLineReport());
331 }
332 return results;
333 }
334
335 protected void initQueue() throws IOException {
336 try {
337 this.pendingUris = createMultipleWorkQueues();
338 } catch(DatabaseException e) {
339 throw (IOException)new IOException(e.getMessage()).initCause(e);
340 }
341 }
342
343 public void finalTasks() {
344 if((Boolean)getUncheckedAttribute(null,ATTR_DUMP_PENDING_AT_CLOSE)) {
345 try {
346 dumpAllPendingToLog();
347 } catch (DatabaseException e) {
348 logger.log(Level.WARNING,"dump pending problem",e);
349 }
350 }
351 }
352
353 protected void closeQueue() {
354 if (this.pendingUris != null) {
355 this.pendingUris.close();
356 this.pendingUris = null;
357 }
358 }
359
360 protected BdbMultipleWorkQueues getWorkQueues() {
361 return pendingUris;
362 }
363
364 protected boolean workQueueDataOnDisk() {
365 return true;
366 }
367
368 public void initialize(CrawlController c)
369 throws FatalConfigurationException, IOException {
370 this.controller = c;
371
372
373 if (c.isCheckpointRecover()) {
374
375
376
377
378
379
380
381 BdbFrontier f = null;
382 try {
383 f = (BdbFrontier)CheckpointUtils.
384 readObjectFromFile(this.getClass(),
385 c.getCheckpointRecover().getDirectory());
386 } catch (FileNotFoundException e) {
387 throw new FatalConfigurationException("Failed checkpoint " +
388 "recover: " + e.getMessage());
389 } catch (IOException e) {
390 throw new FatalConfigurationException("Failed checkpoint " +
391 "recover: " + e.getMessage());
392 } catch (ClassNotFoundException e) {
393 throw new FatalConfigurationException("Failed checkpoint " +
394 "recover: " + e.getMessage());
395 }
396
397 this.nextOrdinal = f.nextOrdinal;
398 this.totalProcessedBytes = f.totalProcessedBytes;
399 this.liveDisregardedUriCount = f.liveDisregardedUriCount;
400 this.liveFailedFetchCount = f.liveFailedFetchCount;
401 this.processedBytesAfterLastEmittedURI =
402 f.processedBytesAfterLastEmittedURI;
403 this.liveQueuedUriCount = f.liveQueuedUriCount;
404 this.liveSucceededFetchCount = f.liveSucceededFetchCount;
405 this.lastMaxBandwidthKB = f.lastMaxBandwidthKB;
406 this.readyClassQueues = f.readyClassQueues;
407 this.inactiveQueues = reinit(f.inactiveQueues,"inactiveQueues");
408 this.retiredQueues = reinit(f.retiredQueues,"retiredQueues");
409 this.snoozedClassQueues = f.snoozedClassQueues;
410 this.inProcessQueues = f.inProcessQueues;
411 super.initialize(c);
412 wakeQueues();
413 } else {
414
415 super.initialize(c);
416 }
417 }
418
419
420
421 @Override
422 public void crawlEnded(String sExitMessage) {
423 ((StoredQueue)inactiveQueues).close();
424 ((StoredQueue)retiredQueues).close();
425 super.crawlEnded(sExitMessage);
426 }
427
428 public void crawlCheckpoint(File checkpointDir) throws Exception {
429 super.crawlCheckpoint(checkpointDir);
430 logger.fine("Started serializing already seen as part "
431 + "of checkpoint. Can take some time.");
432
433
434 if (this.pendingUris != null) {
435 this.pendingUris.sync();
436 }
437 CheckpointUtils.writeObjectToFile(this.alreadyIncluded, checkpointDir);
438 logger.fine("Finished serializing already seen as part "
439 + "of checkpoint.");
440
441 CheckpointUtils.writeObjectToFile(this, checkpointDir);
442 }
443
444 /***
445 * Dump all still-enqueued URIs to the crawl.log -- without actually
446 * dequeuing. Useful for understanding what was remaining in a
447 * crawl that was ended early, for example at a time limit.
448 *
449 * @throws DatabaseException
450 */
451 public void dumpAllPendingToLog() throws DatabaseException {
452 Closure tolog = new Closure() {
453 public void execute(Object curi) {
454 log((CrawlURI)curi);
455 }};
456 pendingUris.forAllPendingDo(tolog);
457 }
458 }