
How to reindex in ElasticSearch via Java API

Like the title says...

I read this article (https://www.elastic.co/blog/changing-mapping-with-zero-downtime), and the concept was great, but I am struggling to find a decent reference on how to do it via the Java API.

I found this plugin: https://github.com/karussell/elasticsearch-reindex, but it seems like overkill for what I am trying to do.

asked Jul 10 '15 16:07 by krinker


2 Answers

After some research at a local Starbucks, here is what I came up with:

Let's assume that we already have our index ("old_index") and that it contains data. Now let's move that data to a new index ("new_index") that we created, perhaps with a different schema (e.g. STRING vs INT for a certain field), or because you decided you no longer wish to analyze or store a certain field.

The basic idea is to retrieve all data from the existing index ("old_index") and ingest it into the new index ("new_index"). However, there are a few things you have to do:

Step 1. You need to perform a scroll search: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

It retrieves results much more efficiently than a regular search; with the scan search type used below there is no scoring, no sorting, etc. Here is what the documentation has to say: "Scrolling is not intended for real time user requests, but rather for processing large amounts of data, e.g. in order to reindex the contents of one index into a new index with a different configuration."

Here is a link to the Java API documentation on scrolling: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html
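To make the scroll idea concrete without a running cluster, here is a toy sketch in plain Java. It is not the Elasticsearch API: `ScrollSketch`, `ScrollCursor`, `nextPage` and `drain` are hypothetical names. The cursor copies the data up front (a real scroll likewise reflects the index as it was when the initial search was made), hands out fixed-size pages, and signals the end with an empty page, which is the same break condition used in Step 3 below.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory stand-in for a scroll cursor (hypothetical names, no Elasticsearch).
public class ScrollSketch {

    /** Hypothetical cursor handing out fixed-size pages of a frozen snapshot. */
    static final class ScrollCursor {
        private final List<String> snapshot; // copy taken when the "scroll" starts
        private final int pageSize;
        private int position = 0;

        ScrollCursor(List<String> docs, int pageSize) {
            this.snapshot = new ArrayList<>(docs); // later writes to docs are not seen
            this.pageSize = pageSize;
        }

        /** Returns the next page; an empty page means the scroll is exhausted. */
        List<String> nextPage() {
            int end = Math.min(position + pageSize, snapshot.size());
            List<String> page = new ArrayList<>(snapshot.subList(position, end));
            position = end;
            return page;
        }
    }

    /** Drains the cursor page by page and returns every document, in order. */
    static List<String> drain(List<String> docs, int pageSize) {
        ScrollCursor cursor = new ScrollCursor(docs, pageSize);
        List<String> all = new ArrayList<>();
        while (true) {
            List<String> page = cursor.nextPage();
            if (page.isEmpty()) {
                break; // same break condition as the scroll loop: no hits returned
            }
            all.addAll(page);
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            docs.add("doc-" + i);
        }
        System.out.println(drain(docs, 100).size()); // 250, fetched as pages of 100, 100, 50
    }
}
```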

Step 2. When doing the inserts, use bulk ingestion. Once again, this is for performance reasons: many index operations are sent in a single request instead of one request per document. Here is a link to the Bulk Processor Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
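The batching behind the bulk processor can be modeled the same way. The hypothetical `BulkBufferSketch` class below is not `org.elasticsearch.action.bulk.BulkProcessor`; it only mimics the two behaviors that matter here: a batch is sent whenever the action threshold is reached (what `setBulkActions` controls), and the partial remainder is sent on close. It deliberately ignores flush intervals and concurrent requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of threshold batching plus flush-on-close (hypothetical, no Elasticsearch).
public class BulkBufferSketch implements AutoCloseable {
    private final int bulkActions;               // send a batch once this many actions are buffered
    private final Consumer<List<String>> sender; // stands in for executing one bulk request
    private List<String> buffer = new ArrayList<>();

    public BulkBufferSketch(int bulkActions, Consumer<List<String>> sender) {
        this.bulkActions = bulkActions;
        this.sender = sender;
    }

    public void add(String action) {
        buffer.add(action);
        if (buffer.size() >= bulkActions) {
            flush();
        }
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            List<String> batch = buffer;
            buffer = new ArrayList<>();
            sender.accept(batch);
        }
    }

    /** Sends whatever is left; forgetting this loses the final partial batch. */
    @Override
    public void close() {
        flush();
    }

    /** Feeds docCount actions through a buffer and records the size of each batch sent. */
    static List<Integer> batchSizes(int docCount, int threshold) {
        List<Integer> sizes = new ArrayList<>();
        try (BulkBufferSketch bulk = new BulkBufferSketch(threshold, batch -> sizes.add(batch.size()))) {
            for (int i = 0; i < docCount; i++) {
                bulk.add("index doc-" + i);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(2500, 1000)); // [1000, 1000, 500]
    }
}
```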

Now on to how to actually do it...

Step 1. Set up a scroll search that will "load" data from the old index:

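// SCAN (ES 1.x/2.x, deprecated since 2.1) skips scoring and sorting. The initial
// response carries only a scroll ID and no hits, so the loop in Step 3 starts with a scroll request.
SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index
    .setSearchType(SearchType.SCAN)
    .setScroll(new TimeValue(60000)) // keep the scroll context alive for 60 s between requests
    .setQuery(QueryBuilders.matchAllQuery()) // Match all query
    .setSize(100).execute().actionGet(); // 100 hits per shard will be returned for each scroll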

Step 2. Set up a bulk processor:

final int BULK_ACTIONS_THRESHOLD = 1000; // send a bulk request after this many index requests
final int BULK_CONCURRENT_REQUESTS = 1;  // one bulk may be in flight while the next one fills up
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
        // A bulk can succeed as a whole while individual documents fail (e.g. mapping errors)
        if (response.hasFailures()) {
            logger.warn("Bulk had failures: {}", response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
})
    .setBulkActions(BULK_ACTIONS_THRESHOLD)
    .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
    .setFlushInterval(TimeValue.timeValueSeconds(5)) // also send a partial batch every 5 s
    .build();

Step 3. Read from the old index via the scroll search created in Step 1 until no records are left, and insert them into the new index:

// Scroll until no hits are returned
while (true) {
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
        .setScroll(new TimeValue(600000)).execute().actionGet();
    // Break condition: no hits are returned
    if (scrollResp.getHits().getHits().length == 0) {
        logger.info("Closing the bulk processor");
        // close() does not wait for in-flight bulks; awaitClose() (throws InterruptedException,
        // needs java.util.concurrent.TimeUnit) waits so Step 4 only runs once all writes are done
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        break;
    }
    // Add every hit of this page to the bulk processor as an index request on the new index
    for (SearchHit hit : scrollResp.getHits()) {
        IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
        request.source(hit.getSource()); // copy the original _source unchanged
        bulkProcessor.add(request);
    }
}

Step 4. Now it is time to point the existing alias, which currently points to the old index, at the new index. Then remove the alias from the old index, and finally delete the old index itself. To find out which aliases are assigned to the existing old index, see this post: ElasticSearch JAVA API to find aliases given index

To assign the alias to the new index:

client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();

To remove the alias from the old index and then delete the old index:

client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();
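Note that the two alias calls above are separate requests: between them the alias points at both indices, so searches through the alias can return duplicates. The zero-downtime article linked in the question instead sends the remove and the add in one aliases request, which Elasticsearch applies atomically. With the 1.x/2.x Java client this should be possible by chaining `removeAlias("old_index", "alias_name")` and `addAlias("new_index", "alias_name")` on the same `prepareAliases()` builder before calling `get()`; verify this against your client version. The toy model below (hypothetical `AliasTableSketch`, plain Java, no Elasticsearch) shows the difference between the two approaches.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy alias table: each apply() call models one aliases request (hypothetical names).
public class AliasTableSketch {
    private final Map<String, Set<String>> aliases = new HashMap<>(); // alias -> indices

    /** One action inside an aliases request: add or remove an index from an alias. */
    static final class Action {
        final boolean add;
        final String index;
        final String alias;

        Action(boolean add, String index, String alias) {
            this.add = add;
            this.index = index;
            this.alias = alias;
        }
    }

    /** Applies all actions as one unit: readers see either the old or the new state. */
    synchronized void apply(List<Action> actions) {
        for (Action a : actions) {
            Set<String> targets = aliases.computeIfAbsent(a.alias, k -> new HashSet<>());
            if (a.add) {
                targets.add(a.index);
            } else {
                targets.remove(a.index);
            }
        }
    }

    synchronized Set<String> resolve(String alias) {
        return new HashSet<>(aliases.getOrDefault(alias, new HashSet<>()));
    }

    /** Two separate requests (add, then remove): returns what a reader sees in between. */
    static Set<String> twoStepIntermediateState() {
        AliasTableSketch table = new AliasTableSketch();
        table.apply(Arrays.asList(new Action(true, "old_index", "alias_name")));
        table.apply(Arrays.asList(new Action(true, "new_index", "alias_name")));  // request 1
        Set<String> between = table.resolve("alias_name");                         // a search lands here
        table.apply(Arrays.asList(new Action(false, "old_index", "alias_name"))); // request 2
        return between;
    }

    /** One request carrying both actions: returns the state after the swap. */
    static Set<String> atomicSwapFinalState() {
        AliasTableSketch table = new AliasTableSketch();
        table.apply(Arrays.asList(new Action(true, "old_index", "alias_name")));
        table.apply(Arrays.asList(
                new Action(false, "old_index", "alias_name"),
                new Action(true, "new_index", "alias_name")));
        return table.resolve("alias_name");
    }

    public static void main(String[] args) {
        System.out.println(twoStepIntermediateState().size()); // 2: alias briefly hits both indices
        System.out.println(atomicSwapFinalState());            // [new_index]
    }
}
```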
answered Nov 12 '22 20:11 by krinker


Starting with ES 2.3 you can use the reindex API. As there is no documentation on how to do so with the Java API, here are the steps:

  1. Add the Maven dependency matching your ES version.
  2. Add the plugin to your client:

    client = TransportClient.builder().settings(elaSettings).addPlugin(ReindexPlugin.class).build();
    
  3. Call the reindex API:

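    ReindexRequestBuilder builder = ReindexAction.INSTANCE.newRequestBuilder(client)
        .source(oldIndex)
        .destination(newIndex);
    builder.destination().setOpType(opType); // e.g. IndexRequest.OpType.CREATE to only create missing documents
    builder.abortOnVersionConflict(false);   // count version conflicts instead of aborting
    builder.get();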
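The answer leaves `opType` open. If it is set to `IndexRequest.OpType.CREATE`, a document whose ID already exists in the destination is not overwritten but reported as a version conflict, and `abortOnVersionConflict(false)` makes the reindex count those conflicts and continue rather than stop. Here is a plain-Java toy model of that behavior; `CreateOnlyCopySketch`, `copy` and `VersionConflictException` are hypothetical names, and in-memory maps stand in for the two indices.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of reindexing with op_type=create (hypothetical names, no Elasticsearch).
public class CreateOnlyCopySketch {

    static final class VersionConflictException extends RuntimeException {
        VersionConflictException(String id) {
            super("document already exists: " + id);
        }
    }

    /**
     * Copies every source document into dest without overwriting existing IDs.
     * Returns the number of conflicts; throws on a conflict when abortOnConflict is true.
     */
    static int copy(Map<String, String> source, Map<String, String> dest, boolean abortOnConflict) {
        int conflicts = 0;
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (dest.containsKey(e.getKey())) {
                if (abortOnConflict) {
                    throw new VersionConflictException(e.getKey());
                }
                conflicts++; // counted and skipped, like abortOnVersionConflict(false)
            } else {
                dest.put(e.getKey(), e.getValue()); // "create" only succeeds for new IDs
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        Map<String, String> oldIndex = new HashMap<>();
        oldIndex.put("1", "a");
        oldIndex.put("2", "b");
        oldIndex.put("3", "c");
        Map<String, String> newIndex = new HashMap<>();
        newIndex.put("2", "already-there");
        int conflicts = copy(oldIndex, newIndex, false);
        System.out.println(conflicts + " " + newIndex.get("2") + " " + newIndex.size()); // 1 already-there 3
    }
}
```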
    
answered Nov 12 '22 20:11 by Remot