View Javadoc

1   package com.atlassian.bonnie.index;
2   
3   import com.atlassian.bonnie.LuceneConnection;
4   import com.atlassian.bonnie.search.IndexerThreadFactory;
5   import com.atlassian.core.util.ProgressMeter;
6   import com.atlassian.core.util.ProgressWrapper;
7   import org.apache.log4j.Category;
8   
9   import java.io.IOException;
10  
11  import edu.emory.mathcs.backport.java.util.concurrent.*;
12  
13  /**
14   * Multiple-thread implementation of a BatchOpIndexer
15   */
16  public abstract class BaseMultiThreadedIndexer implements BatchOpIndexer
17  {
18      protected static Category log = Category.getInstance(BaseMultiThreadedIndexer.class);
19      protected LuceneConnection luceneConnection;
20      private boolean reindexing;
21  
22      public final void reindex(ObjectQueue queue, DocumentWritingScheme documentWritingScheme,
23                                ProgressMeter meter, boolean truncate)
24      {
25          try
26          {
27              reindexing = true;
28              int numThreads = calculateNumberOfThreads(queue.size());
29              ProgressWrapper progress = meter != null ?
30                      new ProgressWrapper(meter, queue.size()) : NoOpProgressWrapper.INSTANCE;
31              documentWritingScheme.setProgressWrapper(progress);
32  
33              if (log.isDebugEnabled())
34                  log.debug("Starting reindexing thread pool with " + numThreads + " threads");
35  
36              ExecutorService executor = Executors.newFixedThreadPool(numThreads,
37                  new IndexerThreadFactory(getClass().getName()));
38  
39              Runnable r = getQueueProcessingRunnable(queue, documentWritingScheme);
40              for (int i = 0; i < numThreads; i++)
41              {
42                  executor.execute(r);
43              }
44  
45              executor.shutdown();
46              if (log.isDebugEnabled())
47                  log.debug("Waiting for queue to shutdown...");
48              try
49              {
50                  while (!executor.awaitTermination(60L, TimeUnit.SECONDS))
51                  {
52                      log.warn("Timed out while waiting for reindexing threads to terminate. " +
53                          "Continuing to wait...");
54                  }
55              }
56              catch (InterruptedException e)
57              {
58                  log.error("Problem in parallelising indexing? " + e, e);
59              }
60  
61              progress.setPercentage(99);
62              progress.setStatus("Merging indices.");
63  
64              allThreadsComplete(documentWritingScheme, truncate, progress);
65  
66              progress.setPercentage(100);
67              progress.setStatus("Finished reindexing " + progress.getTotal() + " objects.");
68          }
69          finally
70          {
71              reindexing = false;
72          }
73      }
74  
75      protected Runnable getQueueProcessingRunnable(ObjectQueue queue, DocumentWritingScheme documentWritingScheme)
76      {
77          return new QueueProcessingRunnableImpl(queue, documentWritingScheme);
78      }
79  
80      public void truncateIndex() throws IOException
81      {
82          luceneConnection.truncateIndex();
83      }
84  
85      /**
86       * Perform clean-up operations such as closing writers, optimizing, merging with original index, etc.
87       *
88       * @param scheme   scheme
89       * @param truncate should the original index be truncated
90       */
91      protected abstract void allThreadsComplete(DocumentWritingScheme scheme, boolean truncate, ProgressWrapper progress);
92  
93      /**
94       * Determine how many threads to use.
95       *
96       * @param numObjects the number of objects to index
97       * @return number of threads
98       */
99      protected int calculateNumberOfThreads(int numObjects)
100     {
101         if (numObjects < 10)
102             return 1;
103         else if (numObjects < 100)
104             return 3;
105         else if (numObjects < 500)
106             return 5;
107         else
108             return 10;
109     }
110 
111     /**
112      * Is reindexing currently underway? Distinct from {@link com.atlassian.bonnie.LuceneConnection#isReIndexing()}
113      * in that this method is for internal use, whilst the LuceneConnection equivalent is for all potential users
114      * of the index, whether in- or out-of-process clients.
115      *
116      * @return true if the reindex method is currently running.
117      */
118     protected boolean isReindexing()
119     {
120         return reindexing;
121     }
122 
123     public void setReindexing(boolean reindexing)
124     {
125         this.reindexing = reindexing;
126     }
127 }