1   package com.atlassian.bonnie.index;
2   
3   import java.io.IOException;
4   import java.util.concurrent.BlockingQueue;
5   import java.util.concurrent.LinkedBlockingQueue;
6   
7   import org.apache.lucene.analysis.Analyzer;
8   import org.apache.lucene.analysis.standard.StandardAnalyzer;
9   import org.apache.lucene.index.IndexReader;
10  import org.apache.lucene.index.IndexWriter;
11  import org.apache.lucene.index.Term;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import com.atlassian.bonnie.ILuceneConnection;
16  import com.atlassian.bonnie.LuceneException;
17  import com.atlassian.core.util.ProgressWrapper;
18  
19  /**
20   * Performs batch indexing in multiple threads.
21   * <p>This class is designed for:
22   * <ul>
23   * <li>an online environment where changes are being made to the index as batch reindexing occurs</li>
24   * <li>indexing large files</li>
25   * </ul>
26   * Each thread writes to its own temporary index and when all threads finish, these are merged into the final index.
27   * This has the desirable effect where a few large files do not bottleneck the entire operation.
28   * </p>
29   */
30  public class OnlineMultiThreadedIndexer extends BaseMultiThreadedIndexer implements SingleObjectIndexer
31  {
32      private static final Logger log = LoggerFactory.getLogger(OnlineMultiThreadedIndexer.class);
33      protected BlockingQueue<? super Object> reindexAddedQueue = new LinkedBlockingQueue<Object>();
34      protected BlockingQueue<? super Object> reindexDeletedQueue = new LinkedBlockingQueue<Object>();
35      private ObjectToDocumentConverter objectToDocumentConverter;
36  
37      protected void allThreadsComplete(final DocumentWritingScheme scheme, final boolean truncate, final ProgressWrapper progress)
38      {
39          luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
40          {
41              public void perform() throws Exception
42              {
43                  setReindexingStatus(true);
44  
45                  if (truncate)
46                  {
47                      progress.setStatus("Truncating index");
48                      truncateIndex();
49                  }
50                  progress.setStatus("Closing index");
51                  scheme.close(luceneConnection);
52                  progress.setStatus("Flushing queues");
53                  if (flushReindexingQueues())
54                  {
55                      progress.setStatus("Optimizing index");
56  
57                      optimize(luceneConnection);
58                  }
59  
60                  setReindexingStatus(false);
61              }
62          });
63      }
64  
65      public void index(final Object o)
66      {
67          luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
68          {
69              public void perform()
70              {
71                  if (isReindexing())
72                  {
73                      try
74                      {
75                          if (log.isDebugEnabled())
76                              log.debug("Adding object:" + o + " to reindexaddedqueue");
77                          reindexAddedQueue.put(o);
78                      }
79                      catch (InterruptedException e)
80                      {
81                          log.error("Error encountered adding object to reindexaddedqueue", e);
82                      }
83                  }
84                  unindex(o);
85                  DocumentWritingScheme scheme = getDocumentWritingScheme(false);
86                  Runnable r = new QueueProcessingRunnableImpl(new SingletonObjectQueue(o, objectToDocumentConverter), scheme);
87                  r.run();
88              }
89          });
90      }
91  
92      /**
93       * Performs the actual adding of the object to the index.
94       * @param o
95       * @param writer
96       * @throws IOException
97       */
98      protected void doAdd(final Object o, IndexWriter writer) throws IOException
99      {
100         writer.addDocument(objectToDocumentConverter.convert(o, null));
101     }
102 
103     public void unindex(final Object o)
104     {
105         luceneConnection.withBatchUpdate(new ILuceneConnection.BatchUpdateAction()
106         {
107             public void perform()
108             {
109                 if (isReindexing())
110                 {
111                     try
112                     {
113                         if (log.isDebugEnabled())
114                             log.debug("Adding object:" + o + " to reindexdeletedqueue");
115                         reindexDeletedQueue.put(o);
116                     }
117                     catch (InterruptedException e)
118                     {
119                         log.error("Error encountered adding object to reindexdeletedqueue", e);
120                     }
121                 }
122                 luceneConnection.withWriter(new ILuceneConnection.WriterAction()
123                 {
124                     public void perform(IndexWriter writer) throws IOException
125                     {
126                         doDelete(o, writer);
127                     }
128                 });
129             }
130         });
131     }
132 
133     /**
134      * Perform the actual deleting.
135      *
136      * @param o object to delete
137      * @param writer
138      * @throws IOException
139      */
140     protected void doDelete(Object o, IndexWriter writer) throws IOException
141     {
142         final String[] identity = objectToDocumentConverter.getObjectIdentity(o);
143         writer.deleteDocuments(new Term(identity[0], identity[1]));
144     }
145 
146     /**
147      * Optimize the index.
148      * @param conn
149      */
150     protected final void optimize(ILuceneConnection conn)
151     {
152         conn.withWriter(new ILuceneConnection.WriterAction()
153         {
154             public void perform(IndexWriter writer) throws IOException
155             {
156                 writer.optimize();
157             }
158         });
159     }
160 
161     /**
162      * Synchronize the index with whatever happened since it started reindexing.
163      * @return true if the index was updated, false otherwise
164      */
165     private boolean flushReindexingQueues()
166     {
167         boolean somethingflushed = false;
168         // flush reindexing queues (delete before adding!)
169         if (!reindexDeletedQueue.isEmpty())
170         {
171             luceneConnection.withWriter(new ILuceneConnection.WriterAction()
172             {
173                 public void perform(IndexWriter writer) throws IOException
174                 {
175                     try
176                     {
177                         for (Object o = reindexDeletedQueue.take(); !reindexDeletedQueue.isEmpty(); )
178                         {
179                             doDelete(o, writer);
180                             o = reindexDeletedQueue.take();
181                         }
182                     }
183                     catch (InterruptedException e)
184                     {
185                         throw new LuceneException(e);
186                     }
187                 }
188             });
189             somethingflushed = true;
190         }
191         if (!reindexAddedQueue.isEmpty())
192         {
193             luceneConnection.withWriter(new ILuceneConnection.WriterAction()
194             {
195                 public void perform(IndexWriter writer) throws IOException
196                 {
197                     try
198                     {
199                         for (Object o = reindexAddedQueue.take(); !reindexAddedQueue.isEmpty();)
200                         {
201                             doAdd(o, writer);
202                             o = reindexAddedQueue.take();
203                         }
204                     }
205                     catch (InterruptedException e)
206                     {
207                         throw new LuceneException(e);
208                     }
209                 }
210             });
211             somethingflushed = true;
212         }
213         return somethingflushed;
214     }
215 
216 
217     /**
218      * Creates a new instance of a DocumentWritingScheme.
219      *
220      * @param reindex  is this a reindex
221      */
222     protected DocumentWritingScheme getDocumentWritingScheme(boolean reindex)
223     {
224         if (reindex)
225             return new TempDirectoryDocumentWritingScheme(this);
226         else return new SingleDocumentWritingScheme(luceneConnection);
227     }
228 
229 
230     public void setLuceneConnection(ILuceneConnection luceneConnection)
231     {
232         this.luceneConnection = luceneConnection;
233     }
234 
235 
236     protected ObjectToDocumentConverter getObjectToDocumentConverter()
237     {
238         return objectToDocumentConverter;
239     }
240 
241     public void setObjectToDocumentConverter(ObjectToDocumentConverter objectToDocumentConverter)
242     {
243         this.objectToDocumentConverter = objectToDocumentConverter;
244     }
245 
246     public Analyzer getAnalyzer()
247     {
248         return new StandardAnalyzer();
249     }
250 
251     protected void setReindexingStatus(boolean status)
252             throws IOException
253     {
254         setReindexing(status);
255     }
256 }