Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

nodejs - Sending back a stream from a worker thread to the main thread

I had been trying to separate some work that's done in my program in a different thread. One of the functions needs to return a stream to the main thread but I'm having the following exception:

Error
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
From previous event:
    at PoolWorker.work (node_modules/node-worker-threads-pool/src/pool-worker.js:22:12)
    at DynamicPool.runTask (node_modules/node-worker-threads-pool/src/pool.js:110:47)
    at DynamicPool.exec (node_modules/node-worker-threads-pool/src/dynamic-pool.js:51:17)
    at renderToPdf (src/modules/templates/render2.js:27:14)
    at Context.<anonymous> (test/modules/templates/render.test.js:185:68)

I tried to construct a minimal example to reproduce what I'm trying to achieve. Basically, what I need is to send back a readable stream to the main thread. In this example, I'm also having a exception:

To have a pool of worker threads I'm using the library node-worker-threads-pool the DynamicPool specifically. And inside I'm trying to convert html to a PDF. But I need to somehow return the stream to the main thread.

const os = require('os');
const { DynamicPool } = require('node-worker-threads-pool');

const Pool = new DynamicPool(os.cpus().length);

async function convertToPDF(html) {
  return await Pool.exec({
    task: function() {
      const Promise = require('bluebird');
      const pdf = require('html-pdf');

      const { html } = this.workerData;

      const htmlToPdf = (html, renderOptions) => {
        const options = {
          format: 'Letter',
        };
        return pdf.create(html, Object.assign(options, renderOptions || {}));
      };

      return Promise.fromNode((cb) => htmlToPdf(html, {}).toStream(cb));
    },
    workerData: {
      html,
    },
  });
}

convertToPDF('<div>Hello World!</div>')
  .then((resp) => console.log('resp', resp))
  .catch((err) => console.error('err', err));
err DataCloneError: function() {
    if (this.autoClose) {
      this.destroy();
    }
  } could not be cloned.
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)

Do you have an idea of how can I achieve this?

PS: I'm aware that the IO operations are not as performant in the worker threads are they are in the nodejs main thread, but I need to do this to avoid locking the main thread with these operations.

like image 476
fingerprints Avatar asked Jun 16 '20 18:06

fingerprints


People also ask

What is stream passthrough in NodeJS?

This sort of stream is a trivial implementation of a Transform stream, which simply passes received input bytes through to an output stream. This is useful if one doesn't require any transformation of the input data, and simply wants to easily pipe a Readable stream to a Writable stream.

Is NodeJS synchronous or asynchronous?

NodeJS is an asynchronous event-driven JavaScript runtime environment designed to build scalable network applications. Asynchronous here refers to all those functions in JavaScript that are processed in the background without blocking any other request.

How does worker thread work in node JS?

Instead, the worker threads approach allows applications to use several isolated JavaScript workers, with Node providing communication between workers and the parent worker. Each worker in Node. js has its own V8 and Event Loop instance. Workers, unlike children's processes, can exchange memory.


1 Answers

Short version: you can't.

IPC in node is handled through some black box, but what we know is that message objects are serialized before sending and deserialized once received: you can't serialize a Stream because it is based on underling level (a socket, a file descriptor, custom read and write functions, etc) which can't be serialized/deserialized.

So you are forced to exchange serializable data.

Taking a look at html-pdf I think an easy way to convert your program is to use pdf.toBuffer: rather than trying to send a Stream to main thread and reading it in main thread to obtain a Buffer, you should send a Buffer to main thread and than use it as is.

Hope this helps.

like image 162
Daniele Ricci Avatar answered Sep 21 '22 11:09

Daniele Ricci