Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to listen to the completed event in bull queue - just for the current job

When running the code below it prints too many results.

My suspicion is that the completed event listens to all the previous jobs made in the current queue instance.

How can I manage the completed event to listen just to the current job completion?

producer.js

The producer creates a job with a default numerical id and listens to the global completion in order to return a response when the job is done.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

app.get('/search/:term', async (req, res) => {
  const job = await bullQ.add({
    searchTerm: req.params.term
  });
  
  // Listen to the global completion of the queue in order to return result.
  bullQ.on('global:completed', (jobId, result) => {
    // Check if id is a number without any additions
    if (/^\d+$/.test(jobId) && !res.headersSent) {
      console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
      res.json(`Job is completed with result: ${result}`);
    }
  });
});

consumer.js

The consumer has 2 roles.

  1. To consume the jobs as it should be by the book
  2. To create new jobs based on the result of the last job.
const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

bullQ.process((job, done) => {
  // Simulate asynchronous server request.
  setTimeout(async () => {
    // Done the first job and return an answer to the producer after the timeout.
    done(null, `Search result for ${job.data.searchTerm}`);
    // next job run
    if (counter < 10) {
      // For the first run the id is just a number if not changed via the jobId in JobOpts,
      // the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
      let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
      await createNextJob(jobID, ++counter);
    }
  }, 100);
});

// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
  const nextJob = bullQ.add({
    searchTerm: "Next job"
  }, {
    jobId: `${id}_next_${counter}`
  });

  await bullQ.on('completed', (job, result) => {
    job.finished();
    console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
  });
};
like image 384
Da-Vi Avatar asked Apr 01 '19 11:04

Da-Vi


People also ask

How does bull queue work?

A Queue in Bull generates a handful of events that are useful in many use cases. Events can be local for a given queue instance (a worker), for example, if a job is completed in a given worker a local event will be emitted just for that instance.

Can I use bull without Redis?

Bull uses Redis to persist job data, so you'll need to have Redis installed on your system.

What is BullMQ?

BullMQ is a Node. js library that implements a fast and robust queue system built on top of Redis that helps in resolving many modern age micro-services architectures.


1 Answers

You can await job.finished() to get your result specific to a job object returned by queue.add().

Here's a simple, runnable example without Express to illustrate:

const Queue = require("bull"); // "bull": "^3.22.6"

const sleep = (ms=1000) =>
  new Promise(resolve => setTimeout(resolve, ms))
;

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await sleep(job.data.seconds * 1000); // waste time
  return Promise.resolve(`job ${job.id} complete!`);
});

(async () => {
  const job = await queue.add({seconds: 5});
  const result = await job.finished();
  console.log(result); // => job 42 complete!
  await queue.close();
})();

With Express:

const Queue = require("bull");
const express = require("express");

const sleep = (ms=1000) =>
  new Promise(resolve => setTimeout(resolve, ms))
;

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await sleep(job.data.seconds * 1000);
  return Promise.resolve(`job ${job.id} complete!`);
});

const app = express();
app
  .set("port", process.env.PORT || 5000)
  .get("/", async (req, res) => {
    try {
      const job = await queue.add({
        seconds: Math.abs(+req.query.seconds) || 10,
      });
      const result = await job.finished();
      res.send(result);
    }
    catch (err) {
      res.status(500).send({error: err.message});
    }
  })
  .listen(app.get("port"), () => 
    console.log("app running on port", app.get("port"))
  )
;

Sample run:

$ curl localhost:5000?seconds=2
job 42 complete!
like image 79
ggorlen Avatar answered Nov 02 '22 23:11

ggorlen