Like title says...
I read this article (https://www.elastic.co/blog/changing-mapping-with-zero-downtime), and the concept was great, but I struggling to find decent reference on how to do it via JAVA API.
I found this plugin: https://github.com/karussell/elasticsearch-reindex, but seems like overkill of what I am trying to do
After some research at a local Starbucks here is what I came up with:
Let's assume that we have our index already ("old_index") and it has data... Now let's move that data to a new index ("new_index") that we created (perhaps with different schema STRING vs INT for a certain field, or now you decide that you no longer wish to analyze or store certain field, etc).
The basic idea here is to retrieve all data from already existing index ("old_index") and ingest it into new index ("new_index"). However, there are few things that you have to do:
Step 1. You need to perform search scroll https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
All it does it retrieves results much more efficiently vs regular search. There is no scoring, etc. Here is what the documentation has to say: "Scrolling is not intended for real time user requests, but rather for processing large amounts of data, e.g. in order to reindex the contents of one index into a new index with a different configuration."
Here is a link to Java API on how to use it: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html
Step 2. When doing the insert, you have to use bulk ingest. Once again, it is done for performance reasons. Here is a link to Bulk Ingest Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
Now onto ho to actually do it...
Step 1. Set up scroll search that would "load" data from old index
SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index
.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(60000))
.setQuery(QueryBuilders.matchAllQuery()) // Match all query
.setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll
Step 2. Set up bulk processor.
int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("Executed bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.warn("Error executing bulk", failure);
}
}).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build();
Step 3. Read from old index via created scroll searcher in Step 1 until there are mo records left and insert into new index
//Scroll until no hits are returned
while (true) {
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
//Break condition: No hits are returned
if (scrollResp.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break;
}
// Get results from a scan search and add it to bulk ingest
for (SearchHit hit: scrollResp.getHits()) {
IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
Map source = ((Map) ((Map) hit.getSource()));
request.source(source);
bulkProcessor.add(request);
}
}
Step 4. Now it is time to assign existing alias, that points to old index, to new index. Then delete alias reference to old index and then delete old index itself. To find out how to determine alias that were assigned to already existing old index see this post: ElasticSeach JAVA API to find aliases given index
To assign alias to new index
client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();
Remove alias from old index and then delete old index
client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();
Since ES 2.0 you can use the reindex API. Since there is no documentation for how to do so with the Java API here are the steps:
Add the plugin to your client:
client = TransportClient.builder().settings(elaSettings).addPlugin(ReindexPlugin.class).build();
Call the reindex api
ReindexRequestBuilder builder = ReindexAction.INSTANCE.newRequestBuilder(client).source(oldIndex).destination(newIndex);
builder.destination().setOpType(opType);
builder.abortOnVersionConflict(false); builder.get();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With