I am reading a file (300,000 lines) in Node.js. I want to send the lines in batches of 5,000 to another application (Elasticsearch) to store them. So whenever I finish reading 5,000 lines, I want to send them in bulk to Elasticsearch through an API, then keep reading the rest of the file and send every 5,000 lines in bulk.
If I wanted to use Java (or any other blocking language such as C, C++, Python, etc.) for this task, I would do something like this:
int countLines = 0;
String bulkString = "";
String currentLine;
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt")));
while ((currentLine = br.readLine()) != null) {
    countLines++;
    bulkString += currentLine + "\n";
    if (countLines >= 5000) {
        // send bulkString to Elasticsearch via its bulk API
        countLines = 0;
        bulkString = "";
    }
}
br.close();
If I want to do the same thing with Node.js, I would do:
var fs = require('fs');
var readline = require('readline');
// client is an Elasticsearch client instance created elsewhere

var countLines = 0;
var bulkString = "";
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface({ input: instream });
rl.on('line', function (line) {
    countLines++;
    bulkString += line + "\n";
    if (countLines >= 5000) {
        // send bulkString to Elasticsearch via its bulk API
        client.bulk({
            index: 'indexName',
            type: 'type',
            body: [bulkString]
        }, function (error, response) {
            // task is done
        });
        countLines = 0;
        bulkString = "";
    }
});
The problem with Node.js is that it is non-blocking, so it doesn't wait for the first API response before sending the next batch of lines. I know this could count as a benefit of Node.js, because it does not wait for I/O, but the problem is that it sends too much data to Elasticsearch. Therefore Elasticsearch's queue fills up and it throws exceptions.
My question is: how can I make Node.js wait for the response from the API before it continues to read the next lines, or before it sends the next batch of lines to Elasticsearch?
I know I can set some parameters in Elasticsearch to increase the queue size, but I am interested in blocking-like behavior from Node.js for this issue. I am familiar with the concept of callbacks, but I cannot think of a way to use callbacks in this scenario to prevent Node.js from calling the Elasticsearch API in non-blocking mode.
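As an aside, on newer Node.js versions the same "wait before reading on" behaviour can be expressed with for await...of over the readline interface, awaiting each bulk request so that no new batch is sent before the previous one is acknowledged. This is only a sketch, assuming the promise-based @elastic/elasticsearch client; the node address, index name, document shape, and the sendBatch helper are placeholders:

const fs = require('fs');
const readline = require('readline');
const { Client } = require('@elastic/elasticsearch'); // assumption: official promise-based client

const client = new Client({ node: 'http://localhost:9200' }); // placeholder address

// Hypothetical helper: wraps each line in a bulk "index" action.
// The exact bulk body format depends on the client version (this follows the 7.x style).
async function sendBatch(lines) {
  const body = lines.flatMap((line) => [{ index: { _index: 'indexName' } }, { doc: line }]);
  await client.bulk({ body });
}

async function indexFile() {
  const rl = readline.createInterface({ input: fs.createReadStream('filePath.txt') });
  let batch = [];
  for await (const line of rl) {
    batch.push(line);
    if (batch.length >= 5000) {
      await sendBatch(batch); // the loop body is awaited, so no new request starts before this one finishes
      batch = [];
    }
  }
  if (batch.length > 0) {
    await sendBatch(batch); // flush the final partial batch
  }
}

indexFile().catch(console.error);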
Method 1: Using the Readline Module: readline is a native Node.js module developed specifically for reading content line by line from any readable stream. It can also be used to read data from the command line. const readline = require('readline');
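As a minimal, self-contained sketch of that command-line use (the messages are made up):

const readline = require('readline');

// Read lines typed on the command line until the input is closed (Ctrl+D).
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

rl.on('line', (line) => {
  console.log('Received: ' + line);
});

rl.on('close', () => {
  console.log('No more input.');
});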
Node.js is based on an event-driven, non-blocking I/O model.
Instead of the process being blocked while waiting for I/O operations to complete, the I/O operations are delegated to the system so that the process can execute the next piece of code. Non-blocking I/O operations provide a callback function that is called when the operation completes.
Note: So the solution to this problem in Node.js is to use asynchronous, non-blocking code, and Node.js uses an event loop for this. An event loop is "an object that handles and processes external events and converts them into callback calls".
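As a small illustration of that callback style (the file name is just a placeholder), a non-blocking read hands its result to a callback instead of making the caller wait:

const fs = require('fs');

// Non-blocking: the read is delegated to the system and the callback
// runs later, once the data is ready.
fs.readFile('filePath.txt', 'utf8', (err, data) => {
  if (err) {
    console.error('Read failed:', err);
    return;
  }
  console.log('File length:', data.length);
});

// This line runs before the callback above fires.
console.log('Reading has started, but the process is not blocked.');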
Pierre's answer is correct. I just want to post code that shows how we can benefit from the non-blocking nature of Node.js while, at the same time, not overwhelming Elasticsearch with too many requests at once.
Here is pseudo-code you can use to make the solution flexible by setting a queue-size limit:
var fs = require('fs');
var readline = require('readline');
// client is an Elasticsearch client instance created elsewhere

var countLines = 0;
var bulkString = "";
var queueSize = 3; // a maximum of 3 bulk requests will be in flight at once
var batchesAlreadyInQueue = 0;
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface({ input: instream });
rl.on('line', function (line) {
    countLines++;
    bulkString += line + "\n";
    if (countLines >= 5000) {
        // send bulkString to Elasticsearch via its bulk API
        batchesAlreadyInQueue++; // one more request is now in flight
        client.bulk({
            index: 'indexName',
            type: 'type',
            body: [bulkString]
        }, function (error, response) {
            // task is done
            batchesAlreadyInQueue--; // decrease the number of in-flight requests when one of them responds
            rl.resume();
        });
        if (batchesAlreadyInQueue >= queueSize) {
            rl.pause(); // stop reading until some of the in-flight requests come back
        }
        countLines = 0;
        bulkString = "";
    }
});
Use rl.pause() right after your if and rl.resume() after your //task is done.
Note that you may get a few more 'line' events after calling pause().
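A minimal sketch of that behaviour (the file name is a placeholder and the timeout stands in for an asynchronous bulk request): lines that readline has already buffered can still be emitted after pause(), so the handler should keep accumulating them rather than dropping them.

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({ input: fs.createReadStream('filePath.txt') });

let waiting = false;
let linesWhilePaused = 0;

rl.on('line', (line) => {
  if (waiting) {
    linesWhilePaused++; // already-buffered lines still land here after pause()
    return;
  }
  waiting = true;
  rl.pause(); // ask readline to stop emitting 'line' events
  setTimeout(() => { // stands in for an asynchronous request
    console.log(linesWhilePaused + ' line(s) arrived after pause()');
    linesWhilePaused = 0;
    waiting = false;
    rl.resume(); // continue reading once the work is done
  }, 1000);
});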