1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.crawler.util;
26
27 import java.io.File;
28 import java.io.IOException;
29 import java.io.ObjectOutputStream;
30 import java.io.Serializable;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34 import st.ata.util.FPGenerator;
35
36 import com.sleepycat.bind.tuple.LongBinding;
37 import com.sleepycat.je.Database;
38 import com.sleepycat.je.DatabaseConfig;
39 import com.sleepycat.je.DatabaseEntry;
40 import com.sleepycat.je.DatabaseException;
41 import com.sleepycat.je.DatabaseNotFoundException;
42 import com.sleepycat.je.Environment;
43 import com.sleepycat.je.EnvironmentConfig;
44 import com.sleepycat.je.OperationStatus;
45
46
47 /***
48 * A BDB implementation of an AlreadySeen list.
49 *
50 * This implementation performs adequately without blowing out
51 * the heap. See
52 * <a href="http://crawler.archive.org/cgi-bin/wiki.pl?AlreadySeen">AlreadySeen</a>.
53 *
54 * <p>Makes keys that have URIs from same server close to each other. Mercator
55 * and 2.3.5 'Elminating Already-Visited URLs' in 'Mining the Web' by Soumen
56 * Chakrabarti talk of a two-level key with the first 24 bits a hash of the
57 * host plus port and with the last 40 as a hash of the path. Testing
58 * showed adoption of such a scheme halving lookup times (This implementation
59 * actually concatenates scheme + host in first 24 bits and path + query in
60 * trailing 40 bits).
61 *
62 * @author stack
63 * @version $Date: 2007-02-21 10:18:39 +0000 (Wed, 21 Feb 2007) $, $Revision: 4927 $
64 */
65 public class BdbUriUniqFilter
66 extends SetBasedUriUniqFilter implements Serializable {
67 private static final long serialVersionUID = -8099357538178524011L;
68
69 private static Logger logger =
70 Logger.getLogger(BdbUriUniqFilter.class.getName());
71
72 protected boolean createdEnvironment = false;
73 protected long lastCacheMiss = 0;
74 protected long lastCacheMissDiff = 0;
75 protected transient Database alreadySeen = null;
76 static protected DatabaseEntry ZERO_LENGTH_ENTRY =
77 new DatabaseEntry(new byte[0]);
78 private static final String DB_NAME = "alreadySeenUrl";
79 protected long count = 0;
80 private long aggregatedLookupTime = 0;
81
82 private static final String COLON_SLASH_SLASH = "://";
83
84 /***
85 * Shutdown default constructor.
86 */
87 protected BdbUriUniqFilter() {
88 super();
89 }
90
91 /***
92 * Constructor.
93 * @param environment A bdb environment ready-configured.
94 * @throws IOException
95 */
96 public BdbUriUniqFilter(Environment environment)
97 throws IOException {
98 super();
99 try {
100 initialize(environment);
101 } catch (DatabaseException e) {
102 throw new IOException(e.getMessage());
103 }
104 }
105
106 /***
107 * Constructor.
108 * @param bdbEnv The directory that holds the bdb environment. Will
109 * make a database under here if doesn't already exit. Otherwise
110 * reopens any existing dbs.
111 * @throws IOException
112 */
113 public BdbUriUniqFilter(File bdbEnv)
114 throws IOException {
115 this(bdbEnv, -1);
116 }
117
118 /***
119 * Constructor.
120 * @param bdbEnv The directory that holds the bdb environment. Will
121 * make a database under here if doesn't already exit. Otherwise
122 * reopens any existing dbs.
123 * @param cacheSizePercentage Percentage of JVM bdb allocates as
124 * its cache. Pass -1 to get default cache size.
125 * @throws IOException
126 */
127 public BdbUriUniqFilter(File bdbEnv, final int cacheSizePercentage)
128 throws IOException {
129 super();
130 if (!bdbEnv.exists()) {
131 bdbEnv.mkdirs();
132 }
133 EnvironmentConfig envConfig = new EnvironmentConfig();
134 envConfig.setAllowCreate(true);
135 if (cacheSizePercentage > 0 && cacheSizePercentage < 100) {
136 envConfig.setCachePercent(cacheSizePercentage);
137 }
138 try {
139 createdEnvironment = true;
140 initialize(new Environment(bdbEnv, envConfig));
141 } catch (DatabaseException e) {
142 throw new IOException(e.getMessage());
143 }
144 }
145
146 /***
147 * Method shared by constructors.
148 * @param env Environment to use.
149 * @throws DatabaseException
150 */
151 protected void initialize(Environment env) throws DatabaseException {
152 DatabaseConfig dbConfig = getDatabaseConfig();
153 dbConfig.setAllowCreate(true);
154 try {
155 env.truncateDatabase(null, DB_NAME, false);
156 } catch (DatabaseNotFoundException e) {
157
158 }
159 open(env, dbConfig);
160 }
161
162 /***
163 * @return DatabaseConfig to use
164 */
165 protected DatabaseConfig getDatabaseConfig() {
166 DatabaseConfig dbConfig = new DatabaseConfig();
167 dbConfig.setDeferredWrite(true);
168 return dbConfig;
169 }
170
171 /***
172 * Call after deserializing an instance of this class. Will open the
173 * already seen in passed environment.
174 * @param env DB Environment to use.
175 * @throws DatabaseException
176 */
177 public void reopen(final Environment env)
178 throws DatabaseException {
179 DatabaseConfig dbConfig = getDatabaseConfig();
180 open(env, dbConfig);
181 }
182
183 protected void open(final Environment env, final DatabaseConfig dbConfig)
184 throws DatabaseException {
185 this.alreadySeen = env.openDatabase(null, DB_NAME, dbConfig);
186 }
187
188 public synchronized void close() {
189 Environment env = null;
190 if (this.alreadySeen != null) {
191 try {
192 env = this.alreadySeen.getEnvironment();
193 if (logger.isLoggable(Level.INFO)) {
194 logger.info("Count of alreadyseen on close " +
195 Long.toString(count));
196 }
197 this.alreadySeen.sync();
198 this.alreadySeen.close();
199 } catch (DatabaseException e) {
200 logger.severe(e.getMessage());
201 }
202 this.alreadySeen = null;
203 }
204 if (env != null && createdEnvironment) {
205 try {
206
207
208 env.sync();
209 env.close();
210 } catch (DatabaseException e) {
211 logger.severe(e.getMessage());
212 }
213 }
214 }
215
216 public synchronized long getCacheMisses() throws DatabaseException {
217 long cacheMiss = this.alreadySeen.getEnvironment().
218 getStats(null).getNCacheMiss();
219 this.lastCacheMissDiff = cacheMiss - this.lastCacheMiss;
220 this.lastCacheMiss = cacheMiss;
221 return this.lastCacheMiss;
222 }
223
224 public long getLastCacheMissDiff() {
225 return this.lastCacheMissDiff;
226 }
227
228 /***
229 * Create fingerprint.
230 * Pubic access so test code can access createKey.
231 * @param uri URI to fingerprint.
232 * @return Fingerprint of passed <code>url</code>.
233 */
234 public static long createKey(CharSequence uri) {
235 String url = uri.toString();
236 int index = url.indexOf(COLON_SLASH_SLASH);
237 if (index > 0) {
238 index = url.indexOf('/', index + COLON_SLASH_SLASH.length());
239 }
240 CharSequence hostPlusScheme = (index == -1)? url: url.subSequence(0, index);
241 long tmp = FPGenerator.std24.fp(hostPlusScheme);
242 return tmp | (FPGenerator.std40.fp(url) >>> 24);
243 }
244
245
246
247 protected boolean setAdd(CharSequence uri) {
248 DatabaseEntry key = new DatabaseEntry();
249 LongBinding.longToEntry(createKey(uri), key);
250 long started = 0;
251
252 OperationStatus status = null;
253 try {
254 if (logger.isLoggable(Level.INFO)) {
255 started = System.currentTimeMillis();
256 }
257 status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
258 if (logger.isLoggable(Level.INFO)) {
259 aggregatedLookupTime +=
260 (System.currentTimeMillis() - started);
261 }
262 } catch (DatabaseException e) {
263 logger.severe(e.getMessage());
264 }
265 if (status == OperationStatus.SUCCESS) {
266 count++;
267 if (logger.isLoggable(Level.INFO)) {
268 final int logAt = 10000;
269 if (count > 0 && ((count % logAt) == 0)) {
270 logger.info("Average lookup " +
271 (aggregatedLookupTime / logAt) + "ms.");
272 aggregatedLookupTime = 0;
273 }
274 }
275 }
276 if(status == OperationStatus.KEYEXIST) {
277 return false;
278 } else {
279 return true;
280 }
281 }
282
283 protected long setCount() {
284 return count;
285 }
286
287 protected boolean setRemove(CharSequence uri) {
288 DatabaseEntry key = new DatabaseEntry();
289 LongBinding.longToEntry(createKey(uri), key);
290 OperationStatus status = null;
291 try {
292 status = alreadySeen.delete(null, key);
293 } catch (DatabaseException e) {
294 logger.severe(e.getMessage());
295 }
296 if (status == OperationStatus.SUCCESS) {
297 count--;
298 return true;
299 } else {
300 return false;
301 }
302 }
303
304 public long flush() {
305
306
307 return 0;
308 }
309
310 private void writeObject(ObjectOutputStream oos) throws IOException {
311
312 try {
313 alreadySeen.sync();
314 } catch (DatabaseException e) {
315
316 throw new RuntimeException(e);
317 }
318 oos.defaultWriteObject();
319 }
320 }