View Javadoc

1   package com.atlassian.bonnie.index;
2   
3   import com.atlassian.bonnie.ILuceneConnection;
4   import org.apache.lucene.analysis.Analyzer;
5   import org.apache.lucene.document.Document;
6   import org.apache.lucene.index.IndexWriter;
7   import org.apache.lucene.store.Directory;
8   import org.apache.lucene.store.FSDirectory;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import java.io.File;
13  import java.io.IOException;
14  import java.util.Collections;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedList;
18  import java.util.List;
19  import java.util.Map;
20  
21  /**
22   * Index writer that is able to write to one or more temporary indices. This writer is typically used to:
23   * <ul>
24   * <li>allow writing index changes to a single temporary location and merging back to the proper location when finished
25   * <li>allow parallel construction of a index into multiple temporary indices, and merging all temporary indices back to the proper location.
26   * </ul>
27   */
28  public class TempIndexWriter
29  {
30      private static final Logger log = LoggerFactory.getLogger(TempIndexWriter.class);
31  
32      private final Map writers = Collections.synchronizedMap(new HashMap());
33      private final Analyzer analyzerForIndexing;
34      private final String tmpDir;
35      private final ILuceneConnection.Configuration configuration;
36  
37      public TempIndexWriter(Analyzer analyzerForIndexing, String tmpDir)
38      {
39          this(analyzerForIndexing, tmpDir, ILuceneConnection.DEFAULT_CONFIGURATION);
40      }
41  
42      /**
43       * Constructs a temp index writer.
44       *
45       * @param analyzerForIndexing analyzer used by {@link org.apache.lucene.index.IndexWriter}s created and managed by this temp index writer. Required parameter.
46       * @param tmpDir directory where temporary directories will be created in. Required parameter.
47       * @param configuration allows configuration of the {@link org.apache.lucene.index.IndexWriter}s created and managed
48       * by this temp index writer. Only the batch settings will be used (as the assumption here is that
49       * {@link com.atlassian.bonnie.index.TempIndexWriter}s will be used for batch indexing purposes only).
50       * @throws IllegalArgumentException if analyzerForIndexing, tmpDir or configuration are null
51       */
52      public TempIndexWriter(Analyzer analyzerForIndexing, String tmpDir, ILuceneConnection.Configuration configuration)
53      {
54          if (analyzerForIndexing == null)
55              throw new IllegalArgumentException("analyzerForIndexing is required.");
56          if (tmpDir == null)
57              throw new IllegalArgumentException("tmpDir is required.");
58          if (configuration == null)
59              throw new IllegalArgumentException("configuration is required.");
60  
61          this.analyzerForIndexing = analyzerForIndexing;
62          this.tmpDir = tmpDir;
63          this.configuration = configuration;
64      }
65  
66      /**
67       * Performs a {@link ILuceneConnection.WriterAction}
68       * @param key key representing which index to write to. A new writer will be created if no writer exists for the key.
69       * @param writerAction the index write action to perform
70       */
71      public void perform(String key, ILuceneConnection.WriterAction writerAction) throws IOException
72      {
73          writerAction.perform(getWriterData(key).writer);
74      }
75  
76      /**
77       * Add a document to an index.
78       * @param key key representing which index to write to. A new writer will be created if no writer exists for the key.
79       * @param doc document to add
80       * @throws IOException
81       */
82      public void addDocument(String key, Document doc) throws IOException
83      {
84          getWriterData(key).writer.addDocument(doc);
85      }
86  
87      private WriterData getWriterData(String key) throws IOException
88      {
89          synchronized (key.intern())
90          {
91              WriterData writerData = (WriterData) writers.get(key);
92              if (writerData == null)
93              {
94                  writerData = createData();
95                  writers.put(key, writerData);
96              }
97              return writerData;
98          }
99      }
100 
101     /**
102      * Delete the respective temp indices.
103      * @param prefix
104      * @throws IOException
105      */
106     public void close(String prefix) throws IOException
107     {
108         for (Iterator it = writers.keySet().iterator(); it.hasNext();)
109         {
110             String key = it.next().toString();
111             if (key.startsWith(prefix))
112             {
113                 WriterData writerData = (WriterData) writers.get(key);
114                 if (writerData != null) {
115                     delete(writerData.tmpIndexDir);
116                 }
117             }
118         }
119     }
120 
121     /**
122      * Delete the respective temp indices.
123      * @throws IOException
124      */
125     public void closeAll() throws IOException
126     {
127         for (Iterator it = writers.keySet().iterator(); it.hasNext();)
128         {
129             WriterData writerData = (WriterData) writers.get(it.next());
130             delete(writerData.tmpIndexDir);
131         }
132         writers.clear();
133     }
134 
135     /**
136      * Close all temporary writers, and merge all temporary indices to the writer provided.
137      * When the merge is complete, the target index will be optimized. 
138      * @param writer
139      * @throws IOException
140      */
141     public void merge(IndexWriter writer) throws IOException
142     {
143         final List indexDirectories = new LinkedList();
144         for (Iterator it = writers.keySet().iterator(); it.hasNext();)
145         {
146             WriterData writerData = (WriterData) writers.get(it.next());
147             writerData.writer.close();
148             indexDirectories.add(writerData.dir);
149         }
150         writer.addIndexes((Directory[]) indexDirectories.toArray(new Directory[indexDirectories.size()]));
151     }
152 
153     /**
154      * Close relevant temporary writers, and merge temporary indices identified by keys which start with the provided prefix to the
155      * writer provided.
156      * @param prefix key prefix
157      * @param writer
158      * @throws IOException
159      */
160     public void merge(String prefix, IndexWriter writer) throws IOException
161     {
162         final List indexDirectories = new LinkedList();
163         for (Iterator it = writers.keySet().iterator(); it.hasNext();)
164         {
165             String key = it.next().toString();
166             if (key.startsWith(prefix))
167             {
168                 WriterData writerData = (WriterData) writers.get(key);
169                 writerData.writer.close();
170                 indexDirectories.add(writerData.dir);
171             }
172         }
173         System.out.println("Index directories:" + indexDirectories);
174         if (indexDirectories.size() > 0)
175             writer.addIndexes((Directory[]) indexDirectories.toArray(new Directory[indexDirectories.size()]));
176     }
177 
178     protected final boolean delete(File directory)
179     {
180         File[] files = directory.listFiles();
181         for (int i = 0; i < files.length; i++)
182         {
183             if (!files[i].delete())
184             {
185                 log.error("Failed to delete index file: " + files[i].getAbsolutePath());
186                 return false;
187             }
188         }
189         return directory.delete();
190     }
191 
192     /**
193      * Synchronized to deal with the situation that the two threads get assigned the same temp directory.
194      */
195     private synchronized WriterData createData() throws IOException
196     {
197         // create the temp index directory.
198         File tmpIndexDir = getTmpDir() == null ? File.createTempFile("lucene", "index") :
199                 File.createTempFile("lucene", "index", new File(getTmpDir()));
200         if (!tmpIndexDir.delete() || !tmpIndexDir.mkdirs())
201         {
202             throw new IOException("Unable to create temporary index directory: " + tmpIndexDir);
203         }
204         Directory dir = FSDirectory.getDirectory(tmpIndexDir);
205         //TODO: This probably doesn't need to be here. We should just construct the next IndexWriter with create=true
206         new IndexWriter(dir, null, true).close();
207 
208         IndexWriter writer = new IndexWriter(dir, analyzerForIndexing, false);
209         writer.setMergeFactor(configuration.getBatchMergeFactor());
210         writer.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
211         writer.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());    // better for fast searches and batch indexing, but not if we need small segments
212         writer.setUseCompoundFile(configuration.isCompoundIndexFileFormat());
213         writer.setMaxFieldLength(configuration.getMaxFieldLength());
214 
215         return new WriterData(writer, dir, tmpIndexDir);
216     }
217 
218     public String getTmpDir()
219     {
220         return tmpDir;
221     }
222 
223     private class WriterData
224     {
225         final IndexWriter writer;
226         final Directory dir;
227         final File tmpIndexDir;
228 
229         protected WriterData(IndexWriter writer, Directory dir, File tmpIndexDir)
230         {
231             this.writer = writer;
232             this.dir = dir;
233             this.tmpIndexDir = tmpIndexDir;
234         }
235     }
236 }