 

Lucene 4.4.0 new ControlledRealTimeReopenThread sample usage

Tags:

lucene

In the new 4.4.0 release of Lucene, the near-real-time manager (org.apache.lucene.search.NRTManager) has been replaced by ControlledRealTimeReopenThread.

Does anyone have some sample code for the new ControlledRealTimeReopenThread usage?

EDIT: I have answered my own question below.

futuretelematics asked Dec 11 '22 12:12


2 Answers

I did some research on the question and built a utility type. It is not thoroughly tested (especially under concurrent load), but it works and I'm pretty sure it's thread safe.

@Slf4j
public class LuceneIndex {
    private final IndexWriter _indexWriter;
    private final TrackingIndexWriter _trackingIndexWriter;
    private final ReferenceManager<IndexSearcher> _indexSearcherReferenceManager;
    private final ControlledRealTimeReopenThread<IndexSearcher> _indexSearcherReopenThread;

    private long _reopenToken;      // index update/delete methods returned token

////////// CONSTRUCTOR & FINALIZE
    /**
     * Constructor based on the Directory instance responsible for persisting the Lucene index
     */
    @Inject
    public LuceneIndex(final Directory luceneDirectory,
                       final Analyzer analyzer) {
        try {
            // [1]: Create the indexWriter
            _indexWriter = new IndexWriter(luceneDirectory,
                                           new IndexWriterConfig(LuceneConstants.VERSION,
                                                                 analyzer));

            // [2a]: Create the TrackingIndexWriter, which tracks the changes made through the IndexWriter created above
            _trackingIndexWriter = new TrackingIndexWriter(_indexWriter);

            // [2b]: Create an IndexSearcher ReferenceManager to safely share IndexSearcher instances across
            //       multiple threads
            _indexSearcherReferenceManager = new SearcherManager(_indexWriter,
                                                                 true,
                                                                 null);

            // [3]: Create the ControlledRealTimeReopenThread that periodically reopens the index, taking into
            //      account the changes made to the index and tracked by the TrackingIndexWriter instance
            //      The index is refreshed every 60 s when nobody is waiting
            //      and every 100 ms while someone is waiting (see the search method)
            //      (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
            _indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(_trackingIndexWriter,
                                                                                           _indexSearcherReferenceManager,
                                                                                           60.00,   // when there is nobody waiting
                                                                                           0.1);    // when there is someone waiting
            _indexSearcherReopenThread.start(); // start the refresher thread
        } catch (IOException ioEx) {
            throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage(), ioEx);   // keep the cause
        }
    }
    @Override
    protected void finalize() throws Throwable {
        this.close();
        super.finalize();
    }
    /**
     * Closes every index
     */
    public void close() {
        try {
            // stop the index reader re-open thread
            _indexSearcherReopenThread.interrupt();
            _indexSearcherReopenThread.close();

            // Close the indexWriter, committing everything that's pending
            _indexWriter.commit();
            _indexWriter.close();

        } catch(IOException ioEx) {
            log.error("Error while closing lucene index: {}",ioEx.getMessage(),
                                                             ioEx);
        }
    }
////////// INDEX
    /**
     * Index a Lucene document
     * @param doc the document to be indexed
     */
    public void index(final Document doc) { 
        try {
            _reopenToken = _trackingIndexWriter.addDocument(doc);
            log.debug("document indexed in lucene");
        } catch(IOException ioEx) {
            log.error("Error during Lucene index operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Updates the index info for a lucene document
     * @param doc the document to be indexed
     */
    public void reIndex(final Term recordIdTerm,
                        final Document doc) {   
        try {
            _reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm, 
                                                               doc);
            log.debug("{} document re-indexed in lucene",recordIdTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Unindex a lucene document
     * @param idTerm term used to locate the document to be unindexed
     *               IMPORTANT! the term must filter only the document and only the document
     *                          otherwise all matching docs will be unindexed
     */
    public void unIndex(final Term idTerm) {
        try {
            _reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
            log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
                                                                           idTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
                                                               ioEx);           
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Delete all lucene index docs
     */
    public void truncate() {
        try {
            _reopenToken = _trackingIndexWriter.deleteAll();
            log.warn("lucene index truncated");
        } catch(IOException ioEx) {
            log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                          ioEx);            
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                              ioEx);
            }
        }
    }
/////// COUNT-SEARCH
    /**
     * Count the number of results returned by a search against the lucene index
     * @param qry the query
     * @return the number of matching documents
     */
    public long count(final Query qry) {
        long outCount = 0;
        try {
            _indexSearcherReopenThread.waitForGeneration(_reopenToken);     // wait until the index is re-opened
            IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
            try {
                TopDocs docs = searcher.search(qry,0);
                if (docs != null) outCount = docs.totalHits;
                log.debug("count-search executed against lucene index returning {}",outCount);
            } finally {
                _indexSearcherReferenceManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error re-opening the index {}",ioEx.getMessage(),
                                                      ioEx);
        } catch (InterruptedException intEx) {
            Thread.currentThread().interrupt();     // restore the interrupt flag for the caller
            log.error("Interrupted while waiting for the index to be re-opened: {}",intEx.getMessage(),
                                                                                    intEx);
        }
        return outCount;
    }
    /**
     * Executes a search query
     * @param qry the query to be executed
     * @param sortFields the sort criteria
     * @param firstResultItemOrder the order number of the first element to be returned
     * @param numberOfResults number of results to be returned
     * @return a page of search results
     */
    public LucenePageResults search(final Query qry,Set<SortField> sortFields,
                                    final int firstResultItemOrder,final int numberOfResults) {
        LucenePageResults outDocs = null;
        try {
            _indexSearcherReopenThread.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
            IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
            try {
                // sort criteria
                SortField[] theSortFields = null;
                if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
                Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
                                                                      : null;
                // number of results to be returned
                int theNumberOfResults = firstResultItemOrder + numberOfResults;

                // Exec the search (if the sort criteria is null, they're not used)
                TopDocs scoredDocs = theSort != null ? searcher.search(qry,
                                                                       theNumberOfResults,
                                                                       theSort)
                                                     : searcher.search(qry,
                                                                       theNumberOfResults);
                log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
                                                                                                                (theSort != null ? theSort.toString() : ""),
                                                                                                                scoredDocs != null ? scoredDocs.totalHits : 0,
                                                                                                                scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
                outDocs = LucenePageResults.create(searcher,
                                                   scoredDocs,
                                                   firstResultItemOrder,numberOfResults);
            } finally {
                _indexSearcherReferenceManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error freeing the searcher {}",ioEx.getMessage(),
                                                      ioEx);
        } catch (InterruptedException intEx) {
            Thread.currentThread().interrupt();     // restore the interrupt flag for the caller
            log.error("Interrupted while waiting for the index to be re-opened: {}",intEx.getMessage(),
                                                                                    intEx);
        }
        return outDocs;
    }
/////// INDEX MAINTENANCE
    /**
     * Merges the lucene index segments into one
     * (this should NOT normally be used, only rarely for index maintenance)
     */
    public void optimize() {
        try {
            _indexWriter.forceMerge(1);
            log.debug("Lucene index merged into one segment");
        } catch (IOException ioEx) {
            log.error("Error optimizing lucene index {}",ioEx.getMessage(),
                                                         ioEx);
        }
    }
}
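
One caveat about the thread-safety claim: _reopenToken is a single field that every indexing thread overwrites, so a slower thread can replace a newer token with an older one. A minimal sketch of one way to keep only the highest token, using nothing but the JDK and assuming that tokens only grow over time. The class ReopenTokenHolder and its method names are hypothetical and not part of Lucene:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: remembers the highest generation token returned by any writer call. */
final class ReopenTokenHolder {
    // -1 means "no token recorded yet" (an assumption of this sketch)
    private final AtomicLong highest = new AtomicLong(-1L);

    /** Records a token atomically; an older token never overwrites a newer one. */
    long record(final long token) {
        return highest.accumulateAndGet(token, Math::max);
    }

    /** The token a searcher would pass to waitForGeneration(...). */
    long current() {
        return highest.get();
    }
}
```

With something like this, the index methods would call record(...) with the value returned by the TrackingIndexWriter, and count/search would pass current() to waitForGeneration.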

EDIT 2: For those still using the NRTManager type from Lucene 4.3, this is the analogous code:

@Slf4j
public class LuceneIndexForLucene43 {
    private final IndexWriter _indexWriter;
    private final TrackingIndexWriter _trackingIndexWriter;
    private final NRTManager _searchManager;

    LuceneNRTReopenThread _reopenThread = null;
    private long _reopenToken;  // index update/delete methods returned token

///// CONSTRUCTOR
    /**
     * Constructor based on the Directory instance responsible for persisting the Lucene index
     */
    @Inject
    public LuceneIndexForLucene43(final Directory luceneDirectory,
                       final Analyzer analyzer) {
        try {
            // Create the indexWriter
            _indexWriter = new IndexWriter(luceneDirectory,
                                           new IndexWriterConfig(LuceneConstants.VERSION,
                                                                 analyzer));
            _trackingIndexWriter = new NRTManager.TrackingIndexWriter(_indexWriter);
            // Create the SearchManager to exec the search
            _searchManager = new NRTManager(_trackingIndexWriter,
                                            new SearcherFactory(),
                                            true);

            // Start the thread in charge of re-opening the index so that searches see real-time changes
            //      The index is refreshed every 60 s when nobody is waiting
            //      and every 100 ms while someone is waiting (see the search method)
            // (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
            _reopenThread = new LuceneNRTReopenThread(_searchManager,
                                                      60.0,     // when there is nobody waiting
                                                      0.1);     // when there is someone waiting
            _reopenThread.startReopening();

        } catch (IOException ioEx) {
            throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
        }
    }
    @Override
    protected void finalize() throws Throwable {
        this.close();
        super.finalize();
    }
    /**
     * Closes every index
     */
    public void close() {
        try {
            // stop the index reader re-open thread
            _reopenThread.stopReopening();
            _reopenThread.interrupt();

            // Close the search manager
            _searchManager.close();

            // Close the indexWriter, committing everything that's pending
            _indexWriter.commit();
            _indexWriter.close();

        } catch(IOException ioEx) {
            log.error("Error while closing lucene index: {}",ioEx.getMessage(),
                                                             ioEx);
        }
    }
//////// REOPEN-THREAD: Thread in charge of re-opening the IndexReader to have access to the 
//                      latest IndexWriter changes
    private class LuceneNRTReopenThread
          extends NRTManagerReopenThread {

        volatile boolean _finished = false;

        public LuceneNRTReopenThread(final NRTManager manager,
                                     final double targetMaxStaleSec,final double targetMinStaleSec) {
            super(manager, targetMaxStaleSec, targetMinStaleSec);
            this.setName("NRT Reopen Thread");
            this.setPriority(Math.min(Thread.currentThread().getPriority()+2, 
                                      Thread.MAX_PRIORITY));
            this.setDaemon(true);
        }
        public synchronized void startReopening() {
            _finished = false;
            this.start();
        }
        public synchronized void stopReopening() {
            _finished = true;
        }
        @Override
        public void run() {
            while (!_finished) {
                super.run();
            }
        }
    }
/////// INDEX
    /**
     * Index a Lucene document
     * @param doc the document to be indexed
     */
    public void index(final Document doc) { 
        try {
            _reopenToken = _trackingIndexWriter.addDocument(doc);
            log.debug("document indexed in lucene");
        } catch(IOException ioEx) {
            log.error("Error during Lucene index operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Updates the index info for a lucene document
     * @param doc the document to be indexed
     */
    public void reIndex(final Term recordIdTerm,
                        final Document doc) {   
        try {
            _reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm, 
                                                               doc);
            log.debug("{} document re-indexed in lucene",recordIdTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Unindex a lucene document
     * @param idTerm term used to locate the document to be unindexed
     *               IMPORTANT! the term must filter only the document and only the document
     *                          otherwise all matching docs will be unindexed
     */
    public void unIndex(final Term idTerm) {
        try {
            _reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
            log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
                                                                           idTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
                                                               ioEx);           
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error while committing changes to Lucene index: {}",ioEx.getMessage(),
                                                                                ioEx);
            }
        }
    }
    /**
     * Delete all lucene index docs
     */
    public void truncate() {
        try {
            _reopenToken = _trackingIndexWriter.deleteAll();
            log.warn("lucene index truncated");
        } catch(IOException ioEx) {
            log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                          ioEx);            
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                              ioEx);
            }
        }
    }
////// COUNT-SEARCH
    /**
     * Count the number of results returned by a search against the lucene index
     * @param qry the query
     * @return the number of matching documents
     */
    public long count(final Query qry) {
        long outCount = 0;
        try {
            _searchManager.waitForGeneration(_reopenToken);     // wait until the index is re-opened
            IndexSearcher searcher = _searchManager.acquire();
            try {
                TopDocs docs = searcher.search(qry,0);
                if (docs != null) outCount = docs.totalHits;
                log.debug("count-search executed against lucene index returning {}",outCount);
            } finally {
                _searchManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error re-opening the index {}",ioEx.getMessage(),
                                                      ioEx);
        }
        return outCount;
    }
    /**
     * Executes a search query
     * @param qry the query to be executed
     * @param sortFields the sort criteria
     * @param firstResultItemOrder the order number of the first element to be returned
     * @param numberOfResults number of results to be returned
     * @return a page of search results
     */
    public LucenePageResults search(final Query qry,Set<SortField> sortFields,
                                    final int firstResultItemOrder,final int numberOfResults) {
        LucenePageResults outDocs = null;
        try {
            _searchManager.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
            IndexSearcher searcher = _searchManager.acquire();
            try {
                // sort criteria
                SortField[] theSortFields = null;
                if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
                Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
                                                                      : null;
                // number of results to be returned
                int theNumberOfResults = firstResultItemOrder + numberOfResults;

                // Exec the search (if the sort criteria is null, they're not used)
                TopDocs scoredDocs = theSort != null ? searcher.search(qry,
                                                                       theNumberOfResults,
                                                                       theSort)
                                                     : searcher.search(qry,
                                                                       theNumberOfResults);
                log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
                                                                                                                (theSort != null ? theSort.toString() : ""),
                                                                                                                scoredDocs != null ? scoredDocs.totalHits : 0,
                                                                                                                scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
                outDocs = LucenePageResults.create(searcher,
                                                   scoredDocs,
                                                   firstResultItemOrder,numberOfResults);
            } finally {
                _searchManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error freeing the searcher {}",ioEx.getMessage(),
                                                      ioEx);
        }
        return outDocs;
    }
/////// INDEX MAINTENANCE
    /**
     * Merges the lucene index segments into one
     * (this should NOT normally be used, only rarely for index maintenance)
     */
    public void optimize() {
        try {
            _indexWriter.forceMerge(1);
            log.debug("Lucene index merged into one segment");
        } catch (IOException ioEx) {
            log.error("Error optimizing lucene index {}",ioEx.getMessage(),
                                                         ioEx);
        }
    }
}
futuretelematics answered Jun 08 '23 03:06


You should not commit after every document, and you shouldn't need to call Thread.interrupt on the reopen thread (in fact, this is deadly when you are using NIOFSDirectory). Instead, just call the ControlledRealTimeReopenThread's close method: under the hood, it notifies itself and should finish quickly.
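
The shutdown pattern described above (set a flag, wake the thread, wait for it to exit) can be sketched with plain JDK primitives. This is only an illustration of the idea under stated assumptions, not Lucene's actual implementation. PeriodicRefresher and its constructor parameters are made-up names:

```java
import java.io.Closeable;

/** Hypothetical stand-in for a periodic reopen thread; not Lucene's implementation. */
final class PeriodicRefresher extends Thread implements Closeable {
    private final Object lock = new Object();
    private final long periodMillis;
    private final Runnable refresh;
    private boolean finished;   // guarded by lock

    PeriodicRefresher(final long periodMillis, final Runnable refresh) {
        this.periodMillis = periodMillis;
        this.refresh = refresh;
        setName("periodic-refresher");
        setDaemon(true);
    }

    @Override
    public void run() {
        while (true) {
            synchronized (lock) {
                final long deadline = System.nanoTime() + periodMillis * 1_000_000L;
                // Sleep until the period elapses or close() wakes us up
                while (!finished) {
                    final long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                    if (remainingMillis <= 0) break;
                    try {
                        lock.wait(remainingMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (finished) return;
            }
            refresh.run();   // runs outside the lock, so close() never waits for the monitor
        }
    }

    /** Stops the thread without Thread.interrupt(): set the flag, notify, then join. */
    @Override
    public void close() {
        synchronized (lock) {
            finished = true;
            lock.notifyAll();
        }
        try {
            join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the sleeping thread is woken by a notification rather than an interrupt, no I/O that happens to be in progress on that thread is cut short.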

Michael McCandless answered Jun 08 '23 03:06