0.1.7 • Published 6 months ago

@markormesher/x-to-mqtt v0.1.7

Weekly downloads
-
License
AGPL-3.0
Repository
github
Last release
6 months ago

CircleCI npm

X to MQTT

:wave: Arrived here from a something-to-MQTT project? Jump to configuration.


This is the base library for various something-to-MQTT projects, each of which works as an adapter from some upstream service to MQTT.

It provides a wrapper around an MQTT client to reduce boilerplate code in each individual project, as well as a handful of useful utils for things like reading environment variables and logging.

Configuration

Configuration for projects that consume this library is controlled by the following environment variables:

Variable NameDescription
TOPIC_PREFIXTopic prefix for emitted messages. Non-optional.
MQTT_HOSTMQTT server host name, not including mqtt://. Non-optional.
MQTT_PORTMQTT server port. Optional, default 1883.
MQTT_USERNAMEMQTT server username. Optional, default no auth.
MQTT_PASSWORDMQTT server password. Optional, default no auth.
UPDATE_INTERVAL_SECONDSUpdate interval to query the upstream source. Optional, default depends on the parent project, may be unused.

Each of the environment variables can be suffixed with _FILE to read the value from a file, e.g. MQTT_PASSWORD_FILE=/run/secrets/mqtt-password.

Features

MQTT Client Wrapper

This library handles reading the configuration above and connecting to the MQTT broker, then provides convenient methods for publishing messages and updating the health status (see below).

Publishing

Publishing messages couldn't be easier - just call .publish() with the topic and message. The user-configured topic prefix will be added automatically.

const mqttWrapper = new XToMqtt();
mqttWrapper.publish("topic/foo/bar", "Hello world!");

Subscribing

Messages can be subcribed to with the .subscribe() method, passing in the topic pattern and a listener callback. The topic pattern can use the usual + and # wildcards supported by MQTT.

Note that the topic prefix is not included in the subscription pattern, allowing you to listen to topics outside of the tree you publish to. The topic prefix is exposed via .getTopicPrefix(), as shown below.

Subscriptions are not allowed until the MQTT client has connected, so it is advisable to subscribe to topics inside the onConnect handler, as shown below.

const mqttWrapper = new XToMqtt({
  onConnect: () => {
    mqttWrapper.subscribe(`${mqttWrapper.getTopicPrefix()}/command/#`, (topic, message) => {
      logger.info("Command message received", { topic, message });
    });
  },
});

Standardised Status Publishing

Every project that consumes this library publishes its status on two standardised topics:

  • ${topicPrefix}/_meta/last_seen - ISO 8601 timestamp of the last update from the upstream source
  • ${topicPrefix}/_meta/upstream_status - status of the upstream source, always either okay, errored or unknown.

Both of these values can be updated directly via the library or may be updated automatically when other messages are published, depending on the settings used by each project (see this library's type definitions for all settings).

Other topics may be published under the _meta topic in the future.

Repeating Update Runner

Many of the something-to-MQTT projects work by querying some upstream source on a given interval then publishing results. To further reduce boilerplate code, this library includes a utility to set up recurring updates that use the interval configured above.

Note: the update runner deliberately does not try to catch errors; if the update function throws an error the process will likely exit. This can be helpful if you want your programme to be restarted on error and you're using something like Kubernetes that will do that for you, but if you don't want the execution process to exit then you will need to catch and handle any potential errors.

Example

Below is a minimal example of using this library. Note that not all settings are shown - see the source code and type definitions for all options.

import { XToMqtt, logger, registerRepeatingUpdate } from "x-to-mqtt";

const mqttWrapper = new XToMqtt({
  // these are the default settings, so you could skipt this
  updateLastSeenOnPublish: true,
  updateUpstreamStatusOnPublish: true,

  // onConnect is optional
  onConnect: () => {
    mqttWrapper.subscribe("some/topic/+/set", (topic, message) => {
      logger.info("Message received");
    });
  },
});

registerRepeatingUpdate({ defaultIntervalSeconds: 3600 }, () => {
  logger.info("Getting new result...");
  getResultFromSomewhere()
    .then((result) => {
        logger.info("Got result");
        mqttWrapper.publish("key", result);
     })
    .catch((error) => {
      logger.error("Failed to get result", { error });
      mqttWrapper.updateUpstreamStatus("errored");
    });
});
0.1.7

6 months ago

0.1.6

8 months ago

0.1.5

8 months ago

0.1.4

8 months ago

0.1.2

8 months ago

0.1.1

8 months ago