1 package com.atlassian.bonnie.index;
2
3 import java.io.IOException;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.TimeUnit;
7
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import com.atlassian.bonnie.ILuceneConnection;
12 import com.atlassian.bonnie.search.IndexerThreadFactory;
13 import com.atlassian.core.util.ProgressMeter;
14 import com.atlassian.core.util.ProgressWrapper;
15
16
17
18
19 public abstract class BaseMultiThreadedIndexer implements BatchOpIndexer
20 {
21 private static Logger log = LoggerFactory.getLogger(BaseMultiThreadedIndexer.class);
22 protected ILuceneConnection luceneConnection;
23 private boolean reindexing;
24
25 public final void reindex(ObjectQueue queue, DocumentWritingScheme documentWritingScheme,
26 ProgressMeter meter, boolean truncate)
27 {
28 try
29 {
30 reindexing = true;
31 int numThreads = calculateNumberOfThreads(queue.size());
32 ProgressWrapper progress = meter != null ?
33 new ProgressWrapper(meter, queue.size()) : NoOpProgressWrapper.INSTANCE;
34 documentWritingScheme.setProgressWrapper(progress);
35
36 if (log.isDebugEnabled())
37 log.debug("Starting reindexing thread pool with " + numThreads + " threads");
38
39 ExecutorService executor = Executors.newFixedThreadPool(numThreads,
40 new IndexerThreadFactory(getClass().getName()));
41
42 Runnable r = getQueueProcessingRunnable(queue, documentWritingScheme);
43 for (int i = 0; i < numThreads; i++)
44 {
45 executor.execute(r);
46 }
47
48 executor.shutdown();
49 if (log.isDebugEnabled())
50 log.debug("Waiting for queue to shutdown...");
51 try
52 {
53 while (!executor.awaitTermination(60L, TimeUnit.SECONDS))
54 {
55 log.warn("Timed out while waiting for reindexing threads to terminate. " +
56 "Continuing to wait...");
57 }
58 }
59 catch (InterruptedException e)
60 {
61 log.error("Problem in parallelising indexing? " + e, e);
62 }
63
64 progress.setPercentage(99);
65 progress.setStatus("Merging indices.");
66
67 allThreadsComplete(documentWritingScheme, truncate, progress);
68
69 progress.setPercentage(100);
70 progress.setStatus("Finished reindexing " + progress.getTotal() + " objects.");
71 }
72 finally
73 {
74 reindexing = false;
75 }
76 }
77
78 protected Runnable getQueueProcessingRunnable(ObjectQueue queue, DocumentWritingScheme documentWritingScheme)
79 {
80 return new QueueProcessingRunnableImpl(queue, documentWritingScheme);
81 }
82
83 public void truncateIndex() throws IOException
84 {
85 luceneConnection.truncateIndex();
86 }
87
88
89
90
91
92
93
94 protected abstract void allThreadsComplete(DocumentWritingScheme scheme, boolean truncate, ProgressWrapper progress);
95
96
97
98
99
100
101
102 protected int calculateNumberOfThreads(int numObjects)
103 {
104 if (numObjects < 10)
105 return 1;
106 else if (numObjects < 100)
107 return 3;
108 else if (numObjects < 500)
109 return 5;
110 else
111 return 10;
112 }
113
114
115
116
117
118
119 protected boolean isReindexing()
120 {
121 return reindexing;
122 }
123
124 public void setReindexing(boolean reindexing)
125 {
126 this.reindexing = reindexing;
127 }
128 }