View Javadoc

1   /*
2    *  This file is part of the Heritrix web crawler (crawler.archive.org).
3    *
4    *  Licensed to the Internet Archive (IA) by one or more individual 
5    *  contributors. 
6    *
7    *  The IA licenses this file to You under the Apache License, Version 2.0
8    *  (the "License"); you may not use this file except in compliance with
9    *  the License.  You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   *  Unless required by applicable law or agreed to in writing, software
14   *  distributed under the License is distributed on an "AS IS" BASIS,
15   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   *  See the License for the specific language governing permissions and
17   *  limitations under the License.
18   */
19  
20  package org.archive.util;
21  
22  import java.io.Closeable;
23  import java.io.File;
24  import java.io.Serializable;
25  import java.lang.ref.PhantomReference;
26  import java.lang.ref.Reference;
27  import java.lang.ref.ReferenceQueue;
28  import java.lang.ref.SoftReference;
29  import java.lang.reflect.Field;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.logging.Level;
34  import java.util.logging.Logger;
35  
36  import com.sleepycat.bind.EntryBinding;
37  import com.sleepycat.bind.serial.SerialBinding;
38  import com.sleepycat.bind.serial.StoredClassCatalog;
39  import com.sleepycat.bind.tuple.TupleBinding;
40  import com.sleepycat.collections.StoredSortedMap;
41  import com.sleepycat.je.Database;
42  import com.sleepycat.je.DatabaseConfig;
43  import com.sleepycat.je.DatabaseException;
44  import com.sleepycat.je.Environment;
45  
46  /***
47   * A BDB JE backed object cache. 
48   * 
49   * Soft references to previously-instantiated objects are held so that
50   * unless/until an object is garbage collected, subsequent get()s will
51   * return the exact same object. (If all outside references are lost,
52   * when the soft reference is broken, the object state -- still 
53   * accessible to this class via reflective access to a phantom 
54   * referent --is flushed to disk. The next get() will reconsitute a new
55   * object, from the disk state.)
56   * <p/>
57   * The backing disk is only guaranteed to be up-to-date after a flush 
58   * of all in-memory values to disk, as can be forced by sync().
59   * <p/>
60   * To ensure that changes/mutations to values in this map are coherent and
61   * consistent at the application level, it is assumed that the application
62   * level only mutates values that are in this map and does not retain references
63   * to values longer than necessary.  This allows mappings to be persisted
64   * during GC without explicit transactions or write operations.
65   * <p/>
66   * Based on the earlier CachedBdbMap. 
67   * <p/>
68   * 
69   * @author John Erik Halse
70   * @author stack
71   * @author gojomo
72   * @author paul baclace (conversion to ConcurrentMap)
73   *  
74   */
75  public class ObjectIdentityBdbCache<V> 
76  implements ObjectIdentityCache<String, V>, Closeable, Serializable {
77      private static final long serialVersionUID = 1L;
78      private static final Logger logger =
79          Logger.getLogger(ObjectIdentityBdbCache.class.getName());
80  
81      /*** The BDB JE database used for this instance. */
82      protected transient Database db;
83  
84      /*** in-memory map of new/recent/still-referenced-elsewhere instances */
85      protected transient ConcurrentHashMap<String,SoftEntry<V>> memMap;
86      protected transient ReferenceQueue<V> refQueue;
87  
88      /*** The Collection view of the BDB JE database used for this instance. */
89      protected transient StoredSortedMap<String, V> diskMap;
90  
91      protected AtomicLong count;
92      
93      //
94      // USAGE STATS
95      //
96      /*** Count of times we got an object from in-memory cache */
97      private AtomicLong cacheHit = new AtomicLong(0);
98      /*** Count of times the {@link ObjectIdentityBdbCache#get} method was called. */
99      private AtomicLong countOfGets = new AtomicLong(0);
100     /*** Count of every time disk-based map provided non-null object */ 
101     private AtomicLong diskHit = new AtomicLong(0);
102     /*** Count of times Supplier was used for new object */
103     private AtomicLong supplierUsed = new AtomicLong(0);
104     /*** count of expunge put() to BDB (implies disk) */
105     private AtomicLong expungeStatsDiskPut = new AtomicLong(0);
106     /*** count of {@link #sync()} use */
107     private AtomicLong useStatsSyncUsed = new AtomicLong(0);
108     
109     /*** Reference to the Reference#referent Field. */
110     protected static Field referentField;
111     static {
112         // We need access to the referent field in the PhantomReference.
113         // For more on this trick, see
114         //
115         // http://www.javaspecialists.co.za/archive/Issue098.html and for
116         // discussion:
117         // http://www.theserverside.com/tss?service=direct/0/NewsThread/threadViewer.markNoisy.link&sp=l29865&sp=l146901
118         try {
119             referentField = Reference.class.getDeclaredField("referent");
120             referentField.setAccessible(true);
121         } catch (SecurityException e) {
122             throw new RuntimeException(e);
123         } catch (NoSuchFieldException e) {
124             throw new RuntimeException(e);
125         }
126     }
127 
128     /***
129      * Constructor. You must call
130      * {@link #initialize(Environment, Class, Class, StoredClassCatalog)}
131      * to finish construction. Construction is two-stepped to support
132      * reconnecting a deserialized CachedBdbMap with its backing bdbje
133      * database.
134      * 
135      * @param dbName Name of the backing db this instance should use.
136      */
137     public ObjectIdentityBdbCache() {
138         super();
139     }
140     
141     /***
142      * Call this method when you have an instance when you used the
143      * default constructor or when you have a deserialized instance that you
144      * want to reconnect with an extant bdbje environment.  Do not
145      * call this method if you used the
146      * {@link #CachedBdbMap(File, String, Class, Class)} constructor.
147      * @param env
148      * @param keyClass
149      * @param valueClass
150      * @param classCatalog
151      * @throws DatabaseException
152      */
153     public void initialize(final Environment env, String dbName,
154             final Class valueClass, final StoredClassCatalog classCatalog)
155     throws DatabaseException {
156         // TODO: initial capacity should be related to number of seeds, max depth, max docs
157         this.memMap = new ConcurrentHashMap<String,SoftEntry<V>>(
158                                                             8192, // initial capacity
159                                                             0.9f, // acceptable load factor
160                                                             64 // est. number of concurrent threads
161                                                             ); 
162         this.refQueue = new ReferenceQueue<V>();
163         canary = new SoftReference<LowMemoryCanary>(new LowMemoryCanary());
164         
165         this.db = openDatabase(env, dbName);
166         this.diskMap = createDiskMap(this.db, classCatalog, valueClass);
167         this.count = new AtomicLong(diskMap.size());
168     }
169 
170     @SuppressWarnings("unchecked")
171     protected StoredSortedMap<String, V> createDiskMap(Database database,
172             StoredClassCatalog classCatalog, Class valueClass) {
173         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(String.class);
174         EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
175         if(valueBinding == null) {
176             valueBinding = new SerialBinding(classCatalog, valueClass);
177         }
178         return new StoredSortedMap<String,V>(database, keyBinding, valueBinding, true);
179     }
180 
181     protected Database openDatabase(final Environment environment,
182             final String dbName) throws DatabaseException {
183         DatabaseConfig dbConfig = new DatabaseConfig();
184         dbConfig.setTransactional(false);
185         dbConfig.setAllowCreate(true);
186         dbConfig.setDeferredWrite(true);
187         return environment.openDatabase(null, dbName, dbConfig);
188     }
189 
190     /* (non-Javadoc)
191      * @see org.archive.util.ObjectIdentityCache#close()
192      */
193     public synchronized void close() {
194         // Close out my bdb db.
195         if (this.db != null) {
196             try {
197                 sync(); 
198                 this.db.sync();
199                 this.db.close();
200             } catch (DatabaseException e) {
201                 logger.log(Level.WARNING,"problem closing ObjectIdentityBdbCache",e);
202             } finally {
203                 this.db = null;
204             }
205         }
206     }
207 
208     protected void finalize() throws Throwable {
209         close();
210         super.finalize();
211     }
212 
213     /* (non-Javadoc)
214      * @see org.archive.util.ObjectIdentityCache#get(java.lang.String)
215      */
216     public V get(final String key) {
217         return getOrUse(key,null); 
218     }
219     
220     /* (non-Javadoc)
221      * @see org.archive.util.ObjectIdentityCache#get(java.lang.String, org.archive.util.ObjectIdentityBdbCache)
222      */
223     public V getOrUse(final String key, Supplier<V> supplierOrNull) {
224         countOfGets.incrementAndGet();
225         
226         if (countOfGets.get() % 10000 == 0) {
227             logCacheSummary();
228         }
229         
230         // check mem cache
231         SoftEntry<V> entry = memMap.get(key);
232         if(entry != null) {
233             V val = entry.get();
234             if(val != null) {
235                 // the concurrent garden path: in mem, valid
236                 cacheHit.incrementAndGet();
237                 return val;
238             } 
239         }
240         
241         // everything in other difficult cases happens inside this block
242         synchronized(this) {
243             // recheck mem cache -- if another thread beat us into sync 
244             // block and already filled the key 
245             entry = memMap.get(key);
246             if(entry != null) {
247                 V val = entry.get();
248                 if(val != null) {
249                     cacheHit.incrementAndGet();
250                     return val;
251                 } 
252             }
253             // persist to disk all ref-enqueued stale (soft-ref-cleared) entries now
254             pageOutStaleEntries();
255             // and catch if this exact entry not yet ref-enqueued 
256             if(memMap.get(key)!=null) {
257                 pageOutStaleEntry(entry);
258                 if(memMap.get(key)!=null) {
259                     logger.log(Level.SEVERE,"nulled key "+key+" not paged-out", new Exception());
260                 }
261             }
262             
263             // check disk 
264             V valDisk = (V) diskMap.get(key); 
265             if(valDisk==null) {
266                 // never yet created, consider creating
267                 if(supplierOrNull==null) {
268                     return null;
269                 }
270                 // create using provided Supplier
271                 valDisk = supplierOrNull.get();
272                 supplierUsed.incrementAndGet();
273                 // putting initial value directly into diskMap
274                 // (rather than just the memMap until page-out)
275                 // ensures diskMap.keySet() provides complete view
276                 V prevVal = diskMap.putIfAbsent(key, valDisk); 
277                 count.incrementAndGet();
278                 if(prevVal!=null) {
279                     // ERROR: diskMap modification since previous
280                     // diskMap.get() should be impossible
281                     logger.log(Level.SEVERE,"diskMap modified outside synchronized block?");
282                 }
283             } else {
284                 diskHit.incrementAndGet();
285             }
286 
287             // keep new val in memMap
288             SoftEntry<V> newEntry = new SoftEntry<V>(key, valDisk, refQueue);
289             SoftEntry<V> prevVal = memMap.putIfAbsent(key, newEntry); 
290             if(prevVal != null) {
291                 // ERROR: memMap modification since previous 
292                 // memMap.get() should be impossible
293                 logger.log(Level.SEVERE,"memMap modified outside synchronized block?", new Exception());
294             }
295             return valDisk; 
296         }
297     }
298 
299     /* (non-Javadoc)
300      * @see org.archive.util.ObjectIdentityCache#keySet()
301      */
302     public Set<String> keySet() {
303         return diskMap.keySet();
304     }
305     
306     /***
307      * Info to log, if at FINE level
308      */
309     private void logCacheSummary() {
310         if (logger.isLoggable((Level.FINE))) {
311             logger.fine(composeCacheSummary());
312         }
313     }
314     
315     private String composeCacheSummary() {
316         long totalHits = cacheHit.get() + diskHit.get();
317         if (totalHits < 1) {
318             return "";
319         }
320         long cacheHitPercent 
321                 = (cacheHit.get() * 100) / totalHits;
322         StringBuilder sb = new StringBuilder(120);
323         sb.append("DB name:")
324           .append(getDatabaseName())
325           .append(", ")
326           .append(" hit%: ")
327           .append(cacheHitPercent)
328           .append("%, gets=")
329           .append(countOfGets.get())
330           .append(" memHits=")
331           .append(cacheHit.get())
332           .append(" diskHits=")
333           .append(diskHit.get())
334           .append(" supplieds=")
335           .append(supplierUsed.get())
336           .append(" expungePuts=")
337           .append(expungeStatsDiskPut.get())
338           .append(" syncs=")
339           .append(useStatsSyncUsed.get());
340         return sb.toString();
341     }
342 
343     /* (non-Javadoc)
344      * @see org.archive.util.ObjectIdentityCache#size()
345      */
346     public int size() {
347         if(db==null) {
348             return 0; 
349         }
350         return (int) count.get();
351     }
352     
353     protected String getDatabaseName() {
354         String name = "DbName-Lookup-Failed";
355         try {
356             if (this.db != null) {
357                 name = this.db.getDatabaseName();
358             }
359         } catch (DatabaseException e) {
360             // Ignore.
361         }
362         return name;
363     }
364     
365     /***
366      * Sync all in-memory map entries to backing disk store.
367      */
368     @SuppressWarnings("unchecked")
369     public synchronized void sync() {
370         String dbName = null;
371         // Sync. memory and disk.
372         useStatsSyncUsed.incrementAndGet();
373         long startTime = 0;
374         if (logger.isLoggable(Level.INFO)) {
375             dbName = getDatabaseName();
376             startTime = System.currentTimeMillis();
377             logger.info(dbName + " start sizes: disk " + this.diskMap.size() +
378                 ", mem " + this.memMap.size());
379         }
380         
381         for (String key : this.memMap.keySet()) {
382             SoftEntry<V> entry = memMap.get(key);
383             if (entry != null) {
384                 // Get & hold so not cleared pre-return.
385                 V value = entry.get();
386                 if (value != null) {
387                     expungeStatsDiskPut.incrementAndGet();
388                     this.diskMap.put(key, value); // unchecked cast
389                 } 
390             }
391         }
392         pageOutStaleEntries();
393         
394         // force sync of deferred-writes
395         try {
396             this.db.sync();
397         } catch (DatabaseException e) {
398             throw new RuntimeException(e);
399         }
400         
401         if (logger.isLoggable(Level.INFO)) {
402             logger.info(dbName + " sync took " +
403                 (System.currentTimeMillis() - startTime) + "ms. " +
404                 "Finish sizes: disk " +
405                 this.diskMap.size() + ", mem " + this.memMap.size());
406         }
407     }
408 
409     /*** An incremental, poll-based expunger.
410      * 
411      * Package-protected for unit-test visibility. 
412      */
413     @SuppressWarnings("unchecked")
414     synchronized void pageOutStaleEntries() {
415         int c = 0;
416         long startTime = System.currentTimeMillis();
417         for(SoftEntry<V> entry; (entry = (SoftEntry<V>)refQueue.poll()) != null;) {
418             pageOutStaleEntry(entry);
419             c++;
420         }
421         if (c > 0 && logger.isLoggable(Level.FINER)) {
422             long endTime = System.currentTimeMillis();
423             try {
424                 logger.finer("DB: " + db.getDatabaseName() + ",  Expunged: "
425                         + c + ", Diskmap size: " + diskMap.size()
426                         + ", Cache size: " + memMap.size()
427                         + ", in "+(endTime-startTime)+"ms");
428             } catch (DatabaseException e) {
429                 logger.log(Level.FINER,"exception while logging",e);
430             }
431         }
432     }
433     
434     /*** 
435      * Expunge an entry from memMap while updating diskMap.
436      * 
437      * @param entry a SoftEntry<V> obtained from refQueuePoll()
438      */
439    synchronized private void pageOutStaleEntry(SoftEntry<V> entry) {
440         PhantomEntry<V> phantom = entry.phantom;
441         
442         // Still in memMap? if not, was paged-out by earlier direct access
443         // before placed into reference-queue; just return
444         if (memMap.get(phantom.key) != entry) { // NOTE: intentional identity compare
445             return; 
446         }
447         
448         // recover hidden value
449         V phantomValue = phantom.doctoredGet(); 
450 
451         // Expected value present? (should be; only clear is at end of
452         // this method, after entry removal from memMap)
453         if(phantomValue == null) {
454             logger.log(Level.WARNING,"unexpected null phantomValue", new Exception());
455             return; // nothing to do
456         }
457         
458         // given instance entry still in memMap;
459         // we have the key and phantom Value, 
460         // the diskMap can be updated.
461         diskMap.put(phantom.key, phantomValue); // unchecked cast
462         expungeStatsDiskPut.incrementAndGet();
463         
464         //  remove memMap entry 
465         boolean removed = memMap.remove(phantom.key, entry);
466         if(!removed) {
467             logger.log(Level.WARNING,"expunge memMap.remove() ineffective",new Exception());
468         }
469         phantom.clear(); // truly allows GC of unreferenced V object
470     }
471     
472     private static class PhantomEntry<V> extends PhantomReference<V> {
473         final String key;
474 
475         public PhantomEntry(String key, V referent) {
476             super(referent, null);
477             this.key = key;
478         }
479 
480         /***
481          * @return Return the referent. The contract for {@link #get()}
482          * always returns a null referent.  We've cheated and doctored
483          * PhantomReference to return the actual referent value.  See notes
484          * at {@link #referentField};
485          */
486         @SuppressWarnings("unchecked")
487         final public V doctoredGet() {
488             try {
489                 // Here we use the referentField saved off on static
490                 // initialization of this class to get at this References'
491                 // private referent field.
492                 return (V) referentField.get(this);
493             } catch (IllegalAccessException e) {
494                 throw new RuntimeException(e);
495             }
496         }
497     }
498 
499     /*** 
500      * SoftReference cache entry.
501      * 
502      * A PhantomReference is used to hold the key and value as a last
503      * chance before GC hook that can effect the update of diskMap.
504      * <p/>
505      * Entries are not recycled.
506      */
507     private static class SoftEntry<V> extends SoftReference<V> {
508         PhantomEntry<V> phantom;
509 
510         @SuppressWarnings("unchecked")
511         public SoftEntry(String key, V referent, ReferenceQueue<V> q) {
512             super(referent, q);
513             this.phantom = new PhantomEntry(key, referent); // unchecked cast
514         }
515 
516         public V get() {
517             // ensure visibility 
518             synchronized (this) {
519                 return super.get();
520             }
521         }
522 
523         public String toString() {
524             if (phantom != null) {
525                 return "SoftEntry(key=" + phantom.key + ")";
526             } else {
527                 return "SoftEntry()";
528             }
529         }
530     }
531 
532     //
533     // Crude, probably unreliable/fragile but harmless mechanism to 
534     // trigger expunge of cleared SoftReferences in low-memory 
535     // conditions even without any of the other get/put triggers. 
536     //
537     
538     protected transient SoftReference<LowMemoryCanary> canary;
539     protected class LowMemoryCanary {
540         /*** When collected/finalized -- as should be expected in 
541          *  low-memory conditions -- trigger an expunge and a 
542          *  new 'canary' insertion. */
543         public void finalize() {
544             ObjectIdentityBdbCache.this.pageOutStaleEntries();
545             // only install new canary if map still 'open' with db reference
546             if(ObjectIdentityBdbCache.this.db !=null) {
547                 ObjectIdentityBdbCache.this.canary = 
548                     new SoftReference<LowMemoryCanary>(new LowMemoryCanary());
549             } else {
550                 ObjectIdentityBdbCache.this.canary = null; 
551             }
552         }
553     }
554 }