View Javadoc

1   package com.atlassian.bonnie;
2   
3   import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
4   import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
5   import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
6   import org.apache.log4j.Logger;
7   import org.apache.lucene.analysis.Analyzer;
8   import org.apache.lucene.index.IndexReader;
9   import org.apache.lucene.index.IndexWriter;
10  import org.apache.lucene.search.DelayCloseIndexSearcher;
11  import org.apache.lucene.search.IndexSearcher;
12  import org.apache.lucene.store.Directory;
13  import org.apache.lucene.store.FSDirectory;
14  import org.apache.lucene.store.RAMDirectory;
15  
16  import java.io.File;
17  import java.io.IOException;
18  
19  /**
20   * ILuceneConnection implementation that allows concurrent searching/reading and writing/deleting. Concurrent writes and
21   * deletes block each other. <p/> This class optimizes use of Lucene reader instances by holding a common IndexReader
22   * that is shared by idempotent operations on an unmodified index. Any mutative operations cause the current IndexReader
23   * to be cleared, and subsequent reads will see the results of the previous index change. <p/> TODO We might need to
24   * keep track of currently open readers and block {@link #recreateIndexDirectory()} until they are all closed. We would
25   * also need to prevent any new Readers being created during this time. The reason for this is that Windoze can whinge
26   * about open file crap and refuse to delete files rather than listen to its l33t haxor ruler.
27   */
28  public class ConcurrentLuceneConnection implements ILuceneConnection
29  {
30      private static Logger log = Logger.getLogger(ConcurrentLuceneConnection.class);
31  
32      private transient Directory directory;
33  
34      private final Analyzer analyzerForIndexing;
35  
36      private final Configuration configuration;
37  
38      private final Lock indexWriteLock = new ReentrantLock();
39  
40      private final Lock searcherCreationLock = new ReentrantLock();
41  
42      /**
43       * Used to indicate that we are currently in batchMode and need to use the batch configuration when using an
44       * IndexWriter
45       */
46      private final AtomicBoolean batchMode = new AtomicBoolean(false);
47  
48      // private final Configuration
49  
50      /**
51       * Used to search the index
52       */
53      private DelayCloseIndexSearcher currentSearcher;
54  
55      // --------------------------------------
56      // ctors
57      // --------------------------------------
58  
59      public ConcurrentLuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration)
60      {
61          this.directory = directory;
62          this.analyzerForIndexing = analyzer;
63          this.configuration = configuration;
64      }
65  
66      public ConcurrentLuceneConnection(File path, Analyzer analyzer, Configuration configuration)
67      {
68          this(getDirectory(path), analyzer, configuration);
69      }
70  
71      public int getNumDocs()
72      {
73          return ((Integer) withReader(new ReaderAction()
74          {
75              public Object perform(IndexReader reader)
76              {
77                  return new Integer(reader.numDocs());
78              }
79          })).intValue();
80      }
81  
82      public boolean isIndexCreated()
83      {
84          try
85          {
86              return IndexReader.indexExists(directory);
87          }
88          catch (IOException e)
89          {
90              throw new LuceneException(e);
91          }
92      }
93  
94      public IndexSearcher leakSearcher()
95      {
96          try
97          {
98              return getOpenedSearcher();
99          }
100         catch (Throwable e)
101         {
102             flipCurrentSearcher();
103             // Throw the correct error or exception
104             throwLuceneException(e);
105             // Never happens as the above method throws something all the time
106             throw new IllegalStateException("Exception should have been thrown.");
107         }
108     }
109 
110     /**
111      * Blocks and waits until all write operations to the index complete. Optimizes the index while holding the write
112      * lock to ensure no concurrent modifications. Optimize is done using interactive mode configuration.
113      */
114     public void optimize() throws LuceneException
115     {
116         withWriter(new WriterAction()
117         {
118             public void perform(IndexWriter writer) throws IOException
119             {
120                 writer.optimize();
121             }
122         });
123     }
124 
125     /**
126      * Blocks and waits until all write operations to the index complete. Recreates the index while holding the write
127      * lock to ensure no concurrent modifications.
128      */
129     public void recreateIndexDirectory()
130     {
131         // We are about to update (nuke) the index. So get the lock before
132         // touching the index.
133         indexWriteLock.lock();
134         try
135         {
136             // Pass 'true' for 'create' to the IndexWriter so that the index is
137             // either:
138             // - created if it does not exists
139             // - or blown away and empty one recreated if index existed already
140             /*
141              * According to http://issues.apache.org/jira/browse/LUCENE-140 it is not sufficient to create an IndexWriter with the create flag, you need to
142              * recreate the directory as well
143              */
144             directory.close();
145             if (directory instanceof FSDirectory)
146             {
147                 directory = FSDirectory.getDirectory(((FSDirectory) directory).getFile());
148             }
149             else if (directory instanceof RAMDirectory)
150             {
151                 directory = new RAMDirectory();
152             }
153             new IndexWriter(directory, null, true).close();
154         }
155         catch (IOException e)
156         {
157             throw new LuceneException("Cannot create index directory.", e);
158         }
159         finally
160         {
161             try
162             {
163                 flipCurrentSearcher();
164             }
165             finally
166             {
167                 // Release the lock no matter what happened.
168                 indexWriteLock.unlock();
169             }
170         }
171     }
172 
173     public void close()
174     {
175         flipCurrentSearcher();
176     }
177 
178     /**
179      * This implementation does not respect the boolean return of the of the
180      * {@link SearcherAction#perform(org.apache.lucene.search.IndexSearcher)} method
181      */
182     public void withSearch(SearcherAction action) throws LuceneException
183     {
184         IndexSearcher searcher;
185         try
186         {
187             searcher = getOpenedSearcher();
188             try
189             {
190                 boolean b = action.perform(searcher);
191                 if (!b)
192                 {
193                     throw new UnsupportedOperationException(
194                         "Searchers are always closed. The searcherAction should always return true, we do not allow them to control closing of the searchers");
195                 }
196             }
197             finally
198             {
199                 // If we are here then the open() call has succeeded. Call the
200                 // close() method to ensure
201                 // that the usage count of the searcher is decremented, so that
202                 // the searcher can be closed when needed
203                 closeSearcher(searcher);
204             }
205         }
206         catch (Throwable e)
207         {
208             flipCurrentSearcher();
209             // Throw the correct error or exception
210             throwLuceneException(e);
211         }
212     }
213 
214     public Object withReader(ReaderAction action) throws LuceneException
215     {
216         // We do not need to close the reader as it should remain open and get
217         // resused until it is specifically flipped.
218         try
219         {
220             IndexSearcher searcher = getOpenedSearcher();
221             try
222             {
223                 return action.perform(searcher.getIndexReader());
224             }
225             finally
226             {
227                 // If we are here then the open() call has succeeded. Call
228                 // the close() method to ensure that the usage count of the
229                 // searcher is decremented, so that the searcher can be
230                 // closed when needed
231                 closeSearcher(searcher);
232             }
233         }
234         catch (Throwable e)
235         {
236             // On error close the current searcher to ensure no one else uses
237             // it. If the searcher got closed in the mean time we do not
238             // really care - all this means is that the searcher will be closed
239             // (and potentially created) one more time.
240             // As we should not be here very often the inefficiency is bearable
241             flipCurrentSearcher();
242             // Throw the correct error or exception
243             throwLuceneException(e);
244             // fool the stupid compiler
245             return null;
246         }
247     }
248 
249     /**
250      * Blocks and waits until all write operations to the index complete. Executes the ReaderAction while holding the
251      * write lock to ensure no concurrent modifications.
252      */
253     public void withReaderAndDeletes(ReaderAction action) throws LuceneException
254     {
255         // We are about to update (remove documents from) the index. So grab the
256         // lock before modifying the index
257         indexWriteLock.lock();
258         IndexReader deleter = null;
259         try
260         {
261             deleter = constructIndexDeleter();
262             action.perform(deleter);
263             // Only close the current searcher if the index update succeeded
264             flipCurrentSearcher();
265         }
266         catch (IOException e)
267         {
268             throw new LuceneException(e);
269         }
270         finally
271         {
272             try
273             {
274                 closeReader(deleter);
275             }
276             finally
277             {
278                 indexWriteLock.unlock();
279             }
280         }
281     }
282 
283     /**
284      * Blocks and waits until all write operations to the index complete. Executes the WriterAction while holding the
285      * write lock to ensure no concurrent modifications.
286      */
287     public void withWriter(WriterAction action) throws LuceneException
288     {
289         // The index is about to be updated so get the lock
290         indexWriteLock.lock();
291 
292         IndexWriter writer = null;
293         try
294         {
295             writer = constructIndexWriter();
296             action.perform(writer);
297             // TODO Should this be moved to the finally block?
298             // Only close the current searcher if the index update succeeded
299             flipCurrentSearcher();
300         }
301         catch (IOException e)
302         {
303             throw new LuceneException(e);
304         }
305         finally
306         {
307             try
308             {
309                 closeWriter(writer);
310             }
311             finally
312             {
313                 // Release the lock, no matter what.
314                 indexWriteLock.unlock();
315             }
316         }
317     }
318 
319     public void withDeleteAndWrites(ReaderAction readerAction, WriterAction writerAction) throws LuceneException
320     {
321         indexWriteLock.lock();
322         try
323         {
324             withReaderAndDeletes(readerAction);
325             withWriter(writerAction);
326         }
327         finally
328         {
329             indexWriteLock.unlock();
330         }
331     }
332 
333     /**
334      * Blocks and waits until all write operations to the index complete. Executes the BatchUpdateAction while holding
335      * the write lock to ensure no concurrent modifications. <p/> Note: This method holds the writeLock for the whole
336      * operation, so is used to ensure a set of deletes and writes are effectively executed atomically.
337      */
338     public void withBatchUpdate(BatchUpdateAction action)
339     {
340         // The index is about to be modified, so grab the lock.
341         indexWriteLock.lock();
342         try
343         {
344             batchMode.set(true);
345             action.perform();
346         }
347         catch (Exception e)
348         {
349             throwLuceneException(e);
350         }
351         finally
352         {
353             batchMode.set(false);
354             // Release the lock no matter what happened.
355             indexWriteLock.unlock();
356         }
357     }
358 
359     /**
360      * Closes the searcher that is currently in use.
361      */
362     public void flipCurrentSearcher()
363     {
364         if (log.isDebugEnabled())
365         {
366             log.debug("Closing current searcher..");
367         }
368 
369         searcherCreationLock.lock();
370         try
371         {
372             if (this.currentSearcher != null)
373             {
374                 try
375                 {
376                     this.currentSearcher.closeWhenDone();
377                 }
378                 catch (Exception e)
379                 {
380                     log.error("Error occured attempting to close Index searcher", e);
381                 }
382                 finally
383                 {
384                     this.currentSearcher = null;
385                 }
386             }
387         }
388         finally
389         {
390             searcherCreationLock.unlock();
391         }
392     }
393 
394     /**
395      * Lazily instantiates a {@link DelayCloseIndexSearcher} and calls its
396      * {@link org.apache.lucene.search.DelayCloseIndexSearcher#open()} method to increment its usage count. Ensure that
397      * the searcher's {@link org.apache.lucene.search.DelayCloseIndexSearcher#close()} method is called after it has
398      * been used.
399      */
400     private DelayCloseIndexSearcher getOpenedSearcher() throws IOException
401     {
402         searcherCreationLock.lock();
403         try
404         {
405             if (this.currentSearcher == null)
406             {
407                 this.currentSearcher = new DelayCloseIndexSearcher(directory);
408             }
409 
410             // As we are about to use the searcher, call open() method to ensure
411             // that the usage count of the
412             // searcher is incremented, so that the searcher is not closed until
413             // we are finished with it
414             this.currentSearcher.open();
415 
416             return this.currentSearcher;
417         }
418         finally
419         {
420             searcherCreationLock.unlock();
421         }
422     }
423 
424     /**
425      * This method always returns a new instance of a {@link IndexReader} which uses the {@link #directory}.
426      * 
427      * @throws LuceneException
428      *             if an exception is thrown while constructing the reader.
429      */
430     private IndexReader constructIndexDeleter()
431     {
432         try
433         {
434             return IndexReader.open(directory);
435         }
436         catch (IOException e)
437         {
438             throw new LuceneException(e);
439         }
440     }
441 
442     /**
443      * Closes the provided reader and logs any thrown exceptions without rethrowing them.
444      * 
445      * @param reader
446      *            reader to close.
447      */
448     private void closeReader(IndexReader reader)
449     {
450         try
451         {
452             if (reader != null)
453             {
454                 if (log.isDebugEnabled())
455                 {
456                     log.debug(Thread.currentThread().getName() + "## closing reader");
457                 }
458 
459                 reader.close();
460             }
461         }
462         catch (IOException e)
463         {
464             log.error("Error closing reader. " + e, e);
465         }
466     }
467 
468     /**
469      * Close a searcher and don't throw IOExceptions
470      */
471     private void closeSearcher(IndexSearcher searcher)
472     {
473         try
474         {
475             if (searcher != null)
476             {
477                 searcher.close();
478             }
479         }
480         catch (IOException e)
481         {
482             log.error("Error occurred while closing searcher.", e);
483         }
484     }
485 
486     /**
487      * Closes the provided writer and logs any thrown exceptions without rethrowing them.
488      * 
489      * @param writer
490      *            writer to close.
491      */
492     private void closeWriter(IndexWriter writer)
493     {
494         try
495         {
496             if (writer != null)
497             {
498                 if (log.isDebugEnabled())
499                 {
500                     log.debug("## closing writer");
501                 }
502 
503                 writer.close();
504             }
505             else
506             {
507                 log.warn("## trying to close null writer.");
508             }
509         }
510         catch (IOException e)
511         {
512             log.error("Error closing writer. " + e, e);
513         }
514     }
515 
516     /**
517      * This method always returns a new instance of a {@link IndexWriter} which uses the
518      * {@link #directory). The constructed writer expects the index to already exist.
519      * 
520      * @throws LuceneException
521      *             if an exception is thrown while constructing the writer.
522      */
523     private IndexWriter constructIndexWriter() throws LuceneException
524     {
525         try
526         {
527             if (log.isDebugEnabled())
528             {
529                 log.debug(Thread.currentThread().getName() + "## opening writer");
530             }
531             final IndexWriter indexWriter = new IndexWriter(directory, analyzerForIndexing, false);
532             if (batchMode.get())
533             {
534                 // using batchMode config
535                 indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
536                 indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
537                 indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
538             }
539             else
540             {
541                 // using interactiveMode config
542                 indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
543                 indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
544                 indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
545             }
546             indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
547             return indexWriter;
548         }
549         catch (IOException e)
550         {
551             throw new LuceneException(e);
552         }
553     }
554 
555     private void throwLuceneException(Throwable e)
556     {
557         if (e instanceof Error)
558         {
559             throw (Error) e;
560         }
561 
562         if (e instanceof RuntimeException)
563         {
564             throw (RuntimeException) e;
565         }
566 
567         throw new LuceneException(e);
568     }
569 
570     /**
571      * get a Directory from a path and don't throw a whingy bloody IOException
572      */
573     private static Directory getDirectory(File path)
574     {
575         try
576         {
577             return FSDirectory.getDirectory(path);
578         }
579         catch (IOException e)
580         {
581             throw new LuceneException(e);
582         }
583     }
584 }