In the new 4.4.0 release of Lucene, the Near Real Time Manager (org.apache.lucene.search.NRTManager) has been replaced by ControlledRealTimeReopenThread.
Does anyone have some sample code for the new ControlledRealTimeReopenThread usage?
EDIT: I answer my own question below
I've done some research on the question and built a utility type... It has not been thoroughly tested (especially under concurrent conditions); anyway, it works and I'm pretty sure it's thread safe.
@Slf4j
public class LuceneIndex {
private final IndexWriter _indexWriter;
private final TrackingIndexWriter _trackingIndexWriter;
private final ReferenceManager<IndexSearcher> _indexSearcherReferenceManager;
private final ControlledRealTimeReopenThread<IndexSearcher> _indexSearcherReopenThread;
private long _reopenToken; // index update/delete methods returned token
////////// CONSTRUCTOR & FINALIZE
/**
* Constructor based on an instance of the type responsible of the lucene index persistence
*/
@Inject
public LuceneIndex(final Directory luceneDirectory,
final Analyzer analyzer) {
try {
// [1]: Create the indexWriter
_indexWriter = new IndexWriter(luceneDirectory,
new IndexWriterConfig(LuceneConstants.VERSION,
analyzer));
// [2a]: Create the TrackingIndexWriter to track changes to the delegated previously created IndexWriter
_trackingIndexWriter = new TrackingIndexWriter(_indexWriter);
// [2b]: Create an IndexSearcher ReferenceManager to safelly share IndexSearcher instances across
// multiple threads
_indexSearcherReferenceManager = new SearcherManager(_indexWriter,
true,
null);
// [3]: Create the ControlledRealTimeReopenThread that reopens the index periodically having into
// account the changes made to the index and tracked by the TrackingIndexWriter instance
// The index is refreshed every 60sc when nobody is waiting
// and every 100 millis whenever is someone waiting (see search method)
// (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
_indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(_trackingIndexWriter,
_indexSearcherReferenceManager,
60.00, // when there is nobody waiting
0.1); // when there is someone waiting
_indexSearcherReopenThread.start(); // start the refresher thread
} catch (IOException ioEx) {
throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
}
}
@Override
protected void finalize() throws Throwable {
this.close();
super.finalize();
}
/**
* Closes every index
*/
public void close() {
try {
// stop the index reader re-open thread
_indexSearcherReopenThread.interrupt();
_indexSearcherReopenThread.close();
// Close the indexWriter, commiting everithing that's pending
_indexWriter.commit();
_indexWriter.close();
} catch(IOException ioEx) {
log.error("Error while closing lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
////////// INDEX
/**
* Index a Lucene document
* @param doc the document to be indexed
*/
public void index(final Document doc) {
try {
_reopenToken = _trackingIndexWriter.addDocument(doc);
log.debug("document indexed in lucene");
} catch(IOException ioEx) {
log.error("Error while in Lucene index operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Updates the index info for a lucene document
* @param doc the document to be indexed
*/
public void reIndex(final Term recordIdTerm,
final Document doc) {
try {
_reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm,
doc);
log.debug("{} document re-indexed in lucene",recordIdTerm.text());
} catch(IOException ioEx) {
log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Unindex a lucene document
* @param idTerm term used to locate the document to be unindexed
* IMPORTANT! the term must filter only the document and only the document
* otherwise all matching docs will be unindexed
*/
public void unIndex(final Term idTerm) {
try {
_reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
idTerm.text());
} catch(IOException ioEx) {
log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Delete all lucene index docs
*/
public void truncate() {
try {
_reopenToken = _trackingIndexWriter.deleteAll();
log.warn("lucene index truncated");
} catch(IOException ioEx) {
log.error("Error truncating lucene index: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error truncating lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/////// COUNT-SEARCH
/**
* Count the number of results returned by a search against the lucene index
* @param qry the query
* @return
*/
public long count(final Query qry) {
long outCount = 0;
try {
_indexSearcherReopenThread.waitForGeneration(_reopenToken); // wait untill the index is re-opened
IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
try {
TopDocs docs = searcher.search(qry,0);
if (docs != null) outCount = docs.totalHits;
log.debug("count-search executed against lucene index returning {}",outCount);
} finally {
_indexSearcherReferenceManager.release(searcher);
}
} catch (IOException ioEx) {
log.error("Error re-opening the index {}",ioEx.getMessage(),
ioEx);
} catch (InterruptedException intEx) {
log.error("The index writer periodically re-open thread has stopped",intEx.getMessage(),
intEx);
}
return outCount;
}
/**
* Executes a search query
* @param qry the query to be executed
* @param sortFields the search query criteria
* @param firstResultItemOrder the order number of the first element to be returned
* @param numberOfResults number of results to be returnee
* @return a page of search results
*/
public LucenePageResults search(final Query qry,Set<SortField> sortFields,
final int firstResultItemOrder,final int numberOfResults) {
LucenePageResults outDocs = null;
try {
_indexSearcherReopenThread.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
try {
// sort crieteria
SortField[] theSortFields = null;
if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
: null;
// number of results to be returned
int theNumberOfResults = firstResultItemOrder + numberOfResults;
// Exec the search (if the sort criteria is null, they're not used)
TopDocs scoredDocs = theSort != null ? searcher.search(qry,
theNumberOfResults,
theSort)
: searcher.search(qry,
theNumberOfResults);
log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
(theSort != null ? theSort.toString() : ""),
scoredDocs != null ? scoredDocs.totalHits : 0,
scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
outDocs = LucenePageResults.create(searcher,
scoredDocs,
firstResultItemOrder,numberOfResults);
} finally {
_indexSearcherReferenceManager.release(searcher);
}
} catch (IOException ioEx) {
log.error("Error freeing the searcher {}",ioEx.getMessage(),
ioEx);
} catch (InterruptedException intEx) {
log.error("The index writer periodically re-open thread has stopped",intEx.getMessage(),
intEx);
}
return outDocs;
}
/////// INDEX MAINTEINANCE
/**
* Mergest the lucene index segments into one
* (this should NOT be used, only rarely for index mainteinance)
*/
public void optimize() {
try {
_indexWriter.forceMerge(1);
log.debug("Lucene index merged into one segment");
} catch (IOException ioEx) {
log.error("Error optimizing lucene index {}",ioEx.getMessage(),
ioEx);
}
}
}
EDIT 2: For those using the previous Lucene 4.3 NearRealTime Manager type, this is the analogous code:
@Slf4j
public class LuceneIndexForLucene43 {
private final IndexWriter _indexWriter;
private final TrackingIndexWriter _trackingIndexWriter;
private final NRTManager _searchManager;
LuceneNRTReopenThread _reopenThread = null;
private long _reopenToken; // index update/delete methods returned token
///// CONSTRUCTOR
/**
* Constructor based on an instance of the type responsible of the lucene index persistence
*/
@Inject
public LuceneIndexForLucene43(final Directory luceneDirectory,
final Analyzer analyzer) {
try {
// Create the indexWriter
_indexWriter = new IndexWriter(luceneDirectory,
new IndexWriterConfig(LuceneConstants.VERSION,
analyzer));
_trackingIndexWriter = new NRTManager.TrackingIndexWriter(_indexWriter);
// Create the SearchManager to exec the search
_searchManager = new NRTManager(_trackingIndexWriter,
new SearcherFactory(),
true);
// Open the thread in charge of re-open the index to allow it to see real-time changes
// The index is refreshed every 60sc when nobody is waiting
// and every 100 millis whenever is someone waiting (see search method)
// (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
_reopenThread = new LuceneNRTReopenThread(_searchManager,
60.0, // when there is nobody waiting
0.1); // when there is someone waiting
_reopenThread.startReopening();
} catch (IOException ioEx) {
throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
}
}
@Override
protected void finalize() throws Throwable {
this.close();
super.finalize();
}
/**
* Closes every index
*/
public void close() {
try {
// stop the index reader re-open thread
_reopenThread.stopReopening();
_reopenThread.interrupt();
// Close the search manager
_searchManager.close();
// Close the indexWriter, commiting everithing that's pending
_indexWriter.commit();
_indexWriter.close();
} catch(IOException ioEx) {
log.error("Error while closing lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
//////// REOPEN-THREAD: Thread in charge of re-open the IndexReader to have access to the
// latest IndexWriter changes
private class LuceneNRTReopenThread
extends NRTManagerReopenThread {
volatile boolean _finished = false;
public LuceneNRTReopenThread(final NRTManager manager,
final double targetMaxStaleSec,final double targetMinStaleSec) {
super(manager, targetMaxStaleSec, targetMinStaleSec);
this.setName("NRT Reopen Thread");
this.setPriority(Math.min(Thread.currentThread().getPriority()+2,
Thread.MAX_PRIORITY));
this.setDaemon(true);
}
public synchronized void startReopening() {
_finished = false;
this.start();
}
public synchronized void stopReopening() {
_finished = true;
}
@Override
public void run() {
while (!_finished) {
super.run();
}
}
}
/////// INDEX
/**
* Index a Lucene document
* @param doc the document to be indexed
*/
public void index(final Document doc) {
try {
_reopenToken = _trackingIndexWriter.addDocument(doc);
log.debug("document indexed in lucene");
} catch(IOException ioEx) {
log.error("Error while in Lucene index operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Updates the index info for a lucene document
* @param doc the document to be indexed
*/
public void reIndex(final Term recordIdTerm,
final Document doc) {
try {
_reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm,
doc);
log.debug("{} document re-indexed in lucene",recordIdTerm.text());
} catch(IOException ioEx) {
log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Unindex a lucene document
* @param idTerm term used to locate the document to be unindexed
* IMPORTANT! the term must filter only the document and only the document
* otherwise all matching docs will be unindexed
*/
public void unIndex(final Term idTerm) {
try {
_reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
idTerm.text());
} catch(IOException ioEx) {
log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
/**
* Delete all lucene index docs
*/
public void truncate() {
try {
_reopenToken = _trackingIndexWriter.deleteAll();
log.warn("lucene index truncated");
} catch(IOException ioEx) {
log.error("Error truncating lucene index: {}",ioEx.getMessage(),
ioEx);
} finally {
try {
_indexWriter.commit();
} catch (IOException ioEx) {
log.error("Error truncating lucene index: {}",ioEx.getMessage(),
ioEx);
}
}
}
////// COUNT-SEARCH
/**
* Count the number of results returned by a search against the lucene index
* @param qry the query
* @return
*/
public long count(final Query qry) {
long outCount = 0;
try {
_searchManager.waitForGeneration(_reopenToken); // wait untill the index is re-opened
IndexSearcher searcher = _searchManager.acquire();
try {
TopDocs docs = searcher.search(qry,0);
if (docs != null) outCount = docs.totalHits;
log.debug("count-search executed against lucene index returning {}",outCount);
} finally {
_searchManager.release(searcher);
}
} catch (IOException ioEx) {
log.error("Error re-opening the index {}",ioEx.getMessage(),
ioEx);
}
return outCount;
}
/**
* Executes a search query
* @param qry the query to be executed
* @param sortFields the search query criteria
* @param firstResultItemOrder the order number of the first element to be returned
* @param numberOfResults number of results to be returnee
* @return a page of search results
*/
public LucenePageResults search(final Query qry,Set<SortField> sortFields,
final int firstResultItemOrder,final int numberOfResults) {
LucenePageResults outDocs = null;
try {
_searchManager.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
IndexSearcher searcher = _searchManager.acquire();
try {
// sort crieteria
SortField[] theSortFields = null;
if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
: null;
// number of results to be returned
int theNumberOfResults = firstResultItemOrder + numberOfResults;
// Exec the search (if the sort criteria is null, they're not used)
TopDocs scoredDocs = theSort != null ? searcher.search(qry,
theNumberOfResults,
theSort)
: searcher.search(qry,
theNumberOfResults);
log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
(theSort != null ? theSort.toString() : ""),
scoredDocs != null ? scoredDocs.totalHits : 0,
scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
outDocs = LucenePageResults.create(searcher,
scoredDocs,
firstResultItemOrder,numberOfResults);
} finally {
_searchManager.release(searcher);
}
} catch (IOException ioEx) {
log.error("Error freeing the searcher {}",ioEx.getMessage(),
ioEx);
}
return outDocs;
}
/////// INDEX MAINTEINANCE
/**
* Mergest the lucene index segments into one
* (this should NOT be used, only rarely for index mainteinance)
*/
public void optimize() {
try {
_indexWriter.forceMerge(1);
log.debug("Lucene index merged into one segment");
} catch (IOException ioEx) {
log.error("Error optimizing lucene index {}",ioEx.getMessage(),
ioEx);
}
}
}
You should not commit after every document, and you shouldn't need to Thread.interrupt the reopen thread (in fact, this is deadly when you are using NIOFSDirectory). Instead, just call its (ControlledRealTimeReopenThread) close method: under the hood, it notifies itself and should finish quickly.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With