1 package org.archive.crawler.frontier;
2
3 import java.io.IOException;
4 import java.io.PrintWriter;
5 import java.io.Serializable;
6 import java.util.logging.Level;
7 import java.util.logging.Logger;
8
9 import org.apache.commons.httpclient.URIException;
10 import org.archive.crawler.datamodel.CrawlSubstats;
11 import org.archive.crawler.datamodel.CrawlURI;
12 import org.archive.crawler.framework.Frontier;
13 import org.archive.net.UURI;
14 import org.archive.net.UURIFactory;
15 import org.archive.util.ArchiveUtils;
16 import org.archive.util.Reporter;
17
18 /***
19 * A single queue of related URIs to visit, grouped by a classKey
20 * (typically "hostname:port" or similar)
21 *
22 * @author gojomo
23 * @author Christian Kohlschuetter
24 */
25 public abstract class WorkQueue implements Frontier.FrontierGroup, Comparable,
26 Serializable, Reporter {
27 static final long serialVersionUID = -1939168792663316048L;
28
29 private static final Logger logger =
30 Logger.getLogger(WorkQueue.class.getName());
31
32 /*** The classKey */
33 protected final String classKey;
34
35 private boolean active = true;
36
37 /*** Total number of stored items */
38 private long count = 0;
39
40 /*** Total number of items ever enqueued */
41 private long enqueueCount = 0;
42
43 /*** Whether queue is already in lifecycle stage */
44 private boolean isHeld = false;
45
46 /*** Time to wake, if snoozed */
47 private long wakeTime = 0;
48
49 /*** Running 'budget' indicating whether queue should stay active */
50 private int sessionBalance = 0;
51
52 /*** Cost of the last item to be charged against queue */
53 private int lastCost = 0;
54
55 /*** Total number of items charged against queue; with totalExpenditure
56 * can be used to calculate 'average cost'. */
57 private long costCount = 0;
58
59 /*** Running tally of total expenditures out of this queue;
60 * tallied only as a URI is finished or retried */
61 private long totalExpenditure = 0;
62
63 /*** Net cost of all currently-enqueued URIs. */
64 private long pendingExpenditure = 0;
65
66 /*** Total to spend on this queue over its lifetime */
67 private long totalBudget = 0;
68
69 /*** The next item to be returned */
70 private CrawlURI peekItem = null;
71
72 /*** Last URI enqueued */
73 private String lastQueued;
74
75 /*** Last URI peeked */
76 private String lastPeeked;
77
78 /*** time of last dequeue (disposition of some URI) **/
79 private long lastDequeueTime;
80
81 /*** count of errors encountered */
82 private long errorCount = 0;
83
84 /*** Substats for all CrawlURIs in this group */
85 protected CrawlSubstats substats = new CrawlSubstats();
86
87 private boolean retired;
88
89 public WorkQueue(final String pClassKey) {
90 this.classKey = pClassKey;
91 }
92
93 /***
94 * Delete URIs matching the given pattern from this queue.
95 * @param frontier
96 * @param match
97 * @return count of deleted URIs
98 */
99 public long deleteMatching(final WorkQueueFrontier frontier, String match) {
100 try {
101 final long deleteCount = deleteMatchingFromQueue(frontier, match);
102 this.count -= deleteCount;
103 return deleteCount;
104 } catch (IOException e) {
105
106 e.printStackTrace();
107 throw new RuntimeException(e);
108 }
109 }
110
111 /***
112 * Add the given CrawlURI, noting its addition in running count. (It
113 * should not already be present.)
114 *
115 * @param frontier Work queues manager.
116 * @param curi CrawlURI to insert.
117 */
118 public synchronized void enqueue(final WorkQueueFrontier frontier,
119 CrawlURI curi) {
120 try {
121 insert(frontier, curi, false);
122 } catch (IOException e) {
123
124 e.printStackTrace();
125 throw new RuntimeException(e);
126 }
127 count++;
128 enqueueCount++;
129 pendingExpenditure += curi.getHolderCost();
130 }
131
132 /***
133 * Return the topmost queue item -- and remember it,
134 * such that even later higher-priority inserts don't
135 * change it.
136 *
137 * TODO: evaluate if this is really necessary
138 * @param frontier Work queues manager
139 *
140 * @return topmost queue item, or null
141 */
142 public CrawlURI peek(final WorkQueueFrontier frontier) {
143 if(peekItem == null && count > 0) {
144 try {
145 peekItem = peekItem(frontier);
146 } catch (IOException e) {
147
148 logger.log(Level.SEVERE,"peek failure",e);
149 e.printStackTrace();
150
151 }
152 if(peekItem != null) {
153 lastPeeked = peekItem.toString();
154 }
155 }
156 return peekItem;
157 }
158
159 /***
160 * Remove the peekItem from the queue and adjusts the count.
161 *
162 * @param frontier Work queues manager.
163 */
164 public synchronized void dequeue(final WorkQueueFrontier frontier) {
165 try {
166 deleteItem(frontier, peekItem);
167 } catch (IOException e) {
168
169 e.printStackTrace();
170 throw new RuntimeException(e);
171 }
172 pendingExpenditure -= peekItem.getHolderCost();
173 unpeek();
174 count--;
175 lastDequeueTime = System.currentTimeMillis();
176 }
177
178 /***
179 * Set the session 'activity budget balance' to the given value
180 *
181 * @param balance to use
182 */
183 public void setSessionBalance(int balance) {
184 this.sessionBalance = balance;
185 }
186
187 /***
188 * Return current session 'activity budget balance'
189 *
190 * @return session balance
191 */
192 public int getSessionBalance() {
193 return this.sessionBalance;
194 }
195
196 /***
197 * Set the total expenditure level allowable before queue is
198 * considered inherently 'over-budget'.
199 *
200 * @param budget
201 */
202 public void setTotalBudget(long budget) {
203 this.totalBudget = budget;
204 }
205
206 /***
207 * Retrieve the total expenditure level allowed by this queue.
208 *
209 * @return the queues total budget
210 */
211 public long getTotalBudget() {
212 return this.totalBudget;
213 }
214
215 /***
216 * Check whether queue has temporarily or permanently exceeded
217 * its budget.
218 *
219 * @return true if queue is over its set budget(s)
220 */
221 public boolean isOverBudget() {
222
223
224 return this.sessionBalance <= 0
225 || (this.totalBudget >= 0 && this.totalExpenditure >= this.totalBudget);
226 }
227
228 /***
229 * Return the tally of all expenditures from this queue (dequeued
230 * items)
231 *
232 * @return total amount expended on this queue
233 */
234 public long getTotalExpenditure() {
235 return totalExpenditure;
236 }
237
238 /***
239 * Return the tally of all URI costs currently inside this queue
240 *
241 * @return total amount expended on this queue
242 */
243 public long getPendingExpenditure() {
244 return pendingExpenditure;
245 }
246
247 /***
248 * Increase the internal running budget to be used before
249 * deactivating the queue
250 *
251 * @param amount amount to increment
252 * @return updated budget value
253 */
254 public int incrementSessionBalance(int amount) {
255 this.sessionBalance = this.sessionBalance + amount;
256 return this.sessionBalance;
257 }
258
259 /***
260 * Decrease the internal running budget by the given amount.
261 * @param amount tp decrement
262 * @return updated budget value
263 */
264 public int expend(int amount) {
265 this.sessionBalance = this.sessionBalance - amount;
266 this.totalExpenditure = this.totalExpenditure + amount;
267 this.lastCost = amount;
268 this.costCount++;
269 return this.sessionBalance;
270 }
271
272 /***
273 * A URI should not have been charged against queue (eg
274 * it was disregarded); return the amount expended
275 * @param amount to return
276 * @return updated budget value
277 */
278 public int refund(int amount) {
279 this.sessionBalance = this.sessionBalance + amount;
280 this.totalExpenditure = this.totalExpenditure - amount;
281 this.costCount--;
282 return this.sessionBalance;
283 }
284
285 /***
286 * Note an error and assess an extra penalty.
287 * @param penalty additional amount to deduct
288 */
289 public void noteError(int penalty) {
290 this.sessionBalance = this.sessionBalance - penalty;
291 this.totalExpenditure = this.totalExpenditure + penalty;
292 errorCount++;
293 }
294
295 /***
296 * @param l
297 */
298 public void setWakeTime(long l) {
299 wakeTime = l;
300 }
301
302 /***
303 * @return wakeTime
304 */
305 public long getWakeTime() {
306 return wakeTime;
307 }
308
309 /***
310 * @return classKey, the 'identifier', for this queue.
311 */
312 public String getClassKey() {
313 return this.classKey;
314 }
315
316 /***
317 * Clear isHeld to false
318 */
319 public void clearHeld() {
320 isHeld = false;
321 }
322
323 /***
324 * Whether the queue is already in a lifecycle stage --
325 * such as ready, in-progress, snoozed -- and thus should
326 * not be redundantly inserted to readyClassQueues
327 *
328 * @return isHeld
329 */
330 public boolean isHeld() {
331 return isHeld;
332 }
333
334 /***
335 * Set isHeld to true
336 */
337 public void setHeld() {
338 isHeld = true;
339 }
340
341 /***
342 * Forgive the peek, allowing a subsequent peek to
343 * return a different item.
344 *
345 */
346 public void unpeek() {
347 peekItem = null;
348 }
349
350 public final int compareTo(Object obj) {
351 if(this == obj) {
352 return 0;
353 }
354 WorkQueue other = (WorkQueue) obj;
355 if(getWakeTime() > other.getWakeTime()) {
356 return 1;
357 }
358 if(getWakeTime() < other.getWakeTime()) {
359 return -1;
360 }
361
362
363 return this.classKey.compareTo(other.getClassKey());
364 }
365
366 /***
367 * Update the given CrawlURI, which should already be present. (This
368 * is not checked.) Equivalent to an enqueue without affecting the count.
369 *
370 * @param frontier Work queues manager.
371 * @param curi CrawlURI to update.
372 */
373 public void update(final WorkQueueFrontier frontier, CrawlURI curi) {
374 try {
375 insert(frontier, curi, true);
376 } catch (IOException e) {
377
378 e.printStackTrace();
379 throw new RuntimeException(e);
380 }
381 }
382
383 /***
384 * @return Returns the count.
385 */
386 public synchronized long getCount() {
387 return this.count;
388 }
389
390 /***
391 * Insert the given curi, whether it is already present or not.
392 * @param frontier WorkQueueFrontier.
393 * @param curi CrawlURI to insert.
394 * @throws IOException
395 */
396 private void insert(final WorkQueueFrontier frontier, CrawlURI curi,
397 boolean overwriteIfPresent)
398 throws IOException {
399 insertItem(frontier, curi, overwriteIfPresent);
400 lastQueued = curi.toString();
401 }
402
403 /***
404 * Insert the given curi, whether it is already present or not.
405 * Hook for subclasses.
406 *
407 * @param frontier WorkQueueFrontier.
408 * @param curi CrawlURI to insert.
409 * @throws IOException if there was a problem while inserting the item
410 */
411 protected abstract void insertItem(final WorkQueueFrontier frontier,
412 CrawlURI curi, boolean expectedPresent) throws IOException;
413
414 /***
415 * Delete URIs matching the given pattern from this queue.
416 * @param frontier WorkQueues manager.
417 * @param match the pattern to match
418 * @return count of deleted URIs
419 * @throws IOException if there was a problem while deleting
420 */
421 protected abstract long deleteMatchingFromQueue(
422 final WorkQueueFrontier frontier, final String match)
423 throws IOException;
424
425 /***
426 * Removes the given item from the queue.
427 *
428 * This is only used to remove the first item in the queue,
429 * so it is not necessary to implement a random-access queue.
430 *
431 * @param frontier Work queues manager.
432 * @throws IOException if there was a problem while deleting the item
433 */
434 protected abstract void deleteItem(final WorkQueueFrontier frontier,
435 final CrawlURI item) throws IOException;
436
437 /***
438 * Returns first item from queue (does not delete)
439 *
440 * @return The peeked item, or null
441 * @throws IOException if there was a problem while peeking
442 */
443 protected abstract CrawlURI peekItem(final WorkQueueFrontier frontier)
444 throws IOException;
445
446 /***
447 * Suspends this WorkQueue. Closes all connections to resources etc.
448 *
449 * @param frontier
450 * @throws IOException
451 */
452 protected void suspend(final WorkQueueFrontier frontier) throws IOException {
453 }
454
455 /***
456 * Resumes this WorkQueue. Eventually opens connections to resources etc.
457 *
458 * @param frontier
459 * @throws IOException
460 */
461 protected void resume(final WorkQueueFrontier frontier) throws IOException {
462 }
463
464 public void setActive(final WorkQueueFrontier frontier, final boolean b) {
465 if(active != b) {
466 active = b;
467 try {
468 if(active) {
469 resume(frontier);
470 } else {
471 suspend(frontier);
472 }
473 } catch (IOException e) {
474
475 e.printStackTrace();
476 throw new RuntimeException(e);
477 }
478 }
479 }
480
481
482
483
484
485
486
487
488 public String[] getReports() {
489 return new String[] {};
490 }
491
492
493
494
495 public void reportTo(PrintWriter writer) {
496 reportTo(null,writer);
497 }
498
499
500
501
502 public void singleLineReportTo(PrintWriter writer) {
503
504 writer.print(classKey);
505 writer.print(" ");
506
507 writer.print(Long.toString(count));
508 writer.print(" ");
509
510 writer.print(Long.toString(enqueueCount));
511 writer.print(" ");
512 writer.print(sessionBalance);
513 writer.print(" ");
514 writer.print(lastCost);
515 writer.print("(");
516 writer.print(ArchiveUtils.doubleToString(
517 ((double) totalExpenditure / costCount), 1));
518 writer.print(")");
519 writer.print(" ");
520
521 if (lastDequeueTime != 0) {
522 writer.print(ArchiveUtils.getLog17Date(lastDequeueTime));
523 } else {
524 writer.print("-");
525 }
526 writer.print(" ");
527
528 if (wakeTime != 0) {
529 writer.print(ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
530 } else {
531 writer.print("-");
532 }
533 writer.print(" ");
534 writer.print(Long.toString(totalExpenditure));
535 writer.print("/");
536 writer.print(Long.toString(totalBudget));
537 writer.print(" ");
538 writer.print(Long.toString(errorCount));
539 writer.print(" ");
540 writer.print(lastPeeked);
541 writer.print(" ");
542 writer.print(lastQueued);
543 writer.print("\n");
544 }
545
546
547
548
549 public String singleLineLegend() {
550 return "queue currentSize totalEnqueues sessionBalance lastCost " +
551 "(averageCost) lastDequeueTime wakeTime " +
552 "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri";
553 }
554
555
556
557
558 public String singleLineReport() {
559 return ArchiveUtils.singleLineReport(this);
560 }
561
562 /***
563 * @param writer
564 * @throws IOException
565 */
566 public void reportTo(String name, PrintWriter writer) {
567
568 writer.print("Queue ");
569 writer.print(classKey);
570 writer.print("\n");
571 writer.print(" ");
572 writer.print(Long.toString(count));
573 writer.print(" items");
574 if (wakeTime != 0) {
575 writer.print("\n wakes in: "+ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
576 }
577 writer.print("\n last enqueued: ");
578 writer.print(lastQueued);
579 writer.print("\n last peeked: ");
580 writer.print(lastPeeked);
581 writer.print("\n");
582 writer.print(" total expended: ");
583 writer.print(Long.toString(totalExpenditure));
584 writer.print(" (total budget: ");
585 writer.print(Long.toString(totalBudget));
586 writer.print(")\n");
587 writer.print(" active balance: ");
588 writer.print(sessionBalance);
589 writer.print("\n last(avg) cost: ");
590 writer.print(lastCost);
591 writer.print("(");
592 writer.print(ArchiveUtils.doubleToString(
593 ((double) totalExpenditure / costCount), 1));
594 writer.print(")\n\n");
595 }
596
597 public CrawlSubstats getSubstats() {
598 return substats;
599 }
600
601 /***
602 * Set the retired status of this queue.
603 *
604 * @param b new value for retired status
605 */
606 public void setRetired(boolean b) {
607 this.retired = b;
608 }
609
610 public boolean isRetired() {
611 return retired;
612 }
613
614 public UURI getContextUURI(WorkQueueFrontier wqf) {
615 if(lastPeeked!=null) {
616 try {
617 return UURIFactory.getInstance(lastPeeked);
618 } catch (URIException e) {
619
620 }
621 }
622 if(lastQueued!=null) {
623 try {
624 return UURIFactory.getInstance(lastQueued);
625 } catch (URIException e) {
626
627 }
628 }
629 if(peekItem!=null) {
630 return peekItem.getUURI();
631 }
632
633 UURI contextUri = peek(wqf).getUURI();
634 unpeek();
635 return contextUri;
636 }
637 }