1 package com.atlassian.bonnie;
2
3 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
4 import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
5 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
6 import org.apache.log4j.Logger;
7 import org.apache.lucene.analysis.Analyzer;
8 import org.apache.lucene.index.IndexReader;
9 import org.apache.lucene.index.IndexWriter;
10 import org.apache.lucene.search.DelayCloseIndexSearcher;
11 import org.apache.lucene.search.IndexSearcher;
12 import org.apache.lucene.store.Directory;
13 import org.apache.lucene.store.FSDirectory;
14 import org.apache.lucene.store.RAMDirectory;
15
16 import java.io.File;
17 import java.io.IOException;
18
/**
 * ILuceneConnection implementation that allows concurrent searching/reading and writing/deleting. Concurrent writes and
 * deletes block each other. <p/> This class optimizes use of Lucene reader instances by holding a common IndexReader
 * that is shared by idempotent operations on an unmodified index. Any mutative operation causes the current IndexReader
 * to be cleared, and subsequent reads will see the results of the previous index change. <p/> TODO We might need to
 * keep track of currently open readers and block {@link #recreateIndexDirectory()} until they are all closed. We would
 * also need to prevent any new readers from being created during this time. The reason is that Windows may refuse to
 * delete index files that are still held open by a reader.
 */
28 public class ConcurrentLuceneConnection implements ILuceneConnection
29 {
30 private static Logger log = Logger.getLogger(ConcurrentLuceneConnection.class);
31
32 private transient Directory directory;
33
34 private final Analyzer analyzerForIndexing;
35
36 private final Configuration configuration;
37
38 private final Lock indexWriteLock = new ReentrantLock();
39
40 private final Lock searcherCreationLock = new ReentrantLock();
41
42 /**
43 * Used to indicate that we are currently in batchMode and need to use the batch configuration when using an
44 * IndexWriter
45 */
46 private final AtomicBoolean batchMode = new AtomicBoolean(false);
47
48 // private final Configuration
49
50 /**
51 * Used to search the index
52 */
53 private DelayCloseIndexSearcher currentSearcher;
54
55 // --------------------------------------
56 // ctors
57 // --------------------------------------
58
59 public ConcurrentLuceneConnection(Directory directory, Analyzer analyzer, Configuration configuration)
60 {
61 this.directory = directory;
62 this.analyzerForIndexing = analyzer;
63 this.configuration = configuration;
64 }
65
66 public ConcurrentLuceneConnection(File path, Analyzer analyzer, Configuration configuration)
67 {
68 this(getDirectory(path), analyzer, configuration);
69 }
70
71 public int getNumDocs()
72 {
73 return ((Integer) withReader(new ReaderAction()
74 {
75 public Object perform(IndexReader reader)
76 {
77 return new Integer(reader.numDocs());
78 }
79 })).intValue();
80 }
81
82 public boolean isIndexCreated()
83 {
84 try
85 {
86 return IndexReader.indexExists(directory);
87 }
88 catch (IOException e)
89 {
90 throw new LuceneException(e);
91 }
92 }
93
94 public IndexSearcher leakSearcher()
95 {
96 try
97 {
98 return getOpenedSearcher();
99 }
100 catch (Throwable e)
101 {
102 flipCurrentSearcher();
103 // Throw the correct error or exception
104 throwLuceneException(e);
105 // Never happens as the above method throws something all the time
106 throw new IllegalStateException("Exception should have been thrown.");
107 }
108 }
109
110 /**
111 * Blocks and waits until all write operations to the index complete. Optimizes the index while holding the write
112 * lock to ensure no concurrent modifications. Optimize is done using interactive mode configuration.
113 */
114 public void optimize() throws LuceneException
115 {
116 withWriter(new WriterAction()
117 {
118 public void perform(IndexWriter writer) throws IOException
119 {
120 writer.optimize();
121 }
122 });
123 }
124
125 /**
126 * Blocks and waits until all write operations to the index complete. Recreates the index while holding the write
127 * lock to ensure no concurrent modifications.
128 */
129 public void recreateIndexDirectory()
130 {
131 // We are about to update (nuke) the index. So get the lock before
132 // touching the index.
133 indexWriteLock.lock();
134 try
135 {
136 // Pass 'true' for 'create' to the IndexWriter so that the index is
137 // either:
138 // - created if it does not exists
139 // - or blown away and empty one recreated if index existed already
140 /*
141 * According to http://issues.apache.org/jira/browse/LUCENE-140 it is not sufficient to create an IndexWriter with the create flag, you need to
142 * recreate the directory as well
143 */
144 directory.close();
145 if (directory instanceof FSDirectory)
146 {
147 directory = FSDirectory.getDirectory(((FSDirectory) directory).getFile());
148 }
149 else if (directory instanceof RAMDirectory)
150 {
151 directory = new RAMDirectory();
152 }
153 new IndexWriter(directory, null, true).close();
154 }
155 catch (IOException e)
156 {
157 throw new LuceneException("Cannot create index directory.", e);
158 }
159 finally
160 {
161 try
162 {
163 flipCurrentSearcher();
164 }
165 finally
166 {
167 // Release the lock no matter what happened.
168 indexWriteLock.unlock();
169 }
170 }
171 }
172
173 public void close()
174 {
175 flipCurrentSearcher();
176 }
177
178 /**
179 * This implementation does not respect the boolean return of the of the
180 * {@link SearcherAction#perform(org.apache.lucene.search.IndexSearcher)} method
181 */
182 public void withSearch(SearcherAction action) throws LuceneException
183 {
184 IndexSearcher searcher;
185 try
186 {
187 searcher = getOpenedSearcher();
188 try
189 {
190 boolean b = action.perform(searcher);
191 if (!b)
192 {
193 throw new UnsupportedOperationException(
194 "Searchers are always closed. The searcherAction should always return true, we do not allow them to control closing of the searchers");
195 }
196 }
197 finally
198 {
199 // If we are here then the open() call has succeeded. Call the
200 // close() method to ensure
201 // that the usage count of the searcher is decremented, so that
202 // the searcher can be closed when needed
203 closeSearcher(searcher);
204 }
205 }
206 catch (Throwable e)
207 {
208 flipCurrentSearcher();
209 // Throw the correct error or exception
210 throwLuceneException(e);
211 }
212 }
213
214 public Object withReader(ReaderAction action) throws LuceneException
215 {
216 // We do not need to close the reader as it should remain open and get
217 // resused until it is specifically flipped.
218 try
219 {
220 IndexSearcher searcher = getOpenedSearcher();
221 try
222 {
223 return action.perform(searcher.getIndexReader());
224 }
225 finally
226 {
227 // If we are here then the open() call has succeeded. Call
228 // the close() method to ensure that the usage count of the
229 // searcher is decremented, so that the searcher can be
230 // closed when needed
231 closeSearcher(searcher);
232 }
233 }
234 catch (Throwable e)
235 {
236 // On error close the current searcher to ensure no one else uses
237 // it. If the searcher got closed in the mean time we do not
238 // really care - all this means is that the searcher will be closed
239 // (and potentially created) one more time.
240 // As we should not be here very often the inefficiency is bearable
241 flipCurrentSearcher();
242 // Throw the correct error or exception
243 throwLuceneException(e);
244 // fool the stupid compiler
245 return null;
246 }
247 }
248
249 /**
250 * Blocks and waits until all write operations to the index complete. Executes the ReaderAction while holding the
251 * write lock to ensure no concurrent modifications.
252 */
253 public void withReaderAndDeletes(ReaderAction action) throws LuceneException
254 {
255 // We are about to update (remove documents from) the index. So grab the
256 // lock before modifying the index
257 indexWriteLock.lock();
258 IndexReader deleter = null;
259 try
260 {
261 deleter = constructIndexDeleter();
262 action.perform(deleter);
263 // Only close the current searcher if the index update succeeded
264 flipCurrentSearcher();
265 }
266 catch (IOException e)
267 {
268 throw new LuceneException(e);
269 }
270 finally
271 {
272 try
273 {
274 closeReader(deleter);
275 }
276 finally
277 {
278 indexWriteLock.unlock();
279 }
280 }
281 }
282
283 /**
284 * Blocks and waits until all write operations to the index complete. Executes the WriterAction while holding the
285 * write lock to ensure no concurrent modifications.
286 */
287 public void withWriter(WriterAction action) throws LuceneException
288 {
289 // The index is about to be updated so get the lock
290 indexWriteLock.lock();
291
292 IndexWriter writer = null;
293 try
294 {
295 writer = constructIndexWriter();
296 action.perform(writer);
297 // TODO Should this be moved to the finally block?
298 // Only close the current searcher if the index update succeeded
299 flipCurrentSearcher();
300 }
301 catch (IOException e)
302 {
303 throw new LuceneException(e);
304 }
305 finally
306 {
307 try
308 {
309 closeWriter(writer);
310 }
311 finally
312 {
313 // Release the lock, no matter what.
314 indexWriteLock.unlock();
315 }
316 }
317 }
318
319 public void withDeleteAndWrites(ReaderAction readerAction, WriterAction writerAction) throws LuceneException
320 {
321 indexWriteLock.lock();
322 try
323 {
324 withReaderAndDeletes(readerAction);
325 withWriter(writerAction);
326 }
327 finally
328 {
329 indexWriteLock.unlock();
330 }
331 }
332
333 /**
334 * Blocks and waits until all write operations to the index complete. Executes the BatchUpdateAction while holding
335 * the write lock to ensure no concurrent modifications. <p/> Note: This method holds the writeLock for the whole
336 * operation, so is used to ensure a set of deletes and writes are effectively executed atomically.
337 */
338 public void withBatchUpdate(BatchUpdateAction action)
339 {
340 // The index is about to be modified, so grab the lock.
341 indexWriteLock.lock();
342 try
343 {
344 batchMode.set(true);
345 action.perform();
346 }
347 catch (Exception e)
348 {
349 throwLuceneException(e);
350 }
351 finally
352 {
353 batchMode.set(false);
354 // Release the lock no matter what happened.
355 indexWriteLock.unlock();
356 }
357 }
358
359 /**
360 * Closes the searcher that is currently in use.
361 */
362 public void flipCurrentSearcher()
363 {
364 if (log.isDebugEnabled())
365 {
366 log.debug("Closing current searcher..");
367 }
368
369 searcherCreationLock.lock();
370 try
371 {
372 if (this.currentSearcher != null)
373 {
374 try
375 {
376 this.currentSearcher.closeWhenDone();
377 }
378 catch (Exception e)
379 {
380 log.error("Error occured attempting to close Index searcher", e);
381 }
382 finally
383 {
384 this.currentSearcher = null;
385 }
386 }
387 }
388 finally
389 {
390 searcherCreationLock.unlock();
391 }
392 }
393
394 /**
395 * Lazily instantiates a {@link DelayCloseIndexSearcher} and calls its
396 * {@link org.apache.lucene.search.DelayCloseIndexSearcher#open()} method to increment its usage count. Ensure that
397 * the searcher's {@link org.apache.lucene.search.DelayCloseIndexSearcher#close()} method is called after it has
398 * been used.
399 */
400 private DelayCloseIndexSearcher getOpenedSearcher() throws IOException
401 {
402 searcherCreationLock.lock();
403 try
404 {
405 if (this.currentSearcher == null)
406 {
407 this.currentSearcher = new DelayCloseIndexSearcher(directory);
408 }
409
410 // As we are about to use the searcher, call open() method to ensure
411 // that the usage count of the
412 // searcher is incremented, so that the searcher is not closed until
413 // we are finished with it
414 this.currentSearcher.open();
415
416 return this.currentSearcher;
417 }
418 finally
419 {
420 searcherCreationLock.unlock();
421 }
422 }
423
424 /**
425 * This method always returns a new instance of a {@link IndexReader} which uses the {@link #directory}.
426 *
427 * @throws LuceneException
428 * if an exception is thrown while constructing the reader.
429 */
430 private IndexReader constructIndexDeleter()
431 {
432 try
433 {
434 return IndexReader.open(directory);
435 }
436 catch (IOException e)
437 {
438 throw new LuceneException(e);
439 }
440 }
441
442 /**
443 * Closes the provided reader and logs any thrown exceptions without rethrowing them.
444 *
445 * @param reader
446 * reader to close.
447 */
448 private void closeReader(IndexReader reader)
449 {
450 try
451 {
452 if (reader != null)
453 {
454 if (log.isDebugEnabled())
455 {
456 log.debug(Thread.currentThread().getName() + "## closing reader");
457 }
458
459 reader.close();
460 }
461 }
462 catch (IOException e)
463 {
464 log.error("Error closing reader. " + e, e);
465 }
466 }
467
468 /**
469 * Close a searcher and don't throw IOExceptions
470 */
471 private void closeSearcher(IndexSearcher searcher)
472 {
473 try
474 {
475 if (searcher != null)
476 {
477 searcher.close();
478 }
479 }
480 catch (IOException e)
481 {
482 log.error("Error occurred while closing searcher.", e);
483 }
484 }
485
486 /**
487 * Closes the provided writer and logs any thrown exceptions without rethrowing them.
488 *
489 * @param writer
490 * writer to close.
491 */
492 private void closeWriter(IndexWriter writer)
493 {
494 try
495 {
496 if (writer != null)
497 {
498 if (log.isDebugEnabled())
499 {
500 log.debug("## closing writer");
501 }
502
503 writer.close();
504 }
505 else
506 {
507 log.warn("## trying to close null writer.");
508 }
509 }
510 catch (IOException e)
511 {
512 log.error("Error closing writer. " + e, e);
513 }
514 }
515
516 /**
517 * This method always returns a new instance of a {@link IndexWriter} which uses the
518 * {@link #directory). The constructed writer expects the index to already exist.
519 *
520 * @throws LuceneException
521 * if an exception is thrown while constructing the writer.
522 */
523 private IndexWriter constructIndexWriter() throws LuceneException
524 {
525 try
526 {
527 if (log.isDebugEnabled())
528 {
529 log.debug(Thread.currentThread().getName() + "## opening writer");
530 }
531 final IndexWriter indexWriter = new IndexWriter(directory, analyzerForIndexing, false);
532 if (batchMode.get())
533 {
534 // using batchMode config
535 indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
536 indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
537 indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
538 }
539 else
540 {
541 // using interactiveMode config
542 indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
543 indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
544 indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
545 }
546 indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
547 return indexWriter;
548 }
549 catch (IOException e)
550 {
551 throw new LuceneException(e);
552 }
553 }
554
555 private void throwLuceneException(Throwable e)
556 {
557 if (e instanceof Error)
558 {
559 throw (Error) e;
560 }
561
562 if (e instanceof RuntimeException)
563 {
564 throw (RuntimeException) e;
565 }
566
567 throw new LuceneException(e);
568 }
569
570 /**
571 * get a Directory from a path and don't throw a whingy bloody IOException
572 */
573 private static Directory getDirectory(File path)
574 {
575 try
576 {
577 return FSDirectory.getDirectory(path);
578 }
579 catch (IOException e)
580 {
581 throw new LuceneException(e);
582 }
583 }
584 }