
Elasticsearch indexing with BulkRequestBuilder slows down

Hi all Elasticsearch masters.

I have millions of documents to index through the Elasticsearch Java API. The Elasticsearch cluster has three nodes (1 master + 2 other nodes).

My code snippet is below.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000== 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}

It works fine, but performance degrades rapidly after thousands of documents.

I have already tried setting "refresh_interval" to -1 and "number_of_replicas" to 0, but the performance still degrades in the same way.

When I monitor the cluster with bigdesk, the GC value reaches 1 every second, as in the screenshot below.

Can anyone help me?

Thanks in advance.

[Screenshot: bigdesk cluster monitoring, GC activity]

=================== UPDATED ===========================

Finally, I've solved this problem (see the answer).

The cause was that I forgot to create a new BulkRequestBuilder after each bulk execution. The performance degradation no longer occurs since I changed my code as shown below.

Thank you very much.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000== 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
        bulkBuilder = client.prepareBulk();  // Start a fresh builder after each execute; omitting this line was my mistake.
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}
asked Apr 09 '14 00:04 by Hosang Jeon



1 Answer

The problem is that you don't create a new bulk request after executing the current one.

This means every execution re-sends all the documents added so far, so you are reindexing the same first data again and again.
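To make the effect concrete, here is a small stand-alone simulation in plain Java with no Elasticsearch dependency. `FakeBulkBuilder` and `BulkReuseDemo` are hypothetical names made up for this illustration, and the sketch assumes, as described in this thread, that executing a builder does not clear the actions it already holds.

```java
import java.util.ArrayList;
import java.util.List;

class BulkReuseDemo {

    // Hypothetical stand-in for a bulk builder, NOT an Elasticsearch class.
    // It only accumulates actions and reports how many would be sent.
    static class FakeBulkBuilder {
        private final List<String> actions = new ArrayList<>();

        void add(String action) {
            actions.add(action);
        }

        // "Sends" everything accumulated so far; assumed not to clear itself.
        int execute() {
            return actions.size();
        }
    }

    // Total number of actions sent while indexing `docs` documents in batches
    // of `batchSize`, with or without creating a fresh builder after each batch.
    static long totalActionsSent(int docs, int batchSize, boolean recreateBuilder) {
        FakeBulkBuilder builder = new FakeBulkBuilder();
        long sent = 0;
        for (int i = 1; i <= docs; i++) {
            builder.add("doc-" + i);
            if (i % batchSize == 0) {
                sent += builder.execute();
                if (recreateBuilder) {
                    builder = new FakeBulkBuilder(); // the missing line from the question
                }
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("reused builder:    " + totalActionsSent(10_000, 1_000, false)); // 55000
        System.out.println("recreated builder: " + totalActionsSent(10_000, 1_000, true));  // 10000
    }
}
```

With 10,000 documents and a batch size of 1,000, the reused builder sends 1,000 + 2,000 + ... + 10,000 = 55,000 actions instead of 10,000. The gap grows quadratically with the number of batches, which would be consistent with the slowdown and GC pressure reported in the question.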

By the way, have a look at the BulkProcessor class. It is definitely the better option.
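The bookkeeping that a helper of this kind takes off your hands can be sketched in plain Java. Note that `BatchingSink` below is a hypothetical class written for illustration only; it is not the BulkProcessor API and makes no claim about how BulkProcessor is implemented. It just shows the pattern: buffer items, flush at a threshold, start a fresh buffer, and flush the remainder on close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration, NOT the Elasticsearch BulkProcessor API.
class BatchingSink<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private List<T> buffer = new ArrayList<>();

    BatchingSink(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Buffers one item and flushes automatically once the batch is full.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands the current batch to the callback and starts a fresh buffer,
    // so nothing is ever sent twice.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        flusher.accept(batch);
    }

    // Flushes whatever is left when the sink is closed.
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (BatchingSink<String> sink =
                 new BatchingSink<>(1000, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 2500; i++) {
                sink.add("doc-" + i);
            }
        }
        System.out.println(batchSizes); // [1000, 1000, 500]
    }
}
```

Because the reset happens in one place inside `flush()`, the caller cannot forget it, which is exactly the mistake in the question's loop.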

answered Oct 01 '22 17:10 by dadoonet