1.0.8 • Published 1 year ago
cavi-mqtt v1.0.8
cavi-mqtt
Add this to your main.ts file to listen for MQTT messages
const mqttService = await CaviMqttMicroservice.create(MqttModule, {
...mqttConfig,
logger: WinstonModule.createLogger({
exitOnError: false,
level: process.env.LOG_LEVEL,
format: winston.format.combine(
winston.format.label({ label: 'NOTIFICATION_SERVICE' }),
winston.format.timestamp({
format: () => {
return new Date().toISOString().split('.')[0] + 'Z';
},
alias: 'time',
}),
winston.format.json(),
),
transports: [new winston.transports.Console()],
}),
});
mqttService.listen();
You can then use the following method decorators in your controller (docs: https://docs.nestjs.com/microservices/basics)
import { MqttBaseController } from 'cavi-mqtt';
@Controller()
export class MyMqttController extends MqttBaseController { // - important for error handling and logging
@MessagePattern('pattern')
myMessagePatternMethod(@Payload('data') data) {
@EventPattern('pattern')
myEventPatternMethod(@Payload('data') data) {
Add this to your app.module.ts file to push messages to MQTT
CaviMqttModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
microserviceName: MICROSERVICE_NAME,
mqttOptions: {
transport: Transport.MQTT,
options: {
url: `${configService.get<string>('mqtt.protocol')}://${configService.get<string>(
'mqtt.host',
)}:${configService.get<number>('mqtt.port')}`,
password: configService.get<string | null>('mqtt.password'),
username: configService.get<string | null>('mqtt.username'),
clean: configService.get<number>('mqtt.clean') === 1,
clientId: configService.get<string | null>('mqtt.clientId'),
subscribeOptions: {
qos: 2,
},
},
},
}),
}),
To emit
or send
use the CaviMqttService (docs: https://docs.nestjs.com/microservices/basics)
import { CaviMqttService } from 'cavi-mqtt';
constructor(
private readonly caviMqttService: CaviMqttService,
)
this.caviMqttService.emit(topic, message);
this.caviMqttService.send(pattern, data);
Communication example of message pattern and shared subscription through MQTT
// producer
create(dto: PluginTypeDto): Promise<any> {
return this.mqttService.send(`${MQTT_NAMESPACE}/equipments/plugintypes/post`, dto);
}
export const PLUGIN_TYPES_MESSAGE_PATTERN_POST = `$share/${MQTT_SHARED_SUBSCRIPTION_GROUP}/${MQTT_NAMESPACE}/equipments/plugintypes/post/+`; // with shared subscription
// export const PLUGIN_TYPES_MESSAGE_PATTERN_POST = `${MQTT_NAMESPACE}/equipments/plugintypes/post`; - without shared subscription
// consumer
@MessagePattern(PLUGIN_TYPES_MESSAGE_PATTERN_POST)
create(@Payload('data') dto: PluginTypeDto): Promise<PluginType> {
return this.pluginTypesService.create(dto); // this is sent as a response to the PLUGIN_TYPES_MESSAGE_PATTERN_POST + '/reply' topic
}