I have a NestJS application that exposes a few REST APIs. One of the APIs triggers a job which processes some tasks. The problem is that when the job is triggered, the application stops serving REST requests, which leads to health check failures from the load balancer. I followed the method given at the end of the README to start a separate child process for processing jobs. However, the job doesn't start in a child process, and the API requests stall.
Here's my Job:
import {
  BullQueueEvents,
  OnQueueActive,
  OnQueueEvent,
  Process,
  Processor,
} from 'nest-bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { AService } from './a-service';
import { AJobInterface } from '../AJobInterface';

@Processor({ name: 'a_queue' })
export class AJob {
  private readonly logger = new Logger('AQueue');

  constructor(private readonly service: AService) {}

  @Process({
    name: 'app',
    concurrency: 1
  })
  processApp(job: Job<AJobInterface>) {
    console.log('CHILD: ', process.pid);
    const { jobId } = job.data;
    return this.service.process(jobId);
  }

  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(
        job.data,
      )}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }
}
Here's my app.module.ts:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { DatabaseModule } from './db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './worker/a-job';
import { AService } from './worker/a-service';
import { join } from 'path';

@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ join(__dirname, 'worker/a-job.js') ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        },
        settings: {
          lockDuration: 300000,
          stalledInterval: 300000
        },
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService, AJob, AService],
})
export class AppModule implements OnModuleInit {
  onModuleInit() {
    console.log('MAIN: ', process.pid);
  }
}
Is there anything that I'm doing wrong?
The concurrency factor is a worker option that determines how many jobs are allowed to be processed in parallel. This means that the same worker is able to process several jobs in parallel, however the queue guarantees such as "at-least-once" and order of processing are still preserved.
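To make the idea of a concurrency factor concrete, here is a minimal sketch of a runner that keeps at most `limit` async tasks in flight. This is not Bull's implementation; `runWithConcurrency`, `makeTask`, and the counters are hypothetical names used only for illustration.

```typescript
// A task is any function that starts async work and returns a promise.
type Task<T> = () => Promise<T>;

// Runs all tasks, keeping at most `limit` of them in flight at once.
// Results are returned in the same order as the input tasks.
async function runWithConcurrency<T>(tasks: Task<T>[], limit: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  // Each "lane" pulls the next unclaimed task until none are left.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const laneCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Instrumentation to observe how many tasks overlap.
let inFlight = 0;
let peak = 0;

const makeTask = (id: number): Task<number> => async () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await Promise.resolve(); // yield once so other lanes can start
  inFlight--;
  return id;
};
```

Note that a concurrency factor only interleaves asynchronous work within one process. A CPU-bound job still blocks the event loop of the process that runs it, which is consistent with the stalled REST requests described in the question.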
Nest provides the @nestjs/bull package as an abstraction/wrapper on top of Bull, a popular, well supported, high performance Node.js based Queue system implementation. The package makes it easy to integrate Bull Queues in a Nest-friendly way to your application.
Sorry for posting the answer so late. It turns out that the setup of having a worker in a child process was not possible. I ended up with a separate worker.module.ts and a separate worker.ts, and I run two separate processes for the API and the worker.
worker.module.ts:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppService } from '../app.service';
import { DatabaseModule } from '../db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './a-job';
import { AService } from './a-service';
import { join } from 'path';
import { Job, DoneCallback } from 'bull';

@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ (job: Job, done: DoneCallback) => { done(null, job.data); } ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          password: process.env.REDIS_PWD,
          showFriendlyErrorStack: true,
        },
        settings: {
          lockDuration: 300000,
          stalledInterval: 300000
        },
      },
    }),
  ],
  providers: [AppService, AJob, AService],
})
export class WorkerModule implements OnModuleInit {
  onModuleInit() {
    console.log('WORKER: ', process.pid);
  }
}
worker.ts:
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker/worker.module';

async function bootstrap() {
  const app = await NestFactory.create(WorkerModule);
  // Await initialization so startup errors surface inside bootstrap().
  await app.init();
}
bootstrap();
While app.module.ts now looks like this:
//...imports
@Module({
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register({
      name: 'a_queue',
      processors: [ ],
      options: {
        redis: {
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        },
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule implements OnModuleInit {
  onModuleInit() {
    console.log('MAIN: ', process.pid);
  }
}
and the corresponding app.ts:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { port } from './config';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.enableCors();
  await app.listen(port);
}
bootstrap();
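A possible explanation for why the original setup did not fork is that the file path in the question pointed at a decorated Nest provider class. As far as I understand Bull's sandboxed processors, a file given by path is expected to export a plain function, so it cannot rely on Nest's dependency injection. A minimal sketch of such a file follows. The file name `processor.ts`, the `JobLike` and `AJobData` types, and the placeholder return value are assumptions for illustration and are not part of the original code.

```typescript
// processor.ts: hypothetical standalone processor file (compiled to processor.js).

// Minimal local stand-in for the parts of Bull's Job used here (assumption).
interface JobLike<T> {
  id: string | number;
  data: T;
}

// Assumed shape of the job payload, mirroring `job.data.jobId` in the question.
interface AJobData {
  jobId: string;
}

// A plain function: no decorators and no injected services,
// so any dependency has to be created inside this file.
async function processApp(job: JobLike<AJobData>): Promise<string> {
  const { jobId } = job.data;
  // Placeholder for the real work that AService.process(jobId) did.
  return `processed ${jobId}`;
}

export default processApp;
```

Under that assumption, the module would reference this file with something like `processors: [ join(__dirname, 'processor.js') ]`, and `AJob` would not also be registered as a provider in the API process. The two-process setup above avoids this restriction because the worker is a full Nest application with its own providers.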