1 package com.atlassian.bonnie.index;
2
3 import com.atlassian.bonnie.LuceneConnection;
4 import com.atlassian.bonnie.LuceneException;
5 import com.atlassian.core.util.ProgressWrapper;
6 import org.apache.lucene.analysis.Analyzer;
7 import org.apache.lucene.analysis.standard.StandardAnalyzer;
8 import org.apache.lucene.index.IndexReader;
9 import org.apache.lucene.index.IndexWriter;
10 import org.apache.lucene.index.Term;
11
12 import java.io.IOException;
13
14 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
15 import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
16
17
18
19
20
21
22
23
24
25
26
27
28 public class OnlineMultiThreadedIndexer extends BaseMultiThreadedIndexer implements SingleObjectIndexer
29 {
30 protected BlockingQueue reindexAddedQueue = new LinkedBlockingQueue();
31 protected BlockingQueue reindexDeletedQueue = new LinkedBlockingQueue();
32 private ObjectToDocumentConverter objectToDocumentConverter;
33
34 protected void allThreadsComplete(final DocumentWritingScheme scheme, final boolean truncate, final ProgressWrapper progress)
35 {
36 luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
37 {
38 public void perform() throws Exception
39 {
40 setReindexingStatus(true);
41
42 if (truncate)
43 {
44 progress.setStatus("Truncating index");
45 truncateIndex();
46 }
47 progress.setStatus("Closing index");
48 scheme.close(luceneConnection);
49 progress.setStatus("Flushing queues");
50 if (flushReindexingQueues())
51 {
52 progress.setStatus("Optimizing index");
53
54 optimize(luceneConnection);
55 }
56
57 setReindexingStatus(false);
58 }
59 });
60 }
61
62 public void index(final Object o)
63 {
64 luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
65 {
66 public void perform()
67 {
68 if (isReindexing())
69 {
70 try
71 {
72 if (log.isDebugEnabled())
73 log.debug("Adding object:" + o + " to reindexaddedqueue");
74 reindexAddedQueue.put(o);
75 }
76 catch (InterruptedException e)
77 {
78 log.error("Error encountered adding object to reindexaddedqueue", e);
79 }
80 }
81 unindex(o);
82 DocumentWritingScheme scheme = getDocumentWritingScheme(false);
83 Runnable r = new QueueProcessingRunnableImpl(new SingletonObjectQueue(o, objectToDocumentConverter), scheme);
84 r.run();
85 }
86 });
87 }
88
89
90
91
92
93
94
95 protected void doAdd(final Object o, IndexWriter writer) throws IOException
96 {
97 writer.addDocument(objectToDocumentConverter.convert(o, null));
98 }
99
100 public void unindex(final Object o)
101 {
102 luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
103 {
104 public void perform()
105 {
106 if (isReindexing())
107 {
108 try
109 {
110 if (log.isDebugEnabled())
111 log.debug("Adding object:" + o + " to reindexdeletedqueue");
112 reindexDeletedQueue.put(o);
113 }
114 catch (InterruptedException e)
115 {
116 log.error("Error encountered adding object to reindexdeletedqueue", e);
117 }
118 }
119 luceneConnection.withReaderAndDeletes(new LuceneConnection.ReaderAction()
120 {
121 public Object perform(IndexReader reader) throws IOException
122 {
123 doDelete(o, reader);
124 return null;
125 }
126 });
127 }
128 });
129 }
130
131
132
133
134
135
136
137 protected void doDelete(Object o, IndexReader reader) throws IOException
138 {
139 final String[] identity = objectToDocumentConverter.getObjectIdentity(o);
140 reader.deleteDocuments(new Term(identity[0], identity[1]));
141 }
142
143
144
145
146
147 protected final void optimize(LuceneConnection conn)
148 {
149 conn.withWriter(new LuceneConnection.WriterAction()
150 {
151 public void perform(IndexWriter writer) throws IOException
152 {
153 writer.optimize();
154 }
155 });
156 }
157
158
159
160
161
162 private boolean flushReindexingQueues()
163 {
164 boolean somethingflushed = false;
165
166 if (!reindexDeletedQueue.isEmpty())
167 {
168 luceneConnection.withReaderAndDeletes(new LuceneConnection.ReaderAction()
169 {
170 public Object perform(IndexReader reader) throws IOException
171 {
172 try
173 {
174 for (Object o = reindexDeletedQueue.take(); !reindexDeletedQueue.isEmpty();)
175 {
176 doDelete(o, reader);
177 o = reindexDeletedQueue.take();
178 }
179 }
180 catch (InterruptedException e)
181 {
182 throw new LuceneException(e);
183 }
184 return null;
185 }
186 });
187 somethingflushed = true;
188 }
189 if (!reindexAddedQueue.isEmpty())
190 {
191 luceneConnection.withWriter(new LuceneConnection.WriterAction()
192 {
193 public void perform(IndexWriter writer) throws IOException
194 {
195 try
196 {
197 for (Object o = reindexAddedQueue.take(); !reindexAddedQueue.isEmpty();)
198 {
199 doAdd(o, writer);
200 o = reindexAddedQueue.take();
201 }
202 }
203 catch (InterruptedException e)
204 {
205 throw new LuceneException(e);
206 }
207 }
208 });
209 somethingflushed = true;
210 }
211 return somethingflushed;
212 }
213
214
215
216
217
218
219
220 protected DocumentWritingScheme getDocumentWritingScheme(boolean reindex)
221 {
222 if (reindex)
223 return new TempDirectoryDocumentWritingScheme(this);
224 else return new SingleDocumentWritingScheme(luceneConnection);
225 }
226
227
228 public void setLuceneConnection(LuceneConnection luceneConnection)
229 {
230 this.luceneConnection = luceneConnection;
231 }
232
233
234 protected ObjectToDocumentConverter getObjectToDocumentConverter()
235 {
236 return objectToDocumentConverter;
237 }
238
239 public void setObjectToDocumentConverter(ObjectToDocumentConverter objectToDocumentConverter)
240 {
241 this.objectToDocumentConverter = objectToDocumentConverter;
242 }
243
244 public Analyzer getAnalyzer()
245 {
246 return new StandardAnalyzer();
247 }
248
249 protected void setReindexingStatus(boolean status)
250 throws IOException
251 {
252 luceneConnection.setReIndexing(status);
253 setReindexing(status);
254 }
255 }