1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package org.archive.crawler.postprocessor;
27
28 import java.io.IOException;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35
36 import org.archive.crawler.datamodel.CrawlURI;
37 import org.archive.crawler.framework.Processor;
38 import org.archive.crawler.settings.SimpleType;
39 import org.archive.crawler.settings.Type;
40 import org.archive.util.IoUtils;
41
42 /***
43 * Processor module which uses 'df -k', where available and with
44 * the expected output format (on Linux), to monitor available
45 * disk space and pause the crawl if free space on monitored
46 * filesystems falls below certain thresholds.
47 */
48 public class LowDiskPauseProcessor extends Processor {
49
50 private static final long serialVersionUID = 3338337700768396302L;
51
52 /***
53 * Logger.
54 */
55 private static final Logger logger =
56 Logger.getLogger(LowDiskPauseProcessor.class.getName());
57
58 /***
59 * List of mounts to monitor; should match "Mounted on" column of 'df' output
60 */
61 public static final String ATTR_MONITOR_MOUNTS = "monitor-mounts";
62 public static final String DEFAULT_MONITOR_MOUNTS = "";
63
64 /***
65 * Space available level below which a crawl-pause should be triggered.
66 */
67 public static final String ATTR_PAUSE_THRESHOLD = "pause-threshold-kb";
68 public static final int DEFAULT_PAUSE_THRESHOLD = 500 * 1024;
69
70 /***
71 * Amount of content received between each recheck of free space
72 */
73 public static final String ATTR_RECHECK_THRESHOLD = "recheck-threshold-kb";
74 public static final int DEFAULT_RECHECK_THRESHOLD = 200 * 1024;
75
76 protected int contentSinceCheck = 0;
77
78 public static final Pattern VALID_DF_OUTPUT =
79 Pattern.compile("(?s)^Filesystem//s+1K-blocks//s+Used//s+Available//s+Use%//s+Mounted on//n.*");
80 public static final Pattern AVAILABLE_EXTRACTOR =
81 Pattern.compile("(?m)//s(//d+)//s+//d+%//s+(//S+)$");
82
83 /***
84 * @param name Name of this writer.
85 */
86 public LowDiskPauseProcessor(String name) {
87 super(name, "LowDiskPause processor");
88 Type e = addElementToDefinition(
89 new SimpleType(ATTR_MONITOR_MOUNTS,
90 "Space-delimited list of filessystem mounts whose " +
91 "'available' space should be monitored via 'df' " +
92 "(if available).",
93 DEFAULT_MONITOR_MOUNTS));
94 e.setOverrideable(false);
95 e = addElementToDefinition(
96 new SimpleType(ATTR_PAUSE_THRESHOLD,
97 "When available space on any monitored mounts falls " +
98 "below this threshold, the crawl will be paused. ",
99 new Integer(DEFAULT_PAUSE_THRESHOLD)));
100 e = addElementToDefinition(
101 new SimpleType(ATTR_RECHECK_THRESHOLD,
102 "Available space via 'df' is rechecked after every " +
103 "increment of this much content (uncompressed) is " +
104 "observed. ",
105 new Integer(DEFAULT_RECHECK_THRESHOLD)));
106 e.setOverrideable(false);
107 }
108
109 /***
110 * Notes a CrawlURI's content size in its running tally. If the
111 * recheck increment of content has passed through since the last
112 * available-space check, checks available space and pauses the
113 * crawl if any monitored mounts are below the configured threshold.
114 *
115 * @param curi CrawlURI to process.
116 */
117 protected void innerProcess(CrawlURI curi) {
118 contentSinceCheck += curi.getContentSize();
119 synchronized (this) {
120 if (contentSinceCheck/1024 > ((Integer) getUncheckedAttribute(null,
121 ATTR_RECHECK_THRESHOLD)).intValue()) {
122 checkAvailableSpace(curi);
123 contentSinceCheck = 0;
124 }
125 }
126 }
127
128
129 /***
130 * Probe via 'df' to see if monitored mounts have fallen
131 * below the pause available threshold. If so, request a
132 * crawl pause.
133 * @param curi Current context.
134 */
135 private void checkAvailableSpace(CrawlURI curi) {
136 try {
137 String df = IoUtils.readFullyAsString(Runtime.getRuntime().exec(
138 "df -k").getInputStream());
139 Matcher matcher = VALID_DF_OUTPUT.matcher(df);
140 if(!matcher.matches()) {
141 logger.severe("'df -k' output unacceptable for low-disk checking");
142 return;
143 }
144 List monitoredMounts = Arrays.asList(((String) getUncheckedAttribute(null,
145 ATTR_MONITOR_MOUNTS)).split("//s*"));
146 matcher = AVAILABLE_EXTRACTOR.matcher(df);
147 while (matcher.find()) {
148 String mount = matcher.group(2);
149 if (monitoredMounts.contains(mount)) {
150 long availKilobytes = Long.parseLong(matcher.group(1));
151 int thresholdKilobytes = ((Integer) getUncheckedAttribute(
152 null, ATTR_PAUSE_THRESHOLD)).intValue();
153 if (availKilobytes < thresholdKilobytes ) {
154 getController().requestCrawlPause();
155 logger.log(Level.SEVERE, "Low Disk Pause",
156 availKilobytes + "K available on " + mount
157 + " (below threshold "
158 + thresholdKilobytes + "K)");
159 break;
160 }
161 }
162 }
163 } catch (IOException e) {
164 curi.addLocalizedError(this.getName(), e,
165 "problem checking available space via 'df'");
166 }
167 }
168 }