1   package com.atlassian.bonnie;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   import java.util.concurrent.atomic.AtomicReference;
7   import java.util.concurrent.locks.Lock;
8   
9   import org.apache.lucene.analysis.Analyzer;
10  import org.apache.lucene.index.IndexReader;
11  import org.apache.lucene.index.IndexWriter;
12  import org.apache.lucene.search.DelayCloseIndexSearcher;
13  import org.apache.lucene.search.IndexSearcher;
14  import org.apache.lucene.store.Directory;
15  import org.apache.lucene.store.FSDirectory;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import com.atlassian.bonnie.search.SearcherInitialisation;
20  
21  /**
22   * ILuceneConnection implementation that allows concurrent searching/reading and
23   * writing/deleting. Concurrent writes and deletes block each other.
24   * <p/>
25   * This class optimizes use of Lucene reader instances by holding a common
26   * IndexReader that is shared by idempotent operations on an unmodified index.
27   * Any mutative operations cause the current IndexReader to be cleared, and
28   * subsequent reads will see the results of the previous index change.
29   * <p/>
30   */
31  public class LuceneConnection implements ILuceneConnection
32  {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(LuceneConnection.class);
    
    // Initialisation that does nothing; used by the constructors that take no SearcherInitialisation.
    private static final SearcherInitialisation NOOP_SEARCHER_INITIALISATION = new SearcherInitialisation() {
        public void initialise(IndexSearcher searcher) {}
    };
    
    // Analyzer handed to the IndexWriter that the constructor opens.
    private final Analyzer analyzerForIndexing;

    // Writer tuning values; applied to the writer by configureIndexWriter before every withWriter action.
    private final Configuration configuration;

    // Invoked on every searcher produced by createSearcher.
    private final SearcherInitialisation searcherInitialisation;

    // Held for the whole of withWriter / withBatchUpdate (and ensureIndexExists).
    private final Lock indexWriteLock = new LoggingReentrantLock("indexWriteLock");

    // Held by commitAndRefreshSearcher while it commits and swaps the current searcher.
    private final Lock searcherRefreshLock = new LoggingReentrantLock("searcherRefreshLock");
    
    // Set to true by close(); checked by assertNotClosed() at the start of the public operations.
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Set while withBatchUpdate is running its action. While true, configureIndexWriter applies the batch
     * settings to the IndexWriter and commitAndRefreshSearcher returns without committing or refreshing.
     */
    private final AtomicBoolean batchMode = new AtomicBoolean(false);

    // The single writer for this connection: opened in the constructor, closed in close().
    private IndexWriter writer;

    // The searcher handed to new searches; replaced by commitAndRefreshSearcher and read without a lock by withSearch.
    private volatile DelayCloseIndexSearcher searcher;
60  
61      public LuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration, final SearcherInitialisation searcherInitialisation)
62      {
63          this.analyzerForIndexing = analyzer;
64          this.configuration = configuration;
65          this.searcherInitialisation = searcherInitialisation;
66          
67          ensureIndexExists(directory);
68          try
69          {
70              this.writer = new IndexWriter(directory, analyzerForIndexing, false);
71          }
72          catch (IOException e)
73          {
74              throw new LuceneException(e);
75          }
76  
77          searcher = createSearcher();
78      }
79  
    /**
     * Opens a connection on the given directory with a searcher initialisation that does nothing.
     */
    public LuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration)
    {
        this(directory, analyzer, configuration, NOOP_SEARCHER_INITIALISATION);
    }

    /**
     * Opens a connection on the given directory using DEFAULT_CONFIGURATION (not declared in this
     * class; presumably inherited from ILuceneConnection) and no searcher initialisation.
     */
    public LuceneConnection(Directory directory, Analyzer analyzer)
    {
        this(directory, analyzer, DEFAULT_CONFIGURATION);
    }
    
    /**
     * Opens a connection on the index stored under the given file system path. The path is turned
     * into a Directory by getDirectory, which creates the directory when it does not exist.
     */
    public LuceneConnection(File path, Analyzer analyzer, Configuration configuration, SearcherInitialisation searcherInitialisation)
    {
        this(getDirectory(path), analyzer, configuration, searcherInitialisation);
    }

    /**
     * Opens a connection on the index stored under the given file system path with a searcher
     * initialisation that does nothing.
     */
    public LuceneConnection(File path, Analyzer analyzer, Configuration configuration)
    {
        this(getDirectory(path), analyzer, configuration);
    }

    /**
     * Opens a connection on the index stored under the given file system path using
     * DEFAULT_CONFIGURATION and no searcher initialisation.
     */
    public LuceneConnection(File path, Analyzer analyzer)
    {
        this(path, analyzer, DEFAULT_CONFIGURATION);
    }
