View Javadoc

1   package com.atlassian.bonnie.index;
2   
3   import com.atlassian.bonnie.LuceneConnection;
4   import com.atlassian.bonnie.LuceneException;
5   import com.atlassian.core.util.ProgressWrapper;
6   import org.apache.lucene.analysis.Analyzer;
7   import org.apache.lucene.analysis.standard.StandardAnalyzer;
8   import org.apache.lucene.index.IndexReader;
9   import org.apache.lucene.index.IndexWriter;
10  import org.apache.lucene.index.Term;
11  
12  import java.io.IOException;
13  
14  import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
15  import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
16  
17  /**
18   * Performs batch indexing in multiple threads.
19   * <p>This class is designed for:
20   * <ul>
21   * <li>an online environment where changes are being made to the index as batch reindexing occurs</li>
22   * <li>indexing large files</li>
23   * </ul>
24   * Each thread writes to its own temporary index and when all threads finish, these are merged into the final index.
25   * This has the desirable effect where a few large files do not bottleneck the entire operation.
26   * </p>
27   */
28  public class OnlineMultiThreadedIndexer extends BaseMultiThreadedIndexer implements SingleObjectIndexer
29  {
30      protected BlockingQueue reindexAddedQueue = new LinkedBlockingQueue();
31      protected BlockingQueue reindexDeletedQueue = new LinkedBlockingQueue();
32      private ObjectToDocumentConverter objectToDocumentConverter;
33  
34      protected void allThreadsComplete(final DocumentWritingScheme scheme, final boolean truncate, final ProgressWrapper progress)
35      {
36          luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
37          {
38              public void perform() throws Exception
39              {
40                  setReindexingStatus(true);
41  
42                  if (truncate)
43                  {
44                      progress.setStatus("Truncating index");
45                      truncateIndex();
46                  }
47                  progress.setStatus("Closing index");
48                  scheme.close(luceneConnection);
49                  progress.setStatus("Flushing queues");
50                  if (flushReindexingQueues())
51                  {
52                      progress.setStatus("Optimizing index");
53  
54                      optimize(luceneConnection);
55                  }
56  
57                  setReindexingStatus(false);
58              }
59          });
60      }
61  
62      public void index(final Object o)
63      {
64          luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
65          {
66              public void perform()
67              {
68                  if (isReindexing())
69                  {
70                      try
71                      {
72                          if (log.isDebugEnabled())
73                              log.debug("Adding object:" + o + " to reindexaddedqueue");
74                          reindexAddedQueue.put(o);
75                      }
76                      catch (InterruptedException e)
77                      {
78                          log.error("Error encountered adding object to reindexaddedqueue", e);
79                      }
80                  }
81                  unindex(o);
82                  DocumentWritingScheme scheme = getDocumentWritingScheme(false);
83                  Runnable r = new QueueProcessingRunnableImpl(new SingletonObjectQueue(o, objectToDocumentConverter), scheme);
84                  r.run();
85              }
86          });
87      }
88  
89      /**
90       * Performs the actual adding of the object to the index.
91       * @param o
92       * @param writer
93       * @throws IOException
94       */
95      protected void doAdd(final Object o, IndexWriter writer) throws IOException
96      {
97          writer.addDocument(objectToDocumentConverter.convert(o, null));
98      }
99  
100     public void unindex(final Object o)
101     {
102         luceneConnection.withBatchUpdate(new LuceneConnection.BatchUpdateAction()
103         {
104             public void perform()
105             {
106                 if (isReindexing())
107                 {
108                     try
109                     {
110                         if (log.isDebugEnabled())
111                             log.debug("Adding object:" + o + " to reindexdeletedqueue");
112                         reindexDeletedQueue.put(o);
113                     }
114                     catch (InterruptedException e)
115                     {
116                         log.error("Error encountered adding object to reindexdeletedqueue", e);
117                     }
118                 }
119                 luceneConnection.withReaderAndDeletes(new LuceneConnection.ReaderAction()
120                 {
121                     public Object perform(IndexReader reader) throws IOException
122                     {
123                         doDelete(o, reader);
124                         return null;
125                     }
126                 });
127             }
128         });
129     }
130 
131     /**
132      * Perform the actual deleting.
133      * @param o object to delete
134      * @param reader reader
135      * @throws IOException
136      */
137     protected void doDelete(Object o, IndexReader reader) throws IOException
138     {
139         final String[] identity = objectToDocumentConverter.getObjectIdentity(o);
140         reader.deleteDocuments(new Term(identity[0], identity[1]));
141     }
142 
143     /**
144      * Optimize the index.
145      * @param conn
146      */
147     protected final void optimize(LuceneConnection conn)
148     {
149         conn.withWriter(new LuceneConnection.WriterAction()
150         {
151             public void perform(IndexWriter writer) throws IOException
152             {
153                 writer.optimize();
154             }
155         });
156     }
157 
158     /**
159      * Synchronize the index with whatever happened since it started reindexing.
160      * @return true if the index was updated, false otherwise
161      */
162     private boolean flushReindexingQueues()
163     {
164         boolean somethingflushed = false;
165         // flush reindexing queues (delete before adding!)
166         if (!reindexDeletedQueue.isEmpty())
167         {
168             luceneConnection.withReaderAndDeletes(new LuceneConnection.ReaderAction()
169             {
170                 public Object perform(IndexReader reader) throws IOException
171                 {
172                     try
173                     {
174                         for (Object o = reindexDeletedQueue.take(); !reindexDeletedQueue.isEmpty();)
175                         {
176                             doDelete(o, reader);
177                             o = reindexDeletedQueue.take();
178                         }
179                     }
180                     catch (InterruptedException e)
181                     {
182                         throw new LuceneException(e);
183                     }
184                     return null;
185                 }
186             });
187             somethingflushed = true;
188         }
189         if (!reindexAddedQueue.isEmpty())
190         {
191             luceneConnection.withWriter(new LuceneConnection.WriterAction()
192             {
193                 public void perform(IndexWriter writer) throws IOException
194                 {
195                     try
196                     {
197                         for (Object o = reindexAddedQueue.take(); !reindexAddedQueue.isEmpty();)
198                         {
199                             doAdd(o, writer);
200                             o = reindexAddedQueue.take();
201                         }
202                     }
203                     catch (InterruptedException e)
204                     {
205                         throw new LuceneException(e);
206                     }
207                 }
208             });
209             somethingflushed = true;
210         }
211         return somethingflushed;
212     }
213 
214 
215     /**
216      * Creates a new instance of a DocumentWritingScheme.
217      *
218      * @param reindex  is this a reindex
219      */
220     protected DocumentWritingScheme getDocumentWritingScheme(boolean reindex)
221     {
222         if (reindex)
223             return new TempDirectoryDocumentWritingScheme(this);
224         else return new SingleDocumentWritingScheme(luceneConnection);
225     }
226 
227 
228     public void setLuceneConnection(LuceneConnection luceneConnection)
229     {
230         this.luceneConnection = luceneConnection;
231     }
232 
233 
234     protected ObjectToDocumentConverter getObjectToDocumentConverter()
235     {
236         return objectToDocumentConverter;
237     }
238 
239     public void setObjectToDocumentConverter(ObjectToDocumentConverter objectToDocumentConverter)
240     {
241         this.objectToDocumentConverter = objectToDocumentConverter;
242     }
243 
244     public Analyzer getAnalyzer()
245     {
246         return new StandardAnalyzer();
247     }
248 
249     protected void setReindexingStatus(boolean status)
250             throws IOException
251     {
252         luceneConnection.setReIndexing(status);
253         setReindexing(status);
254     }
255 }