1   package com.atlassian.bonnie.index;
2   
3   import java.io.IOException;
4   import java.util.concurrent.ExecutorService;
5   import java.util.concurrent.Executors;
6   import java.util.concurrent.TimeUnit;
7   
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import com.atlassian.bonnie.ILuceneConnection;
12  import com.atlassian.bonnie.search.IndexerThreadFactory;
13  import com.atlassian.core.util.ProgressMeter;
14  import com.atlassian.core.util.ProgressWrapper;
15  
16  /**
17   * Multiple-thread implementation of a BatchOpIndexer
18   */
19  public abstract class BaseMultiThreadedIndexer implements BatchOpIndexer
20  {
21      private static Logger log = LoggerFactory.getLogger(BaseMultiThreadedIndexer.class);
22      protected ILuceneConnection luceneConnection;
23      private boolean reindexing;
24  
25      public final void reindex(ObjectQueue queue, DocumentWritingScheme documentWritingScheme,
26                                ProgressMeter meter, boolean truncate)
27      {
28          try
29          {
30              reindexing = true;
31              int numThreads = calculateNumberOfThreads(queue.size());
32              ProgressWrapper progress = meter != null ?
33                      new ProgressWrapper(meter, queue.size()) : NoOpProgressWrapper.INSTANCE;
34              documentWritingScheme.setProgressWrapper(progress);
35  
36              if (log.isDebugEnabled())
37                  log.debug("Starting reindexing thread pool with " + numThreads + " threads");
38  
39              ExecutorService executor = Executors.newFixedThreadPool(numThreads,
40                  new IndexerThreadFactory(getClass().getName()));
41  
42              Runnable r = getQueueProcessingRunnable(queue, documentWritingScheme);
43              for (int i = 0; i < numThreads; i++)
44              {
45                  executor.execute(r);
46              }
47  
48              executor.shutdown();
49              if (log.isDebugEnabled())
50                  log.debug("Waiting for queue to shutdown...");
51              try
52              {
53                  while (!executor.awaitTermination(60L, TimeUnit.SECONDS))
54                  {
55                      log.warn("Timed out while waiting for reindexing threads to terminate. " +
56                          "Continuing to wait...");
57                  }
58              }
59              catch (InterruptedException e)
60              {
61                  log.error("Problem in parallelising indexing? " + e, e);
62              }
63  
64              progress.setPercentage(99);
65              progress.setStatus("Merging indices.");
66  
67              allThreadsComplete(documentWritingScheme, truncate, progress);
68  
69              progress.setPercentage(100);
70              progress.setStatus("Finished reindexing " + progress.getTotal() + " objects.");
71          }
72          finally
73          {
74              reindexing = false;
75          }
76      }
77  
78      protected Runnable getQueueProcessingRunnable(ObjectQueue queue, DocumentWritingScheme documentWritingScheme)
79      {
80          return new QueueProcessingRunnableImpl(queue, documentWritingScheme);
81      }
82  
83      public void truncateIndex() throws IOException
84      {
85          luceneConnection.truncateIndex();
86      }
87  
88      /**
89       * Perform clean-up operations such as closing writers, optimizing, merging with original index, etc.
90       *
91       * @param scheme   scheme
92       * @param truncate should the original index be truncated
93       */
94      protected abstract void allThreadsComplete(DocumentWritingScheme scheme, boolean truncate, ProgressWrapper progress);
95  
96      /**
97       * Determine how many threads to use.
98       *
99       * @param numObjects the number of objects to index
100      * @return number of threads
101      */
102     protected int calculateNumberOfThreads(int numObjects)
103     {
104         if (numObjects < 10)
105             return 1;
106         else if (numObjects < 100)
107             return 3;
108         else if (numObjects < 500)
109             return 5;
110         else
111             return 10;
112     }
113 
114     /**
115      * Is reindexing currently underway?
116      *
117      * @return true if the reindex method is currently running.
118      */
119     protected boolean isReindexing()
120     {
121         return reindexing;
122     }
123 
124     public void setReindexing(boolean reindexing)
125     {
126         this.reindexing = reindexing;
127     }
128 }