104     
105     public int getNumDocs()
106     {
107         return (Integer) withReader(new ReaderAction() {
108             public Object perform(IndexReader reader)
109             {
110                 return reader.numDocs();
111             }
112         });
113     }
114 
115     /**
116      * Blocks and waits until all write operations to the index complete. Optimizes the index while holding the write
117      * lock to ensure no concurrent modifications. Optimize is done using interactive mode configuration.
118      */
119     public void optimize() throws LuceneException
120     {
121         withWriter(new WriterAction()
122         {
123             public void perform(IndexWriter writer) throws IOException
124             {
125                 writer.optimize();
126             }
127         });
128     }
129 
130     public void close() throws LuceneException
131     {
132         assertNotClosed();
133         try
134         {
135             searcher.closeWhenDone();
136             writer.close();
137             isClosed.set(true);
138         }
139         catch (IOException e)
140         {
141             throw new LuceneException(e);
142         }
143     }
144 
145     private void assertNotClosed() throws LuceneException
146     {
147         if (isClosed.get())
148             throw new LuceneConnectionClosedException("Cannot operate on closed " + getClass().getSimpleName());
149     }
150     
    /**
     * Runs the given action against the searcher that is current when the call starts. The
     * searcher is opened before the action runs and closed afterwards, whether or not the action
     * succeeds. Nothing is returned from the action to the caller.
     *
     * @param action the search to perform
     * @throws LuceneException if the connection is closed, or wrapping any IOException thrown by the action
     */
    public void withSearch(SearcherAction action) throws LuceneException
    {
        assertNotClosed();

        // keep a reference to the searcher in case it changes before the end of the search
        DelayCloseIndexSearcher currentSearcher = searcher;
        // NOTE(review): open()/close() presumably track in-flight searches so that closeWhenDone()
        // can defer the real close -- confirm against DelayCloseIndexSearcher. The field read above
        // and this open() are not guarded by searcherRefreshLock, so verify that a concurrent
        // refresh cannot close the searcher in between.
        currentSearcher.open();
        try
        {
            action.perform(currentSearcher);
        }
        catch (IOException e)
        {
            throw new LuceneException(e);
        }
        finally
        {
            // Balances the open() above; failures are logged by closeSearcher, not thrown.
            closeSearcher(currentSearcher);
        }
    }
