View Javadoc

1   package com.atlassian.bonnie.index;
2   
3   import java.io.IOException;
4   import java.util.concurrent.BlockingQueue;
5   import java.util.concurrent.LinkedBlockingQueue;
6   
7   import org.apache.lucene.analysis.Analyzer;
8   import org.apache.lucene.analysis.standard.StandardAnalyzer;
9   import org.apache.lucene.index.IndexReader;
10  import org.apache.lucene.index.IndexWriter;
11  import org.apache.lucene.index.Term;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import com.atlassian.bonnie.ILuceneConnection;
16  import com.atlassian.bonnie.LuceneException;
17  import com.atlassian.core.util.ProgressWrapper;
18  
19  /**
20   * Performs batch indexing in multiple threads.
21   * <p>This class is designed for:
22   * <ul>
23   * <li>an online environment where changes are being made to the index as batch reindexing occurs</li>
24   * <li>indexing large files</li>
25   * </ul>
26   * Each thread writes to its own temporary index and when all threads finish, these are merged into the final index.
27   * This has the desirable effect where a few large files do not bottleneck the entire operation.
28   * </p>
29   */
30  public class OnlineMultiThreadedIndexer extends BaseMultiThreadedIndexer implements SingleObjectIndexer
31  {
32      private static final Logger log = LoggerFactory.getLogger(OnlineMultiThreadedIndexer.class);
33      protected BlockingQueue<? super Object> reindexAddedQueue = new LinkedBlockingQueue<Object>();
34      protected BlockingQueue<? super Object> reindexDeletedQueue = new LinkedBlockingQueue<Object>();
35      private ObjectToDocumentConverter objectToDocumentConverter;
36  
37      protected void allThreadsComplete(final DocumentWritingScheme scheme, final boolean truncate, final ProgressWrapper progress)
38      {
39          luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
40          {
41              public void perform() throws Exception
42              {
43                  setReindexingStatus(true);
44  
45                  if (truncate)
46                  {
47                      progress.setStatus("Truncating index");
48                      truncateIndex();
49                  }
50                  progress.setStatus("Closing index");
51                  scheme.close(luceneConnection);
52                  progress.setStatus("Flushing queues");
53                  if (flushReindexingQueues())
54                  {
55                      progress.setStatus("Optimizing index");
56  
57                      optimize(luceneConnection);
58                  }
59  
60                  setReindexingStatus(false);
61              }
62          });
63      }
64  
65      public void index(final Object o)
66      {
67          luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
68          {
69              public void perform()
70              {
71                  if (isReindexing())
72                  {
73                      try
74                      {
75                          if (log.isDebugEnabled())
76                              log.debug("Adding object:" + o + " to reindexaddedqueue");
77                          reindexAddedQueue.put(o);
78                      }
79                      catch (InterruptedException e)
80                      {
81                          log.error("Error encountered adding object to reindexaddedqueue", e);
82                      }
83                  }
84                  unindex(o);
85                  DocumentWritingScheme scheme = getDocumentWritingScheme(false);
86                  Runnable r = new QueueProcessingRunnableImpl(new SingletonObjectQueue(o, objectToDocumentConverter), scheme);
87                  r.run();
88              }
89          });
90      }
91  
92      /**
93       * Performs the actual adding of the object to the index.
94       * @param o
95       * @param writer
96       * @throws IOException
97       */
98      protected void doAdd(final Object o, IndexWriter writer) throws IOException
99      {
100         writer.addDocument(objectToDocumentConverter.convert(o, null));
101     }
102 
103     public void unindex(final Object o)
104     {
105         luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
106         {
107             public void perform()
108             {
109                 if (isReindexing())
110                 {
111                     try
112                     {
113                         if (log.isDebugEnabled())
114                             log.debug("Adding object:" + o + " to reindexdeletedqueue");
115                         reindexDeletedQueue.put(o);
116                     }
117                     catch (InterruptedException e)
118                     {
119                         log.error("Error encountered adding object to reindexdeletedqueue", e);
120                     }
121                 }
122                 luceneConnection.withReaderAndDeletes(new ILuceneConnection.ReaderAction()
123                 {
124                     public Object perform(IndexReader reader) throws IOException
125                     {
126                         doDelete(o, reader);
127                         return null;
128                     }
129                 });
130             }
131         });
132     }
133 
134     /**
135      * Perform the actual deleting.
136      * @param o object to delete
137      * @param reader reader
138      * @throws IOException
139      */
140     protected void doDelete(Object o, IndexReader reader) throws IOException
141     {
142         final String[] identity = objectToDocumentConverter.getObjectIdentity(o);
143         reader.deleteDocuments(new Term(identity[0], identity[1]));
144     }
145 
146     /**
147      * Optimize the index.
148      * @param conn
149      */
150     protected final void optimize(ILuceneConnection conn)
151     {
152         conn.withWriter(new ILuceneConnection.WriterAction()
153         {
154             public void perform(IndexWriter writer) throws IOException
155             {
156                 writer.optimize();
157             }
158         });
159     }
160 
161     /**
162      * Synchronize the index with whatever happened since it started reindexing.
163      * @return true if the index was updated, false otherwise
164      */
165     private boolean flushReindexingQueues()
166     {
167         boolean somethingflushed = false;
168         // flush reindexing queues (delete before adding!)
169         if (!reindexDeletedQueue.isEmpty())
170         {
171             luceneConnection.withReaderAndDeletes(new ILuceneConnection.ReaderAction()
172             {
173                 public Object perform(IndexReader reader) throws IOException
174                 {
175                     try
176                     {
177                         for (Object o = reindexDeletedQueue.take(); !reindexDeletedQueue.isEmpty();)
178                         {
179                             doDelete(o, reader);
180                             o = reindexDeletedQueue.take();
181                         }
182                     }
183                     catch (InterruptedException e)
184                     {
185                         throw new LuceneException(e);
186                     }
187                     return null;
188                 }
189             });
190             somethingflushed = true;
191         }
192         if (!reindexAddedQueue.isEmpty())
193         {
194             luceneConnection.withWriter(new ILuceneConnection.WriterAction()
195             {
196                 public void perform(IndexWriter writer) throws IOException
197                 {
198                     try
199                     {
200                         for (Object o = reindexAddedQueue.take(); !reindexAddedQueue.isEmpty();)
201                         {
202                             doAdd(o, writer);
203                             o = reindexAddedQueue.take();
204                         }
205                     }
206                     catch (InterruptedException e)
207                     {
208                         throw new LuceneException(e);
209                     }
210                 }
211             });
212             somethingflushed = true;
213         }
214         return somethingflushed;
215     }
216 
217 
218     /**
219      * Creates a new instance of a DocumentWritingScheme.
220      *
221      * @param reindex  is this a reindex
222      */
223     protected DocumentWritingScheme getDocumentWritingScheme(boolean reindex)
224     {
225         if (reindex)
226             return new TempDirectoryDocumentWritingScheme(this);
227         else return new SingleDocumentWritingScheme(luceneConnection);
228     }
229 
230 
231     public void setLuceneConnection(ILuceneConnection luceneConnection)
232     {
233         this.luceneConnection = luceneConnection;
234     }
235 
236 
237     protected ObjectToDocumentConverter getObjectToDocumentConverter()
238     {
239         return objectToDocumentConverter;
240     }
241 
242     public void setObjectToDocumentConverter(ObjectToDocumentConverter objectToDocumentConverter)
243     {
244         this.objectToDocumentConverter = objectToDocumentConverter;
245     }
246 
247     public Analyzer getAnalyzer()
248     {
249         return new StandardAnalyzer();
250     }
251 
252     protected void setReindexingStatus(boolean status)
253             throws IOException
254     {
255         setReindexing(status);
256     }
257 }