View Javadoc

1   package org.archive.crawler.frontier;
2   
3   import java.io.IOException;
4   import java.io.PrintWriter;
5   import java.io.Serializable;
6   import java.util.logging.Level;
7   import java.util.logging.Logger;
8   
9   import org.apache.commons.httpclient.URIException;
10  import org.archive.crawler.datamodel.CrawlSubstats;
11  import org.archive.crawler.datamodel.CrawlURI;
12  import org.archive.crawler.framework.Frontier;
13  import org.archive.net.UURI;
14  import org.archive.net.UURIFactory;
15  import org.archive.util.ArchiveUtils;
16  import org.archive.util.Reporter;
17  
18  /***
19   * A single queue of related URIs to visit, grouped by a classKey
20   * (typically "hostname:port" or similar) 
21   * 
22   * @author gojomo
23   * @author Christian Kohlschuetter 
24   */
25  public abstract class WorkQueue implements Frontier.FrontierGroup, Comparable,
26          Serializable, Reporter {
27      static final long serialVersionUID = -1939168792663316048L;
28      
29      private static final Logger logger =
30          Logger.getLogger(WorkQueue.class.getName());
31      
32      /*** The classKey */
33      protected final String classKey;
34  
35      private boolean active = true;
36  
37      /*** Total number of stored items */
38      private long count = 0;
39  
40      /*** Total number of items ever enqueued */
41      private long enqueueCount = 0;
42      
43      /*** Whether queue is already in lifecycle stage */
44      private boolean isHeld = false;
45  
46      /*** Time to wake, if snoozed */
47      private long wakeTime = 0;
48  
49      /*** Running 'budget' indicating whether queue should stay active */
50      private int sessionBalance = 0;
51  
52      /*** Cost of the last item to be charged against queue */
53      private int lastCost = 0;
54  
55      /*** Total number of items charged against queue; with totalExpenditure
56       * can be used to calculate 'average cost'. */
57      private long costCount = 0;
58  
59      /*** Running tally of total expenditures out of this queue;
60       * tallied only as a URI is finished or retried */
61      private long totalExpenditure = 0;
62  
63      /*** Net cost of all currently-enqueued URIs. */
64      private long pendingExpenditure = 0;
65      
66      /*** Total to spend on this queue over its lifetime */
67      private long totalBudget = 0;
68  
69      /*** The next item to be returned */
70      private CrawlURI peekItem = null;
71  
72      /*** Last URI enqueued */
73      private String lastQueued;
74  
75      /*** Last URI peeked */
76      private String lastPeeked;
77  
78      /*** time of last dequeue (disposition of some URI) **/ 
79      private long lastDequeueTime;
80      
81      /*** count of errors encountered */
82      private long errorCount = 0;
83      
84      /*** Substats for all CrawlURIs in this group */
85      protected CrawlSubstats substats = new CrawlSubstats();
86  
87      private boolean retired;
88      
89      public WorkQueue(final String pClassKey) {
90          this.classKey = pClassKey;
91      }
92  
93      /***
94       * Delete URIs matching the given pattern from this queue. 
95       * @param frontier
96       * @param match
97       * @return count of deleted URIs
98       */
99      public long deleteMatching(final WorkQueueFrontier frontier, String match) {
100         try {
101             final long deleteCount = deleteMatchingFromQueue(frontier, match);
102             this.count -= deleteCount;
103             return deleteCount;
104         } catch (IOException e) {
105             //FIXME better exception handling
106             e.printStackTrace();
107             throw new RuntimeException(e);
108         }
109     }
110 
111     /***
112      * Add the given CrawlURI, noting its addition in running count. (It
113      * should not already be present.)
114      * 
115      * @param frontier Work queues manager.
116      * @param curi CrawlURI to insert.
117      */
118     public synchronized void enqueue(final WorkQueueFrontier frontier,
119         CrawlURI curi) {
120         try {
121             insert(frontier, curi, false);
122         } catch (IOException e) {
123             //FIXME better exception handling
124             e.printStackTrace();
125             throw new RuntimeException(e);
126         }
127         count++;
128         enqueueCount++;
129         pendingExpenditure += curi.getHolderCost();
130     }
131 
132     /***
133      * Return the topmost queue item -- and remember it,
134      * such that even later higher-priority inserts don't
135      * change it. 
136      * 
137      * TODO: evaluate if this is really necessary
138      * @param frontier Work queues manager
139      * 
140      * @return topmost queue item, or null
141      */
142     public CrawlURI peek(final WorkQueueFrontier frontier) {
143         if(peekItem == null && count > 0) {
144             try {
145                 peekItem = peekItem(frontier);
146             } catch (IOException e) {
147                 //FIXME better exception handling
148                 logger.log(Level.SEVERE,"peek failure",e);
149                 e.printStackTrace();
150                 // throw new RuntimeException(e);
151             }
152             if(peekItem != null) {
153                 lastPeeked = peekItem.toString();
154             }
155         }
156         return peekItem;
157     }
158 
159     /***
160      * Remove the peekItem from the queue and adjusts the count.
161      * 
162      * @param frontier  Work queues manager.
163      */
164     public synchronized void dequeue(final WorkQueueFrontier frontier) {
165         try {
166             deleteItem(frontier, peekItem);
167         } catch (IOException e) {
168             //FIXME better exception handling
169             e.printStackTrace();
170             throw new RuntimeException(e);
171         }
172         pendingExpenditure -= peekItem.getHolderCost();
173         unpeek();
174         count--;
175         lastDequeueTime = System.currentTimeMillis();
176     }
177 
178     /***
179      * Set the session 'activity budget balance' to the given value
180      * 
181      * @param balance to use
182      */
183     public void setSessionBalance(int balance) {
184         this.sessionBalance = balance;
185     }
186 
187     /***
188      * Return current session 'activity budget balance' 
189      * 
190      * @return session balance
191      */
192     public int getSessionBalance() {
193         return this.sessionBalance;
194     }
195 
196     /***
197      * Set the total expenditure level allowable before queue is 
198      * considered inherently 'over-budget'. 
199      * 
200      * @param budget
201      */
202     public void setTotalBudget(long budget) {
203         this.totalBudget = budget;
204     }
205 
206     /***
207      * Retrieve the total expenditure level allowed by this queue.
208      * 
209      * @return the queues total budget
210      */
211     public long getTotalBudget() {
212         return this.totalBudget;
213     }
214     
215     /***
216      * Check whether queue has temporarily or permanently exceeded
217      * its budget. 
218      * 
219      * @return true if queue is over its set budget(s)
220      */
221     public boolean isOverBudget() {
222         // check whether running balance is depleted 
223         // or totalExpenditure exceeds totalBudget
224         return this.sessionBalance <= 0
225             || (this.totalBudget >= 0 && this.totalExpenditure >= this.totalBudget);
226     }
227 
228     /***
229      * Return the tally of all expenditures from this queue (dequeued 
230      * items)
231      * 
232      * @return total amount expended on this queue
233      */
234     public long getTotalExpenditure() {
235         return totalExpenditure;
236     }
237     
238     /***
239      * Return the tally of all URI costs currently inside this queue
240      * 
241      * @return total amount expended on this queue
242      */
243     public long getPendingExpenditure() {
244         return pendingExpenditure;
245     }
246 
247     /***
248      * Increase the internal running budget to be used before 
249      * deactivating the queue
250      * 
251      * @param amount amount to increment
252      * @return updated budget value
253      */
254     public int incrementSessionBalance(int amount) {
255         this.sessionBalance = this.sessionBalance + amount;
256         return this.sessionBalance;
257     }
258 
259     /***
260      * Decrease the internal running budget by the given amount. 
261      * @param amount tp decrement
262      * @return updated budget value
263      */
264     public int expend(int amount) {
265         this.sessionBalance = this.sessionBalance - amount;
266         this.totalExpenditure = this.totalExpenditure + amount;
267         this.lastCost = amount;
268         this.costCount++;
269         return this.sessionBalance;
270     }
271 
272     /***
273      * A URI should not have been charged against queue (eg
274      * it was disregarded); return the amount expended 
275      * @param amount to return
276      * @return updated budget value
277      */
278     public int refund(int amount) {
279         this.sessionBalance = this.sessionBalance + amount;
280         this.totalExpenditure = this.totalExpenditure - amount;
281         this.costCount--;
282         return this.sessionBalance;
283     }
284     
285     /***
286      * Note an error and assess an extra penalty. 
287      * @param penalty additional amount to deduct
288      */
289     public void noteError(int penalty) {
290         this.sessionBalance = this.sessionBalance - penalty;
291         this.totalExpenditure = this.totalExpenditure + penalty;
292         errorCount++;
293     }
294     
295     /***
296      * @param l
297      */
298     public void setWakeTime(long l) {
299         wakeTime = l;
300     }
301 
302     /***
303      * @return wakeTime
304      */
305     public long getWakeTime() {
306         return wakeTime;
307     }
308 
309     /***
310      * @return classKey, the 'identifier', for this queue.
311      */
312     public String getClassKey() {
313         return this.classKey;
314     }
315 
316     /***
317      * Clear isHeld to false
318      */
319     public void clearHeld() {
320         isHeld = false;
321     }
322 
323     /***
324      * Whether the queue is already in a lifecycle stage --
325      * such as ready, in-progress, snoozed -- and thus should
326      * not be redundantly inserted to readyClassQueues
327      * 
328      * @return isHeld
329      */
330     public boolean isHeld() {
331         return isHeld;
332     }
333 
334     /***
335      * Set isHeld to true
336      */
337     public void setHeld() {
338         isHeld = true;
339     }
340 
341     /***
342      * Forgive the peek, allowing a subsequent peek to 
343      * return a different item. 
344      * 
345      */
346     public void unpeek() {
347         peekItem = null;
348     }
349 
350     public final int compareTo(Object obj) {
351         if(this == obj) {
352             return 0; // for exact identity only
353         }
354         WorkQueue other = (WorkQueue) obj;
355         if(getWakeTime() > other.getWakeTime()) {
356             return 1;
357         }
358         if(getWakeTime() < other.getWakeTime()) {
359             return -1;
360         }
361         // at this point, the ordering is arbitrary, but still
362         // must be consistent/stable over time
363         return this.classKey.compareTo(other.getClassKey());
364     }
365 
366     /***
367      * Update the given CrawlURI, which should already be present. (This
368      * is not checked.) Equivalent to an enqueue without affecting the count.
369      * 
370      * @param frontier Work queues manager.
371      * @param curi CrawlURI to update.
372      */
373     public void update(final WorkQueueFrontier frontier, CrawlURI curi) {
374         try {
375             insert(frontier, curi, true);
376         } catch (IOException e) {
377             //FIXME better exception handling
378             e.printStackTrace();
379             throw new RuntimeException(e);
380         }
381     }
382 
383     /***
384      * @return Returns the count.
385      */
386     public synchronized long getCount() {
387         return this.count;
388     }
389 
390     /***
391      * Insert the given curi, whether it is already present or not. 
392      * @param frontier WorkQueueFrontier.
393      * @param curi CrawlURI to insert.
394      * @throws IOException
395      */
396     private void insert(final WorkQueueFrontier frontier, CrawlURI curi, 
397             boolean overwriteIfPresent)
398         throws IOException {
399         insertItem(frontier, curi, overwriteIfPresent);
400         lastQueued = curi.toString();
401     }
402 
403     /***
404      * Insert the given curi, whether it is already present or not.
405      * Hook for subclasses. 
406      * 
407      * @param frontier WorkQueueFrontier.
408      * @param curi CrawlURI to insert.
409      * @throws IOException  if there was a problem while inserting the item
410      */
411     protected abstract void insertItem(final WorkQueueFrontier frontier,
412         CrawlURI curi, boolean expectedPresent) throws IOException;
413 
414     /***
415      * Delete URIs matching the given pattern from this queue. 
416      * @param frontier WorkQueues manager.
417      * @param match  the pattern to match
418      * @return count of deleted URIs
419      * @throws IOException  if there was a problem while deleting
420      */
421     protected abstract long deleteMatchingFromQueue(
422         final WorkQueueFrontier frontier, final String match)
423         throws IOException;
424 
425     /***
426      * Removes the given item from the queue.
427      * 
428      * This is only used to remove the first item in the queue,
429      * so it is not necessary to implement a random-access queue.
430      * 
431      * @param frontier  Work queues manager.
432      * @throws IOException  if there was a problem while deleting the item
433      */
434     protected abstract void deleteItem(final WorkQueueFrontier frontier,
435         final CrawlURI item) throws IOException;
436 
437     /***
438      * Returns first item from queue (does not delete)
439      * 
440      * @return The peeked item, or null
441      * @throws IOException  if there was a problem while peeking
442      */
443     protected abstract CrawlURI peekItem(final WorkQueueFrontier frontier)
444         throws IOException;
445 
446     /***
447      * Suspends this WorkQueue. Closes all connections to resources etc.
448      * 
449      * @param frontier
450      * @throws IOException
451      */
452     protected void suspend(final WorkQueueFrontier frontier) throws IOException {
453     }
454 
455     /***
456      * Resumes this WorkQueue. Eventually opens connections to resources etc.
457      * 
458      * @param frontier
459      * @throws IOException
460      */
461     protected void resume(final WorkQueueFrontier frontier) throws IOException {
462     }
463 
464     public void setActive(final WorkQueueFrontier frontier, final boolean b) {
465         if(active != b) {
466             active = b;
467             try {
468                 if(active) {
469                     resume(frontier);
470                 } else {
471                     suspend(frontier);
472                 }
473             } catch (IOException e) {
474                 //FIXME better exception handling
475                 e.printStackTrace();
476                 throw new RuntimeException(e);
477             }
478         }
479     }
480     
481     // 
482     // Reporter
483     //
484 
485     /* (non-Javadoc)
486      * @see org.archive.util.Reporter#getReports()
487      */
488     public String[] getReports() {
489         return new String[] {};
490     }
491 
492     /* (non-Javadoc)
493      * @see org.archive.util.Reporter#reportTo(java.io.Writer)
494      */
495     public void reportTo(PrintWriter writer) {
496         reportTo(null,writer);
497     }
498 
499     /* (non-Javadoc)
500      * @see org.archive.util.Reporter#singleLineReportTo(java.io.Writer)
501      */
502     public void singleLineReportTo(PrintWriter writer) {
503         // queue name
504         writer.print(classKey);
505         writer.print(" ");
506         // count of items
507         writer.print(Long.toString(count));
508         writer.print(" ");
509         // enqueue count
510         writer.print(Long.toString(enqueueCount));
511         writer.print(" ");
512         writer.print(sessionBalance);
513         writer.print(" ");
514         writer.print(lastCost);
515         writer.print("(");
516         writer.print(ArchiveUtils.doubleToString(
517                     ((double) totalExpenditure / costCount), 1));
518         writer.print(")");
519         writer.print(" ");
520         // last dequeue time, if any, or '-'
521         if (lastDequeueTime != 0) {
522             writer.print(ArchiveUtils.getLog17Date(lastDequeueTime));
523         } else {
524             writer.print("-");
525         }
526         writer.print(" ");
527         // wake time if snoozed, or '-'
528         if (wakeTime != 0) {
529             writer.print(ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
530         } else {
531             writer.print("-");
532         }
533         writer.print(" ");
534         writer.print(Long.toString(totalExpenditure));
535         writer.print("/");
536         writer.print(Long.toString(totalBudget));
537         writer.print(" ");
538         writer.print(Long.toString(errorCount));
539         writer.print(" ");
540         writer.print(lastPeeked);
541         writer.print(" ");
542         writer.print(lastQueued);
543         writer.print("\n");
544     }
545 
546     /* (non-Javadoc)
547      * @see org.archive.util.Reporter#singleLineLegend()
548      */
549     public String singleLineLegend() {
550         return "queue currentSize totalEnqueues sessionBalance lastCost " +
551                 "(averageCost) lastDequeueTime wakeTime " +
552                 "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri";
553     }
554     
555     /* (non-Javadoc)
556      * @see org.archive.util.Reporter#singleLineReport()
557      */
558     public String singleLineReport() {
559         return ArchiveUtils.singleLineReport(this);
560     }
561     
562     /***
563      * @param writer
564      * @throws IOException
565      */
566     public void reportTo(String name, PrintWriter writer) {
567         // name is ignored: only one kind of report for now
568         writer.print("Queue ");
569         writer.print(classKey);
570         writer.print("\n");
571         writer.print("  ");
572         writer.print(Long.toString(count));
573         writer.print(" items");
574         if (wakeTime != 0) {
575             writer.print("\n   wakes in: "+ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis()));
576         }
577         writer.print("\n    last enqueued: ");
578         writer.print(lastQueued);
579         writer.print("\n      last peeked: ");
580         writer.print(lastPeeked);
581         writer.print("\n");
582         writer.print("   total expended: ");
583         writer.print(Long.toString(totalExpenditure));
584         writer.print(" (total budget: ");
585         writer.print(Long.toString(totalBudget));
586         writer.print(")\n");
587         writer.print("   active balance: ");
588         writer.print(sessionBalance);
589         writer.print("\n   last(avg) cost: ");
590         writer.print(lastCost);
591         writer.print("(");
592         writer.print(ArchiveUtils.doubleToString(
593                     ((double) totalExpenditure / costCount), 1));
594         writer.print(")\n\n");
595     }
596     
597     public CrawlSubstats getSubstats() {
598         return substats;
599     }
600 
601     /***
602      * Set the retired status of this queue.
603      * 
604      * @param b new value for retired status
605      */
606     public void setRetired(boolean b) {
607         this.retired = b;
608     }
609     
610     public boolean isRetired() {
611         return retired;
612     }
613 
614     public UURI getContextUURI(WorkQueueFrontier wqf) {
615         if(lastPeeked!=null) {
616             try {
617                 return UURIFactory.getInstance(lastPeeked);
618             } catch (URIException e) {
619                 // just move along to next try
620             }
621         }
622         if(lastQueued!=null) {
623             try {
624                 return UURIFactory.getInstance(lastQueued);
625             } catch (URIException e) {
626                 // just move along to next try
627             }
628         }
629         if(peekItem!=null) {
630             return peekItem.getUURI();
631         }
632         // peek a CrawlURI temporarily just for context 
633         UURI contextUri = peek(wqf).getUURI(); 
634         unpeek(); // but don't insist on that URI being next released
635         return contextUri;
636     }
637 }