
How to wait until all bulk writes are completed in elastic search api

I'm using the NodeJS Elasticsearch client to write a data importer that bulk imports documents from MongoDB. The problem I'm having is that the index refresh doesn't seem to wait until all documents are written to Elasticsearch before checking the counts.

I'm using the Node streams API to read the records into batches, then the Elasticsearch bulk API to write the records, as shown below:

function rebuildIndex(modelName, queryStream, openStream, done) {
    logger.debug('Rebuilding %s index', modelName);
    async.series([
        function (next) {
          deleteType(modelName, function (err, result) {
            next(err, result);
          });
        },
        function (next) {
          var Model;
          var i = 0;
          var batchSize = settings.indexBatchSize;
          var batch = [];
          var stream;

          if (queryStream && !openStream) {
            stream = queryStream.stream();
          } else if (queryStream && openStream) {
            stream = queryStream;
          } else {
            Model = mongoose.model(modelName);
            stream = Model.find({}).stream();
          }

          stream.on("data", function (doc) {
            logger.debug('indexing %s', doc.userType);
            batch.push({
              index: {
                "_index": settings.index,
                "_type": modelName.toLowerCase(),
                "_id": doc._id.toString()
              }
            });
            var obj;
            if (doc.toObject) {
              obj = doc.toObject();
            } else {
              obj = doc;
            }
            obj = _.clone(obj);

            delete obj._id;
            batch.push(obj);
            i++;
            if (i % batchSize === 0) {
              console.log(chalk.green('Loaded %s records'), i);
              client().bulk({
                body: batch
              }, function (err, resp) {
                if (err) {
                  next(err);
                } else if (resp.errors) {
                  next(resp);
                }
              });
              batch = [];
            }
          });

          // When the stream ends write the remaining records
          stream.on("end", function () {
            if (batch.length > 0) {
              console.log(chalk.green('Loaded %s records'), batch.length / 2);
              client().bulk({
                body: batch
              }, function (err, resp) {
                if (err) {
                  logger.error(err, 'Failed to rebuild index');
                  next(err);
                } else if (resp.errors) {
                  logger.error(resp.errors, 'Failed to rebuild index');
                  next(resp);
                } else {
                  logger.debug('Completed rebuild of %s index', modelName);
                  next();
                }
              });
            } else {
              next();
            }

            batch = [];
          });
        }

      ],
      function (err) {
        if (err)
          logger.error(err);
        done(err);
      }
    );
  }

I use this helper to check the document counts in the index. Without the timeout the counts in the index are wrong, but with the timeout they're okay.

/**
   * A helper function to count the number of documents in the search index for a particular type.
   * @param type The type, e.g. User, Customer etc.
   * @param done A callback to report the count.
   */
  function checkCount(type, done) {
    async.series([
      function(next){
        setTimeout(next, 1500);
      },
      function (next) {
        refreshIndex(next);
      },
      function (next) {
        client().count({
          "index": settings.index,
          "type": type.toLowerCase(),
          "ignore": [404]
        }, function (error, count) {
          if (error) {
            next(error);
          } else {
            next(error, count.count);
          }
        });
      }
    ], function (err, count) {
      if (err)
        logger.error({"err": err}, "Could not check index counts.");
      done(err, count[2]);
    });
  }

And this helper is supposed to refresh the index after the update completes:

// required to get results to show up immediately in tests. Otherwise there's a 1 second delay
  // between adding an entry and it showing up in a search.
  function refreshIndex(done) {
    client().indices.refresh({
      "index": settings.index,
      "ignore": [404]
    }, function (error, response) {
      if (error) {
        done(error);
      } else {
        logger.debug("refreshed index");
        done();
      }
    });
  }

The loader works okay, except this test fails because of timing between the bulk load and the count check:

it('should be able to rebuild and reindex customer data', function (done) {
    this.timeout(0); // otherwise the stream reports a timeout error
    logger.debug("Testing the customer reindexing process");

    // pass null to use the generic find all query
    searchUtils.rebuildIndex("Customer", queryStream, false, function () {
      searchUtils.checkCount("Customer", function (err, count) {
        th.checkSystemErrors(err, count);
        count.should.equal(volume.totalCustomers);
        done();
      })
    });
  });

I observe random results in the counts from the tests. With the artificial delay (the setTimeout in the checkCount function) the counts match, so I conclude that the documents are eventually written to Elasticsearch and the test would pass. I thought indices.refresh would essentially force a wait until all the documents were written to the index, but it doesn't seem to work with this approach.

The setTimeout hack is not really sustainable when the volume goes to actual production levels, so how can I ensure the bulk calls are completely written to the Elasticsearch index before checking the document count?

Richard G, asked Apr 16 '16


1 Answer

Take a look at the "refresh" parameter in the Elasticsearch bulk API documentation.

For example:

let bulkUpdatesBody = [ /* bulk actions / docs to index go here */ ];
client.bulk({
  refresh: "wait_for", // don't respond until the changes are visible to search
  body: bulkUpdatesBody
});
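Applied to the loader in the question, the idea looks roughly like the sketch below. This assumes a promise-based client (as in the modern `@elastic/elasticsearch` package); `buildBulkBody` and `bulkIndexAll` are illustrative names, not part of any API. Because every bulk call carries `refresh: "wait_for"`, once `Promise.all` resolves the documents are searchable, and the `setTimeout` in `checkCount` can be dropped:

```javascript
// Build the alternating action/source body the bulk API expects.
function buildBulkBody(docs, index, type) {
  const body = [];
  for (const doc of docs) {
    const { _id, ...source } = doc; // strip _id from the source, as in the question
    body.push({ index: { _index: index, _type: type, _id: String(_id) } });
    body.push(source);
  }
  return body;
}

// Send the docs in batches; resolve only when every batch is searchable.
async function bulkIndexAll(client, docs, { index, type, batchSize = 500 }) {
  const pending = [];
  for (let i = 0; i < docs.length; i += batchSize) {
    const body = buildBulkBody(docs.slice(i, i + batchSize), index, type);
    // refresh: 'wait_for' makes each call resolve only once the change
    // is visible to search, so a count after Promise.all is accurate.
    pending.push(client.bulk({ refresh: 'wait_for', body }));
  }
  const responses = await Promise.all(pending);
  // Mirrors the resp.errors check in the question's callbacks; depending on
  // client version the flag may live on resp.body.errors instead.
  if (responses.some(r => r.errors)) throw new Error('bulk indexing failed');
  return responses.length; // number of bulk calls made
}
```

Note that `wait_for` makes each bulk call slower than `refresh: false`, so it is best suited to tests and one-off reindex jobs rather than high-throughput production writes.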
Troy, answered Oct 21 '22