1 package com.atlassian.bonnie.index;
2
3 import com.atlassian.bonnie.LuceneConnection;
4 import com.atlassian.bonnie.search.IndexerThreadFactory;
5 import com.atlassian.core.util.ProgressMeter;
6 import com.atlassian.core.util.ProgressWrapper;
7 import org.apache.log4j.Category;
8
9 import java.io.IOException;
10
11 import edu.emory.mathcs.backport.java.util.concurrent.*;
12
13
14
15
16 public abstract class BaseMultiThreadedIndexer implements BatchOpIndexer
17 {
18 protected static Category log = Category.getInstance(BaseMultiThreadedIndexer.class);
19 protected LuceneConnection luceneConnection;
20 private boolean reindexing;
21
22 public final void reindex(ObjectQueue queue, DocumentWritingScheme documentWritingScheme,
23 ProgressMeter meter, boolean truncate)
24 {
25 try
26 {
27 reindexing = true;
28 int numThreads = calculateNumberOfThreads(queue.size());
29 ProgressWrapper progress = meter != null ?
30 new ProgressWrapper(meter, queue.size()) : NoOpProgressWrapper.INSTANCE;
31 documentWritingScheme.setProgressWrapper(progress);
32
33 if (log.isDebugEnabled())
34 log.debug("Starting reindexing thread pool with " + numThreads + " threads");
35
36 ExecutorService executor = Executors.newFixedThreadPool(numThreads,
37 new IndexerThreadFactory(getClass().getName()));
38
39 Runnable r = getQueueProcessingRunnable(queue, documentWritingScheme);
40 for (int i = 0; i < numThreads; i++)
41 {
42 executor.execute(r);
43 }
44
45 executor.shutdown();
46 if (log.isDebugEnabled())
47 log.debug("Waiting for queue to shutdown...");
48 try
49 {
50 while (!executor.awaitTermination(60L, TimeUnit.SECONDS))
51 {
52 log.warn("Timed out while waiting for reindexing threads to terminate. " +
53 "Continuing to wait...");
54 }
55 }
56 catch (InterruptedException e)
57 {
58 log.error("Problem in parallelising indexing? " + e, e);
59 }
60
61 progress.setPercentage(99);
62 progress.setStatus("Merging indices.");
63
64 allThreadsComplete(documentWritingScheme, truncate, progress);
65
66 progress.setPercentage(100);
67 progress.setStatus("Finished reindexing " + progress.getTotal() + " objects.");
68 }
69 finally
70 {
71 reindexing = false;
72 }
73 }
74
75 protected Runnable getQueueProcessingRunnable(ObjectQueue queue, DocumentWritingScheme documentWritingScheme)
76 {
77 return new QueueProcessingRunnableImpl(queue, documentWritingScheme);
78 }
79
80 public void truncateIndex() throws IOException
81 {
82 luceneConnection.truncateIndex();
83 }
84
85
86
87
88
89
90
91 protected abstract void allThreadsComplete(DocumentWritingScheme scheme, boolean truncate, ProgressWrapper progress);
92
93
94
95
96
97
98
99 protected int calculateNumberOfThreads(int numObjects)
100 {
101 if (numObjects < 10)
102 return 1;
103 else if (numObjects < 100)
104 return 3;
105 else if (numObjects < 500)
106 return 5;
107 else
108 return 10;
109 }
110
111
112
113
114
115
116
117
118 protected boolean isReindexing()
119 {
120 return reindexing;
121 }
122
123 public void setReindexing(boolean reindexing)
124 {
125 this.reindexing = reindexing;
126 }
127 }