175 
176     public Object withReader(final ReaderAction action) throws LuceneException
177     {
178         final AtomicReference<Object> result = new AtomicReference<Object>();
179         withSearch(new SearcherAction()
180         {
181             public void perform(IndexSearcher searcher) throws IOException
182             {
183                 result.set(action.perform(searcher.getIndexReader()));
184             }
185         });
186         return result.get();
187     }
188 
189     /**
190      * Blocks and waits until all write operations to the index complete. Executes the WriterAction while holding the
191      * write lock to ensure no concurrent modifications.
192      */
193     public void withWriter(WriterAction action) throws LuceneException
194     {
195         assertNotClosed();
196         indexWriteLock.lock();
197         try
198         {
199             configureIndexWriter(writer, configuration);
200             try
201             {
202                 action.perform(writer);
203             }
204             catch (IOException e)
205             {
206                 throw new LuceneException(e);
207             }
208             // Only refresh the current searcher if the index update succeeded
209             commitAndRefreshSearcher();
210         }
211         finally
212         {
213             // Release the lock, no matter what.
214             indexWriteLock.unlock();
215         }
216     }
217 
218     /**
219      * Blocks and waits until all write operations to the index complete. Executes the BatchUpdateAction while holding
220      * the write lock to ensure no concurrent modifications. <p/> Note: This method holds the writeLock for the whole
221      * operation, so is used to ensure a set of deletes and writes are effectively executed atomically.
222      * <p/>
223      * Refreshes the searcher only once, at the end of the batch update action.
224      */
225     public void withBatchUpdate(BatchUpdateAction action)
226     {
227         assertNotClosed();
228         indexWriteLock.lock();
229         try
230         {
231             batchMode.set(true);
232             try
233             {
234                 action.perform();
235             }
236             catch (Exception e) // unfortunately, the API requires us to catch Exception here
237             {
238                 throw new LuceneException(e);
239             }
240             finally
241             {
242                 batchMode.set(false);
243             }
244             commitAndRefreshSearcher();
245         }
246         finally
247         {
248             indexWriteLock.unlock();
249         }
250     }
251 
252     /**
253      * Marks the current searcher to be closed once all searches are finished,
254      * and creates a new one for later searches.
255      * <p/>
256      * Doesn't refresh the searcher if a batch operation is in progress (i.e.
257      * {@link #batchMode} is <tt>true</tt>.
258      */
259     private void commitAndRefreshSearcher()
260     {
261         // don't refresh searcher during a batch operation
262         if (batchMode.get())
263             return;
264 
265         searcherRefreshLock.lock();
266         try
267         {
268             DelayCloseIndexSearcher oldSearcher = searcher;
269             writer.commit();
270             searcher = createSearcher();
271 
272             if (log.isDebugEnabled())
273                 log.debug("Closing current searcher");
274 
275             oldSearcher.closeWhenDone();
276         }
277         catch (IOException e)
278         {
279             throw new LuceneException("Error refreshing index searcher", e);
280         }
281         finally
282         {
283             searcherRefreshLock.unlock();
284         }
285     }
286 
287     /**
288      * @return a newly created and initialised searcher
289      */
290     private DelayCloseIndexSearcher createSearcher() throws LuceneException
291     {
292         if (log.isDebugEnabled())
293             log.debug("Creating new searcher");
294         
295         DelayCloseIndexSearcher searcher;
296         try
297         {
298             searcher = new DelayCloseIndexSearcher(writer.getReader());
299             searcherInitialisation.initialise(searcher);
300         }
301         catch (IOException e)
302         {
303             throw new LuceneException("Failed to create searcher for directory: ", e);
304         }
305         return searcher;
306     }
307 
308     /**
309      * Closes the provided reader and logs any exceptions.
310      */
311     private void closeReader(IndexReader reader)
312     {
313         if (reader == null)
314             return;
315 
316         if (log.isDebugEnabled())
317             log.debug("Closing index reader: " + reader.directory());
318         try
319         {
320             reader.close();
321         }
322         catch (IOException e)
323         {
324             log.error("Error closing reader: " + reader.directory(), e);
325         }
326     }
327 
328     /**
329      * Closes the provided searcher and logs any exceptions.
330      */
331     private void closeSearcher(IndexSearcher searcher)
332     {
333         if (searcher == null)
334             return;
335         try
336         {
337             searcher.close();
338         }
339         catch (IOException e)
340         {
341             log.error("Error occurred while closing searcher " + searcher.getIndexReader().directory(), e);
342         }
343     }
344 
345     /**
346      * Closes the provided writer and logs any thrown exceptions without rethrowing them.
347      */
348     private void closeWriter(IndexWriter writer)
349     {
350         if (writer == null)
351             return;
352         
353         if (log.isDebugEnabled())
354             log.debug("Closing index writer " + writer.getDirectory());
355         try
356         {
357             writer.close();
358         }
359         catch (IOException e)
360         {
361             log.error("Error closing writer " + writer.getDirectory(), e);
362         }
363     }
364 
365     private void configureIndexWriter(final IndexWriter indexWriter, Configuration configuration)
366     {
367         if (batchMode.get())
368         {
369             // using batchMode config
370             indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
371             indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
372             indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
373         }
374         else
375         {
376             // using interactiveMode config
377             indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
378             indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
379             indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
380         }
381         indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
382         indexWriter.setUseCompoundFile(configuration.isCompoundIndexFileFormat());
383     }
384     
385 	public void truncateIndex() throws LuceneException
386     {
387     	withWriter(new ILuceneConnection.WriterAction()
388         {
389             public void perform(IndexWriter writer) throws IOException
390             {
391                 writer.deleteAll();
392             }
393         });
394     }
395 	
396     /**
397      * get a Directory from a path and don't throw a whingy bloody IOException
398      */
399     private static Directory getDirectory(File path)
400     {
401         try
402         {
403             if (!path.exists() && !path.mkdir())
404             {
405                 throw new IOException("Unable to create index directory '" + path.getAbsolutePath() + "'");
406             }
407             return FSDirectory.getDirectory(path);
408         }
409         catch (IOException e)
410         {
411             throw new LuceneException(e);
412         }
413     }
414 
415     /**
416      * Ensures the index directory exists, contains a Lucene index, and is available
417      * for writing.
418      */
419     private void ensureIndexExists(Directory directory)
420     {
421         indexWriteLock.lock();
422         try
423         {
424             if (!IndexReader.indexExists(directory))
425             {
426                 new IndexWriter(directory, null, true).close();
427             }
428             
429             if (IndexWriter.isLocked(directory))
430             {
431                 // happens if the index was locked by a process which then died (or is still running -- hence the warning)
432                 log.warn("Forcing unlock of locked index directory: " + directory);
433                 IndexWriter.unlock(directory);
434             }
435         }
436         catch (IOException e)
437         {
438             throw new LuceneException("Cannot create index directory", e);
439         }
440         finally
441         {
442             indexWriteLock.unlock();
443         }
444     }
445 }