1 package com.atlassian.bonnie.index;
2
3 import java.io.IOException;
4 import java.util.concurrent.BlockingQueue;
5 import java.util.concurrent.LinkedBlockingQueue;
6
7 import org.apache.lucene.analysis.Analyzer;
8 import org.apache.lucene.analysis.standard.StandardAnalyzer;
9 import org.apache.lucene.index.IndexReader;
10 import org.apache.lucene.index.IndexWriter;
11 import org.apache.lucene.index.Term;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import com.atlassian.bonnie.ILuceneConnection;
16 import com.atlassian.bonnie.LuceneException;
17 import com.atlassian.core.util.ProgressWrapper;
18
19
20
21
22
23
24
25
26
27
28
29
30 public class OnlineMultiThreadedIndexer extends BaseMultiThreadedIndexer implements SingleObjectIndexer
31 {
32 private static final Logger log = LoggerFactory.getLogger(OnlineMultiThreadedIndexer.class);
33 protected BlockingQueue<? super Object> reindexAddedQueue = new LinkedBlockingQueue<Object>();
34 protected BlockingQueue<? super Object> reindexDeletedQueue = new LinkedBlockingQueue<Object>();
35 private ObjectToDocumentConverter objectToDocumentConverter;
36
37 protected void allThreadsComplete(final DocumentWritingScheme scheme, final boolean truncate, final ProgressWrapper progress)
38 {
39 luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
40 {
41 public void perform() throws Exception
42 {
43 setReindexingStatus(true);
44
45 if (truncate)
46 {
47 progress.setStatus("Truncating index");
48 truncateIndex();
49 }
50 progress.setStatus("Closing index");
51 scheme.close(luceneConnection);
52 progress.setStatus("Flushing queues");
53 if (flushReindexingQueues())
54 {
55 progress.setStatus("Optimizing index");
56
57 optimize(luceneConnection);
58 }
59
60 setReindexingStatus(false);
61 }
62 });
63 }
64
65 public void index(final Object o)
66 {
67 luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
68 {
69 public void perform()
70 {
71 if (isReindexing())
72 {
73 try
74 {
75 if (log.isDebugEnabled())
76 log.debug("Adding object:" + o + " to reindexaddedqueue");
77 reindexAddedQueue.put(o);
78 }
79 catch (InterruptedException e)
80 {
81 log.error("Error encountered adding object to reindexaddedqueue", e);
82 }
83 }
84 unindex(o);
85 DocumentWritingScheme scheme = getDocumentWritingScheme(false);
86 Runnable r = new QueueProcessingRunnableImpl(new SingletonObjectQueue(o, objectToDocumentConverter), scheme);
87 r.run();
88 }
89 });
90 }
91
92
93
94
95
96
97
98 protected void doAdd(final Object o, IndexWriter writer) throws IOException
99 {
100 writer.addDocument(objectToDocumentConverter.convert(o, null));
101 }
102
103 public void unindex(final Object o)
104 {
105 luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
106 {
107 public void perform()
108 {
109 if (isReindexing())
110 {
111 try
112 {
113 if (log.isDebugEnabled())
114 log.debug("Adding object:" + o + " to reindexdeletedqueue");
115 reindexDeletedQueue.put(o);
116 }
117 catch (InterruptedException e)
118 {
119 log.error("Error encountered adding object to reindexdeletedqueue", e);
120 }
121 }
122 luceneConnection.withWriter(new ILuceneConnection.WriterAction()
123 {
124 public void perform(IndexWriter writer) throws IOException
125 {
126 doDelete(o, writer);
127 }
128 });
129 }
130 });
131 }
132
133
134
135
136
137
138
139
140 protected void doDelete(Object o, IndexWriter writer) throws IOException
141 {
142 final String[] identity = objectToDocumentConverter.getObjectIdentity(o);
143 writer.deleteDocuments(new Term(identity[0], identity[1]));
144 }
145
146
147
148
149
150 protected final void optimize(ILuceneConnection conn)
151 {
152 conn.withWriter(new ILuceneConnection.WriterAction()
153 {
154 public void perform(IndexWriter writer) throws IOException
155 {
156 writer.optimize();
157 }
158 });
159 }
160
161
162
163
164
165 private boolean flushReindexingQueues()
166 {
167 boolean somethingflushed = false;
168
169 if (!reindexDeletedQueue.isEmpty())
170 {
171 luceneConnection.withWriter(new ILuceneConnection.WriterAction()
172 {
173 public void perform(IndexWriter writer) throws IOException
174 {
175 try
176 {
177 for (Object o = reindexDeletedQueue.take(); !reindexDeletedQueue.isEmpty(); )
178 {
179 doDelete(o, writer);
180 o = reindexDeletedQueue.take();
181 }
182 }
183 catch (InterruptedException e)
184 {
185 throw new LuceneException(e);
186 }
187 }
188 });
189 somethingflushed = true;
190 }
191 if (!reindexAddedQueue.isEmpty())
192 {
193 luceneConnection.withWriter(new ILuceneConnection.WriterAction()
194 {
195 public void perform(IndexWriter writer) throws IOException
196 {
197 try
198 {
199 for (Object o = reindexAddedQueue.take(); !reindexAddedQueue.isEmpty();)
200 {
201 doAdd(o, writer);
202 o = reindexAddedQueue.take();
203 }
204 }
205 catch (InterruptedException e)
206 {
207 throw new LuceneException(e);
208 }
209 }
210 });
211 somethingflushed = true;
212 }
213 return somethingflushed;
214 }
215
216
217
218
219
220
221
222 protected DocumentWritingScheme getDocumentWritingScheme(boolean reindex)
223 {
224 if (reindex)
225 return new TempDirectoryDocumentWritingScheme(this);
226 else return new SingleDocumentWritingScheme(luceneConnection);
227 }
228
229
230 public void setLuceneConnection(ILuceneConnection luceneConnection)
231 {
232 this.luceneConnection = luceneConnection;
233 }
234
235
236 protected ObjectToDocumentConverter getObjectToDocumentConverter()
237 {
238 return objectToDocumentConverter;
239 }
240
241 public void setObjectToDocumentConverter(ObjectToDocumentConverter objectToDocumentConverter)
242 {
243 this.objectToDocumentConverter = objectToDocumentConverter;
244 }
245
246 public Analyzer getAnalyzer()
247 {
248 return new StandardAnalyzer();
249 }
250
251 protected void setReindexingStatus(boolean status)
252 throws IOException
253 {
254 setReindexing(status);
255 }
256 }