 

Forcing ~synchronous Node.js IPC

I have a Node server that creates a child process with fork() using IPC. At some point the child sends results back to the parent at about 10Hz as part of a long-running task. When the payload passed to process.send() is small all works well: every message I send is received ~immediately and processed by the parent.

However, when the payload is 'large'—I haven't determined the exact size limit—instead of being received immediately by the parent, all payloads are first sent, and only once the child is done with its long-running task does the parent receive and process the messages.

tl;dr visual:

Good (happens with small payload):

child:  send()
parent: receive()
child:  send()
parent: receive()
child:  send()
parent: receive()
...

Bad (happens with big payload):

child:  send()
child:  send()
child:  send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
  1. Is this a bug? (Edit: behavior only occurs on OS X, not Windows or Linux)
  2. Is there any way to avoid this, other than trying to keep my IPC payload small?

Edit 2: the sample code below uses both time and an iteration counter to decide when to send an update. (In my actual code it's also possible to send an update after n iterations, or after the loop achieves certain results.) As such, rewriting the code to use setInterval/setTimeout instead of a loop is a last resort for me, as it requires removing features.

Edit: Here is test code that reproduces the problem. However, it only reproduces on OS X, not on Windows or Linux:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);
   if (match) {
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

if (process.send) process.on('message', msg => run(msg));

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

The problem appears at around 8 KB. For example, compare querying http://localhost:8080/15 vs http://localhost:8080/123456:

/15
worker: send()  > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send()  > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send()  > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send()  > 123456 bytes 1571324276973
worker: send()  > 123456 bytes 1571324277174
worker: send()  > 123456 bytes 1571324277375
worker done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393

Experienced on both Node v12.7 and v12.12.

asked Oct 17 '19 by Phrogz


4 Answers

Having a long-running, blocking while loop in combination with sockets or file descriptors in Node is always an indication that something is being done wrong.

Without being able to test the whole setup it is hard to tell whether my claim is correct, but short messages can probably be passed to the OS in one chunk, which then hands them to the other process. With larger messages Node has to wait until the OS can receive more data, so sending is queued up; and because you have a blocking while loop, that queue is not flushed until the loop ends.
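One way to observe this queuing from the worker side (an illustrative sketch, not part of the original setup): process.send() accepts an optional callback and returns false once the backlog of unsent messages grows, so the worker can detect the backpressure itself:

```javascript
// Sketch: detect IPC backpressure in the worker.
// process.send() returns false when the message had to be queued
// because the channel's backlog exceeded a threshold; the optional
// callback fires only after the message is actually flushed.
function sendWithBackpressureCheck(payload) {
  const flushed = process.send(payload, err => {
    if (err) console.error('send failed:', err);
  });
  if (!flushed) {
    console.warn('IPC backlog: message queued, channel is congested');
  }
  return flushed;
}
```

Note that inside a blocking loop the callback can never fire until the loop yields to the event loop, which is exactly the symptom described in the question.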

So, to your question: no, that is not a bug.

As you use a recent Node.js version, I would use async/await and create a non-blocking while loop, similar to the sleep in this answer. The await will allow the Node event loop to step in if processSome returns a pending Promise.

Since your code does not really reflect a real use case, it is hard to say how to solve it correctly. If you don't do anything async in processSome that would allow the I/O to step in, then you need to do that manually on a regular basis, e.g. with await new Promise(setImmediate);.

async function run() {
  let interval = setInterval(() => {
    process.send({action:'update', data:status()});
    console.log('child:  send()');
  }, 100) // every 100 ms, i.e. 10 Hz

  while(keepGoing()) {
    await processSome();
  }

  clearInterval(interval)
}
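Here processSome, keepGoing, and status stand in for the asker's own functions. If the work inside processSome is purely synchronous, a minimal sketch of making it yield to the event loop (so queued IPC writes can be flushed between chunks) could look like:

```javascript
// Hypothetical processSome: run one slice of the CPU-bound work,
// then yield to the event loop so pending I/O (including queued
// IPC messages) gets a chance to run before the next slice.
async function processSome() {
  for (let i = 0; i < 5000; ++i) {
    Math.sqrt(Math.random()); // one chunk of the original loop body
  }
  // Hand control back to the event loop before the next chunk.
  await new Promise(setImmediate);
}
```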
answered Sep 24 '22 by t.niese


Regarding your first question

Is this a bug? (Edit: behavior only occurs on OS X, not Windows or Linux)

This is definitely not a bug, and I could reproduce it on my Windows 10 (for size 123456). It is mostly due to the underlying kernel buffering and context switching by the OS, as two separate processes (not detached) are communicating over an IPC descriptor.

Regarding your second question

Is there any way to avoid this, other than trying to keep my IPC payload small?

If I understand the problem correctly: for each HTTP request, every time the worker sends a chunk back to the server, you want the server to process it before the next chunk arrives. That's what I understand by your sync processing.

There's a way using promises, but I would prefer to use generators in the worker. It's easier to orchestrate the flow between server and worker that way.

Flow:

  1. Server sends an integer to the worker whatever it gets from http request
  2. Worker then creates and runs generator to send the first chunk
  3. Worker yields after sending the chunk
  4. Server requests for more
  5. Worker generates more since server asked for more (only if available)
  6. If there are no more, the worker sends an end-of-chunks signal
  7. Server just logs that worker is done and doesn't request any more

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc'], detached:false};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', (msg) => {
   //FLOW 7: Worker is done, just log
   if (msg.action == 'end'){
      console.log(`child ended for a particular request`)
   } else {
      console.log(`parent: receive(${msg.data.iter}) ${msg.data.msg.length} bytes`, Date.now())
      //FLOW 4: Server requests for more
      child.send('more')
   }   

});

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);   
   if (match) {
      //FLOW 1: Server sends integer to worker
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

let runner
if (process.send) process.on('message', msg => {   
   //FLOW 2: Worker creates and runs a generator to send the first chunk
   if (parseInt(msg)) {
      runner = run(msg)
      runner.next()
   }
   //FLOW 5: Server asked more, so generate more chunks if available
   if (msg == "more") runner.next()

});

//generator function *
function* run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send(${i})  > ${messageSize} bytes`, now);
         let j = i         
         process.send({action:'update', data:{msg, iter:j}});
         //FLOW 3: Worker yields after sending the chunk
         yield
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   //FLOW 6: If no more, worker sends end signal
   process.send({action:'end'});
   console.log('worker done');
}

If we knew the exact use case, there could be better ways to program it. This is just one way of synchronizing the child process while retaining much of your original source code.

answered Sep 24 '22 by manikawnth


If you need to guarantee that a message is received before sending the next one, you can wait for the master to acknowledge receipt. This will of course delay sending the next message, but since your logic relies on both time and iteration number to decide whether to send a message, it may be alright for your case.

The implementation needs each worker to create a promise for each message sent, and wait for a reply from the master before resolving that promise. This also means you need to identify which message is being acknowledged, based on a message id or something unique, if you have more than one message or worker in flight simultaneously.

Here's the modified code:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg =>  {
    console.log(`parent: receive() ${msg.data.length} bytes`, Date.now())
    // reply to the child with the id
    child.send({ type: 'acknowledge', id: msg.id });
});

...

worker.js

const pendingMessageResolves = {};

if (process.send) process.on('message', msg => { 
    if (msg.type === 'acknowledge') {
        // call the stored resolve function
        pendingMessageResolves[msg.id]();
        // remove the function to allow the memory to be freed
        delete pendingMessageResolves[msg.id]
    } else {
        run(msg) 
    }
});

const sendMessageAndWaitForAcknowledge = (msg) => new Promise(resolve => {
    const id = `${Date.now()}-${Math.random()}`; // any unique id works, e.g. a UUID
    process.send({ action:'update', data: msg, id });
    // store a reference to the resolve function
    pendingMessageResolves[id] = resolve;
})

async function run(messageSize) {
    const msg = new Array(messageSize+1).join('x');
    let lastUpdate = Date.now();
    for (let i=0; i<1e7; ++i) {
        const now = Date.now();
        if ((now-lastUpdate)>200 || i%5000==0) {
            console.log(`worker: send()  > ${messageSize} bytes`, now);
            await sendMessageAndWaitForAcknowledge(msg); // wait until master replies
            lastUpdate = Date.now();
        }
        Math.sqrt(Math.random());
    }
    console.log('worker done');
}

p.s. I didn't test the code so it might need some tweaking, but the idea should hold.

answered Sep 26 '22 by gafi


While I agree with others that the optimal solution is one where the child process voluntarily gives up control at the end of each loop iteration, allowing the buffer-flushing machinery to run, there is a quick and dirty fix that gets you almost synchronous behavior: make the child's send calls blocking.

Use the same server.js as before, and almost the same worker.js, with just one line added:

worker.js

if (process.send) process.on('message', msg => run(msg));

// cause process.send to block until the message is actually sent                                                                                
process.channel.setBlocking(true);

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e6; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.error(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Output:

/123456
worker: send()  > 123456 bytes 1572113820591
worker: send()  > 123456 bytes 1572113820630
parent: receive() 123456 bytes 1572113820629
parent: receive() 123456 bytes 1572113820647
worker: send()  > 123456 bytes 1572113820659
parent: receive() 123456 bytes 1572113820665
worker: send()  > 123456 bytes 1572113820668
parent: receive() 123456 bytes 1572113820678
worker: send()  > 123456 bytes 1572113820678
parent: receive() 123456 bytes 1572113820683
worker: send()  > 123456 bytes 1572113820683
parent: receive() 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820696
worker: send()  > 123456 bytes 1572113820696
parent: receive() 123456 bytes 1572113820700
worker: send()  > 123456 bytes 1572113820700
parent: receive() 123456 bytes 1572113820703
worker: send()  > 123456 bytes 1572113820703
parent: receive() 123456 bytes 1572113820706
worker: send()  > 123456 bytes 1572113820706
parent: receive() 123456 bytes 1572113820709
worker: send()  > 123456 bytes 1572113820709
parent: receive() 123456 bytes 1572113820713
worker: send()  > 123456 bytes 1572113820714
worker: send()  > 123456 bytes 1572113820721
parent: receive() 123456 bytes 1572113820722
parent: receive() 123456 bytes 1572113820725
worker: send()  > 123456 bytes 1572113820725
parent: receive() 123456 bytes 1572113820727
answered Sep 24 '22 by Old Pro