
Nest Bull: separate processes for queues and API

I have a NestJS application that exposes a few REST APIs. One of them triggers a job that processes some tasks. The problem is that once the job is triggered, the application stops serving REST requests, which causes the load balancer's health checks to fail. I followed the method described at the end of the README to run the job processor in a separate child process, but the job does not start in a child process and the API requests still stall.

Here's my Job:

import {
  BullQueueEvents,
  OnQueueActive,
  OnQueueEvent,
  Process,
  Processor,
} from 'nest-bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { AService } from './a-service';
import { AJobInterface } from '../AJobInterface';

@Processor({ name: 'a_queue' })
export class AJob {
  private readonly logger = new Logger('AQueue');

  constructor(private readonly service: AService) {}

  @Process({
    name: 'app',
    concurrency: 1
  })
  processApp(job: Job<AJobInterface>) {
    console.log('CHILD: ', process.pid);
    const { jobId } = job.data;
    return this.service.process(jobId);
  }

  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(
        job.data,
      )}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }
}

Here's my app.module.ts:

import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { DatabaseModule } from './db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './worker/a-job';
import { AService } from './worker/a-service';
import { join } from 'path';

@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ join(__dirname, 'worker/a-job.js') ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        },
        settings: {
          lockDuration: 300000,
          stalledInterval: 300000
        },
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService, AJob, AService],
})
export class AppModule implements OnModuleInit {
  onModuleInit() {
    console.log('MAIN: ', process.pid);
  }
}

Is there anything that I'm doing wrong?

Rahul Sharma asked Dec 02 '19 14:12




1 Answer

Sorry for posting the answer so late. It turns out that the setup with the worker in a child process was not possible. I ended up creating a separate worker.module.ts and a separate worker.ts, and running the API and the worker as two separate processes.

worker.module.ts:

import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppService } from '../app.service';
import { DatabaseModule } from '../db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './a-job';
import { AService } from './a-service';
import { Job, DoneCallback } from 'bull';

@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ (job: Job, done: DoneCallback) => { done(null, job.data); } ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          password: process.env.REDIS_PWD,
          showFriendlyErrorStack: true,
        },
        settings: {
          lockDuration: 300000,
          stalledInterval: 300000
        },
      },
    }),
  ],
  providers: [AppService, AJob, AService],
})
export class WorkerModule implements OnModuleInit {
  onModuleInit() {
    console.log('WORKER: ', process.pid);
  }
}

worker.ts:

import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker/worker.module';

async function bootstrap() {
  const app = await NestFactory.create(WorkerModule);
  // Await init() so that module initialization errors surface in bootstrap().
  await app.init();
}

bootstrap();

While app.module.ts now looks like this:

//...imports
@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        },
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule implements OnModuleInit {
  onModuleInit() {
    console.log('MAIN: ', process.pid);
  }
}

and the corresponding app.ts:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { port } from './config';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableCors();
  await app.listen(port);
}

bootstrap();
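
With processors left empty in app.module.ts, the API process only adds jobs to a_queue and the worker process consumes them. The following is a minimal sketch of that producer side, not the code from the application above. JobQueue, AJobData, enqueueApp and FakeQueue are hypothetical names introduced for illustration, and jobId is assumed to be a string. In the real application the queue would presumably be the Bull Queue instance that nest-bull injects (for example with @InjectQueue('a_queue')), whose add(name, data) method has a compatible shape.

```typescript
// Minimal shape of the queue used in this sketch. Bull's Queue offers a
// compatible add(name, data) method; a local interface keeps the example
// free of dependencies.
interface JobQueue<T> {
  add(name: string, data: T): Promise<unknown>;
}

// Payload read by processApp() in the worker (`const { jobId } = job.data`).
// Assumption: jobId is a string; the real AJobInterface is not shown.
interface AJobData {
  jobId: string;
}

// Hypothetical helper for the API side: it only enqueues a named job.
// The heavy work happens in the worker process that reads from Redis.
async function enqueueApp(
  queue: JobQueue<AJobData>,
  jobId: string,
): Promise<void> {
  // 'app' matches the name passed to @Process({ name: 'app' }) in AJob.
  await queue.add('app', { jobId });
}

// In-memory stand-in for the real queue, for demonstration only.
class FakeQueue implements JobQueue<AJobData> {
  readonly added: Array<{ name: string; data: AJobData }> = [];

  async add(name: string, data: AJobData): Promise<unknown> {
    this.added.push({ name, data });
    return undefined;
  }
}
```

A controller or service in the API process could call enqueueApp(queue, jobId) and return right away, so the HTTP handler does not wait for the job to finish. The two entry points, app.ts and worker.ts, are then started as separate OS processes.
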
Rahul Sharma answered Oct 17 '22 06:10
