View Javadoc

1   package com.atlassian.bonnie;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   import java.util.concurrent.atomic.AtomicReference;
7   import java.util.concurrent.locks.Lock;
8   
9   import org.apache.lucene.analysis.Analyzer;
10  import org.apache.lucene.index.IndexReader;
11  import org.apache.lucene.index.IndexWriter;
12  import org.apache.lucene.search.DelayCloseIndexSearcher;
13  import org.apache.lucene.search.IndexSearcher;
14  import org.apache.lucene.store.Directory;
15  import org.apache.lucene.store.FSDirectory;
16  import org.apache.lucene.store.RAMDirectory;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import com.atlassian.bonnie.search.SearcherInitialisation;
21  
22  /**
23   * ILuceneConnection implementation that allows concurrent searching/reading and
24   * writing/deleting. Concurrent writes and deletes block each other.
25   * <p/>
26   * This class optimizes use of Lucene reader instances by holding a common
27   * IndexReader that is shared by idempotent operations on an unmodified index.
28   * Any mutative operations cause the current IndexReader to be cleared, and
29   * subsequent reads will see the results of the previous index change.
30   * <p/>
31   * TODO We might need to keep track of currently open readers and block
32   * {@link #recreateIndexDirectory()} until they are all closed. We would also
33   * need to prevent any new Readers being created during this time. The reason
34   * for this is that Windows can throw exceptions when trying to move or
35   * rename open files.
36   */
37  public class LuceneConnection implements ILuceneConnection
38  {
39      private static final Logger log = LoggerFactory.getLogger(LuceneConnection.class);
40      
41      private static final SearcherInitialisation NOOP_SEARCHER_INITIALISATION = new SearcherInitialisation() {
42          public void initialise(IndexSearcher searcher) {}
43      };
44      
45      /**
46       * @deprecated since 3.2
47       * @see #withWriter(WriterAction)
48       */
49      public static final int WRITER_DEFAULT = 0x01;
50  
51      /**
52       * @deprecated since 3.2
53       * @see #withWriter(WriterAction)
54       */
55      public static final int WRITER_INTERACTIVE = 0x02;
56  
57      /**
58       * @deprecated since 3.2
59       * @see #withWriter(WriterAction)
60       */
61      public static final int WRITER_BATCH = 0x04;
62  
63      private final Analyzer analyzerForIndexing;
64  
65      private final Configuration configuration;
66  
67      private final SearcherInitialisation searcherInitialisation;
68  
69      private final Lock indexWriteLock = new LoggingReentrantLock("indexWriteLock");
70  
71      private final Lock searcherRefreshLock = new LoggingReentrantLock("searcherRefreshLock");
72      
73      private final AtomicBoolean isClosed = new AtomicBoolean(false);
74  
75      /**
76       * Used to indicate that we are currently in batchMode and need to use the batch configuration when using an
77       * IndexWriter
78       */
79      private final AtomicBoolean batchMode = new AtomicBoolean(false);
80  
81      private volatile Directory directory;
82  
83      private volatile DelayCloseIndexSearcher searcher;
84  
85      public LuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration, SearcherInitialisation searcherInitialisation)
86      {
87          this.directory = directory;
88          this.analyzerForIndexing = analyzer;
89          this.configuration = configuration;
90          this.searcherInitialisation = searcherInitialisation;
91          
92          ensureIndexExists();
93          searcher = createSearcher();
94      }
95  
96      public LuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration)
97      {
98          this(directory, analyzer, configuration, NOOP_SEARCHER_INITIALISATION);
99      }
100 
101     public LuceneConnection(Directory directory, Analyzer analyzer)
102     {
103         this(directory, analyzer, DEFAULT_CONFIGURATION);
104     }
105     
106     public LuceneConnection(File path, Analyzer analyzer, Configuration configuration, SearcherInitialisation searcherInitialisation)
107     {
108         this(getDirectory(path), analyzer, configuration, searcherInitialisation);
109     }
110 
111     public LuceneConnection(File path, Analyzer analyzer, Configuration configuration)
112     {
113         this(getDirectory(path), analyzer, configuration);
114     }
115 
116     public LuceneConnection(File path, Analyzer analyzer)
117     {
118         this(path, analyzer, DEFAULT_CONFIGURATION);
119     }
120     
121     public int getNumDocs()
122     {
123         return (Integer) withReader(new ReaderAction() {
124             public Object perform(IndexReader reader)
125             {
126                 return reader.numDocs();
127             }
128         });
129     }
130 
131     /**
132      * @deprecated since 3.2
133      * @see ILuceneConnection#isIndexCreated()
134      */
135     public boolean isIndexCreated()
136     {
137         assertNotClosed();
138         try
139         {
140             return IndexReader.indexExists(directory);
141         }
142         catch (IOException e)
143         {
144             throw new LuceneException(e);
145         }
146     }
147 
148     /**
149      * @deprecated since 3.2
150      * @see ILuceneConnection#leakSearcher()
151      */
152     public IndexSearcher leakSearcher()
153     {
154         assertNotClosed();
155 
156         // ensure we return the same searcher that we open
157         DelayCloseIndexSearcher currentSearcher = searcher;
158         currentSearcher.open();
159         return currentSearcher;
160     }
161 
162     /**
163      * Blocks and waits until all write operations to the index complete. Optimizes the index while holding the write
164      * lock to ensure no concurrent modifications. Optimize is done using interactive mode configuration.
165      */
166     public void optimize() throws LuceneException
167     {
168         withWriter(new WriterAction()
169         {
170             public void perform(IndexWriter writer) throws IOException
171             {
172                 writer.optimize();
173             }
174         });
175     }
176 
177     /**
178      * Blocks and waits until all write operations to the index complete. Recreates the index while holding the write
179      * lock to ensure no concurrent modifications.
180      */
181     public void recreateIndexDirectory()
182     {
183         assertNotClosed();
184         indexWriteLock.lock();
185         try
186         {
187             /*
188              * According to http://issues.apache.org/jira/browse/LUCENE-140 it is not sufficient to create an IndexWriter with the create flag, you need to
189              * recreate the directory as well
190              */
191             directory.close();
192             if (directory instanceof FSDirectory)
193             {
194                 directory = FSDirectory.getDirectory(((FSDirectory) directory).getFile());
195             }
196             else if (directory instanceof RAMDirectory)
197             {
198                 directory = new RAMDirectory();
199             }
200             
201             // Pass 'true' for 'create' to the IndexWriter so that the index is
202             // either:
203             // - created if it does not exists
204             // - or blown away and empty one recreated if index existed already
205             new IndexWriter(directory, null, true).close();
206             
207             refreshSearcher();
208         }
209         catch (IOException e)
210         {
211             throw new LuceneException("Cannot create index directory: " + directory, e);
212         }
213         finally
214         {
215             indexWriteLock.unlock();
216         }
217     }
218 
219     public void close() throws LuceneException
220     {
221         assertNotClosed();
222         try
223         {
224             searcher.closeWhenDone();
225             isClosed.set(true);
226         }
227         catch (IOException e)
228         {
229             throw new LuceneException(e);
230         }
231     }
232     
233     private void assertNotClosed() throws LuceneException
234     {
235         if (isClosed.get())
236             throw new LuceneConnectionClosedException("Cannot operate on closed " + getClass().getSimpleName());
237     }
238     
239     /**
240      * @deprecated since 3.2, use {@link #close()} instead. 
241      */
242     public void flushWriter() throws LuceneException
243     {
244         close();
245     }
246     
247     /**
248      * This implementation does not respect the boolean return of the of the
249      * {@link ILuceneConnection.SearcherAction#perform(org.apache.lucene.search.IndexSearcher)} method
250      */
251     public void withSearch(SearcherAction action) throws LuceneException
252     {
253         assertNotClosed();
254 
255         // keep a reference to the searcher in case it changes before the end of the search
256         DelayCloseIndexSearcher currentSearcher = searcher;
257         currentSearcher.open();
258         try
259         {
260             action.perform(currentSearcher);
261         }
262         catch (IOException e)
263         {
264             throw new LuceneException(e);
265         }
266         finally
267         {
268             closeSearcher(currentSearcher);
269         }
270     }
271 
272     public Object withReader(final ReaderAction action) throws LuceneException
273     {
274         final AtomicReference<Object> result = new AtomicReference<Object>();
275         withSearch(new SearcherAction()
276         {
277             public void perform(IndexSearcher searcher) throws IOException
278             {
279                 result.set(action.perform(searcher.getIndexReader()));
280             }
281         });
282         return result.get();
283     }
284 
285     /**
286      * Blocks and waits until all write operations to the index complete. Executes the ReaderAction while holding the
287      * write lock to ensure no concurrent modifications.
288      */
289     public void withReaderAndDeletes(ReaderAction action) throws LuceneException
290     {
291         assertNotClosed();
292         indexWriteLock.lock();
293         try
294         {
295             IndexReader deleter = openReader();
296             try
297             {
298                 action.perform(deleter);
299             }
300             catch (IOException e)
301             {
302                 throw new LuceneException(e);
303             }
304             finally
305             {
306                 closeReader(deleter);
307             }
308             // Only close the current searcher if the index update succeeded
309             refreshSearcher();
310         }
311         finally
312         {
313             indexWriteLock.unlock();
314         }
315     }
316 
317     /**
318      * Blocks and waits until all write operations to the index complete. Executes the WriterAction while holding the
319      * write lock to ensure no concurrent modifications.
320      */
321     public void withWriter(WriterAction action) throws LuceneException
322     {
323         assertNotClosed();
324         indexWriteLock.lock();
325         try
326         {
327             IndexWriter writer = new IndexWriter(directory, analyzerForIndexing, false);
328             configureIndexWriter(writer, configuration);
329             try
330             {
331                 action.perform(writer);
332             }
333             catch (IOException e)
334             {
335                 throw new LuceneException(e);
336             }
337             finally
338             {
339                 closeWriter(writer);
340             }
341             // Only refresh the current searcher if the index update succeeded
342             refreshSearcher();
343         }
344         catch (IOException e)
345         {
346             throw new LuceneException("Couldn't open writer on directory: " + directory, e);
347         }
348         finally
349         {
350             // Release the lock, no matter what.
351             indexWriteLock.unlock();
352         }
353     }
354 
355     /**
356      * @deprecated since 3.2 use {@link #withWriter(WriterAction)} for interactive writes, or
357      * {@link #withBatchUpdate(BatchUpdateAction)} for batch writes.
358      */
359     public void withWriter(final WriterAction action, int flags) throws LuceneException
360     {
361         if (flags == WRITER_BATCH)
362         {
363             withBatchUpdate(new BatchUpdateAction() {
364                 public void perform() throws Exception {
365                     withWriter(action);
366                 }
367             });
368         }
369         else
370         {
371             withWriter(action);
372         }
373     }
374 
375     public void withDeleteAndWrites(ReaderAction readerAction, WriterAction writerAction) throws LuceneException
376     {
377         indexWriteLock.lock();
378         try
379         {
380             withReaderAndDeletes(readerAction);
381             withWriter(writerAction);
382         }
383         finally
384         {
385             indexWriteLock.unlock();
386         }
387     }
388 
389     /**
390      * Blocks and waits until all write operations to the index complete. Executes the BatchUpdateAction while holding
391      * the write lock to ensure no concurrent modifications. <p/> Note: This method holds the writeLock for the whole
392      * operation, so is used to ensure a set of deletes and writes are effectively executed atomically.
393      * <p/>
394      * Refreshes the searcher only once, at the end of the batch update action.
395      */
396     public void withBatchUpdate(BatchUpdateAction action)
397     {
398         assertNotClosed();
399         indexWriteLock.lock();
400         try
401         {
402             batchMode.set(true);
403             try
404             {
405                 action.perform();
406             }
407             catch (Exception e) // unfortunately, the API requires us to catch Exception here
408             {
409                 throw new LuceneException(e);
410             }
411             finally
412             {
413                 batchMode.set(false);
414             }
415             refreshSearcher();
416         }
417         finally
418         {
419             indexWriteLock.unlock();
420         }
421     }
422 
423     /**
424      * Closes the searcher that is currently in use.
425      * @deprecated since 3.2
426      * @see #withWriter(WriterAction)
427      * @see #withDeleteAndWrites(ReaderAction, WriterAction)
428      * @see #withSearch(SearcherAction)
429      */
430     public void flipCurrentSearcher()
431     {
432         assertNotClosed();
433         refreshSearcher();
434     }
435 
436     /**
437      * Marks the current searcher to be closed once all searches are finished,
438      * and creates a new one for later searches.
439      * <p/>
440      * Doesn't refresh the searcher if a batch operation is in progress (i.e.
441      * {@link #batchMode} is <tt>true</tt>.
442      */
443     private void refreshSearcher()
444     {
445         // don't refresh searcher during a batch operation
446         if (batchMode.get()) 
447             return; 
448         
449         searcherRefreshLock.lock();
450         try
451         {
452             DelayCloseIndexSearcher oldSearcher = searcher;
453             searcher = createSearcher();
454 
455             if (log.isDebugEnabled())
456                 log.debug("Closing current searcher");
457             try
458             {
459                 oldSearcher.closeWhenDone();
460             }
461             catch (IOException e)
462             {
463                 throw new LuceneException("Error closing index searcher", e);
464             }
465         }
466         finally
467         {
468             searcherRefreshLock.unlock();
469         }
470     }
471 
472     /**
473      * @return a newly created and initialised searcher
474      */
475     private DelayCloseIndexSearcher createSearcher() throws LuceneException
476     {
477         if (log.isDebugEnabled())
478             log.debug("Creating new searcher");
479         
480         DelayCloseIndexSearcher searcher;
481         try
482         {
483             searcher = new DelayCloseIndexSearcher(directory);
484         }
485         catch (IOException e)
486         {
487             throw new LuceneException("Failed to create searcher for directory: " + directory, e);
488         }
489         searcherInitialisation.initialise(searcher);
490         return searcher;
491     }
492 
493     private IndexReader openReader() throws LuceneException
494     {
495         try
496         {
497             return IndexReader.open(directory);
498         }
499         catch (IOException e)
500         {
501             throw new LuceneException(e);
502         }
503     }
504 
505     /**
506      * Closes the provided reader and logs any exceptions.
507      */
508     private void closeReader(IndexReader reader)
509     {
510         if (reader == null)
511             return;
512 
513         if (log.isDebugEnabled())
514             log.debug("Closing index reader: " + reader.directory());
515         try
516         {
517             reader.close();
518         }
519         catch (IOException e)
520         {
521             log.error("Error closing reader: " + reader.directory(), e);
522         }
523     }
524 
525     /**
526      * Closes the provided searcher and logs any exceptions.
527      */
528     private void closeSearcher(IndexSearcher searcher)
529     {
530         if (searcher == null)
531             return;
532         try
533         {
534             searcher.close();
535         }
536         catch (IOException e)
537         {
538             log.error("Error occurred while closing searcher " + searcher.getIndexReader().directory(), e);
539         }
540     }
541 
542     /**
543      * Closes the provided writer and logs any thrown exceptions without rethrowing them.
544      */
545     private void closeWriter(IndexWriter writer)
546     {
547         if (writer == null)
548             return;
549         
550         if (log.isDebugEnabled())
551             log.debug("Closing index writer " + writer.getDirectory());
552         try
553         {
554             writer.close();
555         }
556         catch (IOException e)
557         {
558             log.error("Error closing writer " + writer.getDirectory(), e);
559         }
560     }
561 
562     private void configureIndexWriter(final IndexWriter indexWriter, Configuration configuration)
563     {
564         if (batchMode.get())
565         {
566             // using batchMode config
567             indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
568             indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
569             indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
570         }
571         else
572         {
573             // using interactiveMode config
574             indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
575             indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
576             indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
577         }
578         indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
579         indexWriter.setUseCompoundFile(configuration.isCompoundIndexFileFormat());
580     }
581     
582 	public void truncateIndex() throws LuceneException
583     {
584     	withReaderAndDeletes(new ILuceneConnection.ReaderAction() {
585 			public Object perform(IndexReader reader) throws IOException {
586                 for (int i = 0; i < reader.maxDoc(); i++) {
587 		            reader.deleteDocument(i);
588 		        }
589 				return null;
590 			}
591     	});
592     }
593 	
594 	/**
595 	 * @deprecated since 3.2 because this class encapsulates the use of the index directory
596 	 */
597 	public File getIndexDir()
598     {
599 	    assertNotClosed();
600 
601         // keep a reference in case this changes
602         Directory currentDirectory = directory;
603         if (currentDirectory instanceof FSDirectory)
604 	    {
605 	        return ((FSDirectory) currentDirectory).getFile();
606 	    }
607 	    else
608 	    {
609 	        throw new UnsupportedOperationException("This " + this.getClass().getSimpleName() + " is not using a file system directory");
610 	    }
611     }
612         
613     /**
614      * get a Directory from a path and don't throw a whingy bloody IOException
615      */
616     private static Directory getDirectory(File path)
617     {
618         try
619         {
620             if (!path.exists() && !path.mkdir())
621             {
622                 throw new IOException("Unable to create index directory '" + path.getAbsolutePath() + "'");
623             }
624             return FSDirectory.getDirectory(path);
625         }
626         catch (IOException e)
627         {
628             throw new LuceneException(e);
629         }
630     }
631 
632     /**
633      * Ensures the index directory exists, contains a Lucene index, and is available
634      * for writing.
635      */
636     private void ensureIndexExists()
637     {
638         indexWriteLock.lock();
639         try
640         {
641             if (!IndexReader.indexExists(directory))
642             {
643                 new IndexWriter(directory, null, true).close();
644             }
645             
646             if (IndexReader.isLocked(directory))
647             {
648                 // happens if the index was locked by a process which then died (or is still running -- hence the warning)
649                 log.warn("Forcing unlock of locked index directory: " + directory);
650                 IndexReader.unlock(directory);
651             }
652         }
653         catch (IOException e)
654         {
655             throw new LuceneException("Cannot create index directory", e);
656         }
657         finally
658         {
659             indexWriteLock.unlock();
660         }
661     }
662 }