I'm trying to set up an MQTT microservice using NestJS according to the docs.
I've started a working Mosquitto broker using Docker and verified its operability using various MQTT clients. Now, when I start the NestJS service, it seems to connect correctly (MQTT.fx shows a new client), yet I am unable to receive any messages in my controllers. This is my bootstrapping, just like in the docs:
main.ts
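import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.MQTT,
    options: {
      host: 'localhost',
      port: 1883,
      protocol: 'tcp'
    }
  });
  app.listen(() => console.log('Microservice is listening'));
}
bootstrap();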
app.controller.ts
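import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @MessagePattern('mytopic') // tried {cmd:'mytopic'} or {topic:'mytopic'}
  root(msg: Buffer) {
    console.log('received: ', msg)
  }
}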
Am I using the MessagePattern decorator incorrectly, or is my understanding of what a NestJS MQTT microservice is supposed to do wrong? I assumed it would subscribe to the topic I pass to the decorator. My only other source of information is the corresponding unit tests.
Maybe you can clarify the difference between the Nest microservice, which connects to an MQTT broker, and a ClientProxy, which can also be connected to a microservice. My current understanding is that the Nest microservice connection is used for listening on all message patterns in my controllers.
A communication system built on MQTT consists of the publishing server, a broker and one or more clients. It is designed for constrained devices and low-bandwidth, high-latency or unreliable networks. To start building MQTT-based microservices, first install the required package:
Publish to topic
Now let's publish our message or payload to any topic dynamically. For this, create a publish(topic: string, payload: string) method outside of onModuleInit(). You can then inject this mqttService into any module and use its publish method to publish your payload to any topic.
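The publish method described above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the original article's code: MqttService, PublishClient and InMemoryClient are hypothetical names, and the in-memory client stands in for a real broker connection so the sketch runs on its own. In a Nest application the service would additionally be marked @Injectable() and would create its real client (for example with the mqtt package's connect()) inside onModuleInit().

```typescript
// Hypothetical minimal shape of a client that can publish. The publish
// method of the mqtt package's client can be called with these arguments too.
interface PublishClient {
  publish(topic: string, payload: string, callback?: (err?: Error) => void): unknown;
}

// Stand-in client (assumption for this sketch): records messages
// instead of contacting a broker.
class InMemoryClient implements PublishClient {
  readonly sent: Array<{ topic: string; payload: string }> = [];

  publish(topic: string, payload: string, callback?: (err?: Error) => void): void {
    this.sent.push({ topic, payload });
    if (callback) callback();
  }
}

class MqttService {
  private readonly client: PublishClient;

  constructor(client: PublishClient) {
    this.client = client;
  }

  // Publishes a payload to any topic chosen at call time.
  publish(topic: string, payload: string): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client.publish(topic, payload, (err) => (err ? reject(err) : resolve()));
    });
  }
}

// Usage: the topic is passed per call, so one service covers all topics.
const demoClient = new InMemoryClient();
const mqttService = new MqttService(demoClient);
void mqttService.publish('sensors/kitchen', '{"temperature": 21}');
```

Passing the client in through the constructor keeps the service testable without a running broker; swapping in a real client should not require changes to publish().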
MQTT (Message Queuing Telemetry Transport) is an open source, lightweight messaging protocol, optimized for low latency. This protocol provides a scalable and cost-efficient way to connect devices using a publish/subscribe model.
On the nest.js side we have the following pattern handler:
@MessagePattern('sum')
sum(data: number[]): number {
  return data.reduce((a, b) => a + b, 0);
}
As @Alexandre explained, this will actually listen to sum_ack.
A non-nest.js client could look like this (just save as client.js, run npm install mqtt and run the program with node client.js):
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://localhost:1883')

client.on('connect', function () {
  // Subscribe to the response topic first, then send the request
  client.subscribe('sum_res', function (err) {
    if (!err) {
      client.publish('sum_ack', '{"data": [2, 3]}');
    }
  })
})

client.on('message', function (topic, message) {
  console.log(message.toString())
  client.end()
})
It sends a message on the topic sum_ack and listens to messages on sum_res. When it receives a message on sum_res, it logs the message and ends the program. nest.js expects the message format to be {data: myData} and then calls the handler as sum(myData).
// Log:
{"err":null,"response":5} // This is the response from sum()
{"isDisposed":true} // Internal "complete event" (according to unit test)
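To make the two log lines above easier to follow, here is a small sketch of how a plain client might interpret them. It assumes only the message shapes shown in the log (err, response, isDisposed); ResponsePacket and collectResponses are hypothetical names and not part of nest.js.

```typescript
// Shape of the messages seen on the response topic in the log above.
interface ResponsePacket {
  err?: unknown;
  response?: unknown;
  isDisposed?: boolean;
}

// Collects response values from raw JSON messages until the "complete event"
// ({"isDisposed":true}) arrives. Throws if a message carries an error.
function collectResponses(rawMessages: string[]): unknown[] {
  const values: unknown[] = [];
  for (const raw of rawMessages) {
    const packet = JSON.parse(raw) as ResponsePacket;
    if (packet.err) {
      throw new Error(`handler failed: ${JSON.stringify(packet.err)}`);
    }
    if ('response' in packet) {
      values.push(packet.response);
    }
    if (packet.isDisposed) {
      break; // the stream is complete, nothing more will follow
    }
  }
  return values;
}

// The two messages from the log: one result, then the completion marker.
const sumResult = collectResponses([
  '{"err":null,"response":5}',
  '{"isDisposed":true}',
]);
console.log(sumResult); // sumResult is [5]
```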
Of course, this is not very convenient...
That is because this is meant to be used with another nest.js client rather than a normal mqtt client. The nest.js client abstracts all the internal logic away. See this answer, which describes the client for redis (only two lines need to be changed for mqtt).
async onModuleInit() {
  await this.client.connect();
  // no 'sum_ack' or {data: [0, 2, 3]} needed
  this.client.send('sum', [0, 2, 3]).toPromise();
}
The documentation is not very clear, but it seems that for mqtt, if you have @MessagePattern('mytopic'), you can publish a command on the topic mytopic_ack and you will get the response on mytopic_res. I am still trying to find out how to publish to the mqtt broker from a service.
See https://github.com/nestjs/nest/blob/e019afa472c432ffe9e7330dc786539221652412/packages/microservices/server/server-mqtt.ts#L99
public getAckQueueName(pattern: string): string {
  return `${pattern}_ack`;
}
public getResQueueName(pattern: string): string {
  return `${pattern}_res`;
}
@Tanas is right. Nestjs/Microservice now listens on your $[topic] and answers on $[topic]/reply. The postfixes _ack and _res are deprecated.
For example:
@MessagePattern('helloWorld')
getHello(): string {
  console.log("hello world")
  return this.appService.getHello();
}
Listens now on Topic: helloWorld
Replies now on Topic helloWorld/reply
Regarding ID
You should also provide an id within the payload (see @Hakier), and Nestjs will reply with an answer containing your id. If you don't provide an id, there won't be any reply, but the corresponding handler logic will still run.
For example (using the snippet from above):
your msg:
{"data":"foo","id":"bar"}
Nestjs reply:
{"response":"Hello World!","isDisposed":true,"id":"bar"}
Without ID:
your message:
{"data":"foo"} or {}
No reply but Hello World in Terminal
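The id handling described above can be sketched with a few small helpers for a plain MQTT client. This relies only on the topic names and JSON shapes shown in this answer; replyTopicFor, buildRequest and readReply are hypothetical names, not Nestjs APIs, and the actual publishing and subscribing are left to whichever MQTT client you use.

```typescript
// Shapes taken from the example messages in this answer.
interface RequestPacket<T> {
  data: T;
  id: string;
}

interface ReplyPacket {
  response?: unknown;
  isDisposed?: boolean;
  id?: string;
}

// Topic on which the reply to a request sent on `topic` is expected.
function replyTopicFor(topic: string): string {
  return `${topic}/reply`;
}

// Serializes a request; without an id no reply would be sent back.
function buildRequest<T>(data: T, id: string): string {
  const packet: RequestPacket<T> = { data, id };
  return JSON.stringify(packet);
}

// Returns the response if the reply belongs to the given request id,
// otherwise undefined.
function readReply(raw: string, expectedId: string): unknown {
  const reply = JSON.parse(raw) as ReplyPacket;
  return reply.id === expectedId ? reply.response : undefined;
}

// Usage with the values from the example above.
const requestBody = buildRequest('foo', 'bar');
const replyValue = readReply(
  '{"response":"Hello World!","isDisposed":true,"id":"bar"}',
  'bar',
);
console.log(replyTopicFor('helloWorld'), requestBody, replyValue);
```

Matching on the id lets a client with several requests in flight on the same topic tell their replies apart.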