3.3.5 • Published 11 days ago

@jetit/publisher v3.3.5

Weekly downloads
-
License
-
Repository
-
Last release
11 days ago

publisher

publisher is a library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a simple and scalable mechanism for publishing and consuming events in real-time, and supports features such as message deduplication, consumer group management, and scheduled event publishing.

IMPORTANT NOTE

This project currently does not have a means to clean up inactive consumers. This means that if you have a consumer that is no longer active, it will continue to receive events until it is removed from the consumer group. This is a known issue and will be addressed in a future release. A workaround is to use the following code to remove inactive consumers from the consumer group as part of your process cleanup:

// Redis client library; provides the Redis/Cluster connection classes used below.
const ioredis = require(`ioredis`);

// Shorthand for environment variables (reads REDIS_HOST, REDIS_PORT, INSTANCE_ID).
const env = process.env;

/**
 * Gracefully deregisters this instance as a Redis-streams consumer:
 * connects to Redis, resolves the instance's consumer group, releases all
 * pending message claims, deletes the consumer, and removes the instance's
 * bookkeeping keys. Exits early (with a log line) if the instance ID or
 * consumer group name is not available.
 *
 * Reads REDIS_HOST, REDIS_PORT and INSTANCE_ID from the environment.
 *
 * @returns {Promise<void>}
 */
async function bootstrap() {
    const connection = new ioredis.Redis({
        host: env.REDIS_HOST,
        // FIX: parseInt without a radix can misparse some inputs; always pass 10.
        // NOTE(review): if REDIS_PORT is unset this yields NaN — confirm callers
        // always provide it, or add a default.
        port: parseInt(env.REDIS_PORT, 10),
    });
    console.log(`Redis Connection Status ${connection.status}`);
    // Brief pause to give the connection a chance to establish before logging again.
    await waitForSeconds(0.3);
    console.log(`Redis Connection Status ${connection.status}`);
    const instanceUniqueId = env.INSTANCE_ID;
    if (!instanceUniqueId) {
        console.log(`Unique instance ID is not available`);
        return;
    }
    console.log(`Instance Unique ID : ${instanceUniqueId}`);
    const consumerGroupName = await getConsumerGroupName(connection, instanceUniqueId);
    if (!consumerGroupName) {
        console.log(`Consumer is not available, so graceful shutdown is not necessary.`);
        return;
    }
    console.log(`Consumer Group Name : ${consumerGroupName}`);
    // NOTE(review): slice(3) presumably strips a 3-character prefix (e.g. "cg:")
    // from the stored group name to recover the service name — TODO confirm.
    const instanceId = getInstanceId(consumerGroupName.slice(3), instanceUniqueId);
    console.log(`Instance ID : ${instanceId}`);
    const subscribedEvents = await getAllEventsForInstance(connection, instanceId);
    console.log(`Subscribed Events : ${JSON.stringify(subscribedEvents)}`);
    await clearSubscribedEvents(connection, consumerGroupName, instanceId, subscribedEvents);
    await deleteConsumerGroupNameForInstance(connection, instanceId);
    await deleteAllEventsFroInstance(connection, instanceId);
}

/**
 * Detaches this consumer instance from every event it is subscribed to.
 * For each event (in parallel): removes the consumer group from the event's
 * subscriber set, releases the instance's pending claims on the event stream,
 * and finally deletes the consumer from the group.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} consumerGroupName
 * @param {string} instanceId
 * @param {Array<string>} events
 */
async function clearSubscribedEvents(connection, consumerGroupName, instanceId, events) {
    const cleanupOne = async (eventName) => {
        console.log(`${eventName} is being cleared in publisher`);
        const streamName = `${eventName}:${consumerGroupName}`;
        console.log(`${streamName} is being removed.`);
        await connection.srem(eventName, consumerGroupName);
        console.log(`${eventName} is removed from ${consumerGroupName}`);
        // Releasing all claims based on info from: https://redis.io/commands/xgroup-delconsumer/
        await releaseAllClaims(connection, streamName, consumerGroupName, instanceId);
        console.log(`${eventName} removes all claims`);
        await connection.xgroup('DELCONSUMER', streamName, consumerGroupName, instanceId);
        console.log(`${eventName} is deleted as a consumer from ${consumerGroupName}, ${instanceId}`);
    };
    return Promise.all(events.map(cleanupOne));
}

/**
 * Hands off every message still pending for this consumer to a throwaway
 * "<group>-temp" consumer so the real consumer can be deleted safely
 * (XGROUP DELCONSUMER would otherwise drop unacknowledged messages).
 *
 * Note: only the first 10000 pending entries for the consumer are fetched;
 * all resulting XCLAIMs are issued in a single MULTI transaction.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} streamName
 * @param {string} consumerGroupName
 * @param {string} instanceId
 */
async function releaseAllClaims(connection, streamName, consumerGroupName, instanceId) {
    // Pending entries for this consumer across the whole ID range (- .. +).
    const pending = await connection.xpending(streamName, consumerGroupName, `-`, `+`, 10000, instanceId);

    if (!pending || pending.length === 0) {
        return;
    }
    console.log(`${pending.length} messages to clean up.`);
    const tempConsumerId = `${consumerGroupName}-temp`;
    const transaction = connection.multi({ pipeline: true });
    // Each pending entry is [id, consumer, idleTime, deliveryCount]; we only need the id.
    pending.forEach(([messageId]) => {
        transaction.xclaim(streamName, consumerGroupName, tempConsumerId, 10, messageId);
    });
    await transaction.exec();
}

/**
 * Builds the canonical instance identifier "<serviceName>:<instanceUniqueId>".
 *
 * @param {string} serviceName
 * @param {string} instanceUniqueId
 * @returns {string} the combined instance ID
 */
function getInstanceId(serviceName, instanceUniqueId) {
    const id = [serviceName, instanceUniqueId].join(':');
    console.log(`Generated Instance ID : ${id}`);
    return id;
}

/**
 * Looks up the consumer group name stored for an instance.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} instanceId
 * @returns {Promise<string>} consumer group name (null if the key is absent)
 */
async function getConsumerGroupName(connection, instanceId) {
    const key = `instance:${instanceId}:consumerGroupName`;
    console.log(`Get consumer group name called for key : ${key}`);
    return connection.get(key);
}

/**
 * Deletes the Redis key mapping an instance to its consumer group name.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} instanceId
 * @returns {Promise<number>} number of keys removed (0 or 1)
 */
async function deleteConsumerGroupNameForInstance(connection, instanceId) {
    const key = `instance:${instanceId}:consumerGroupName`;
    return connection.del(key);
}

/**
 * Returns every event name the instance is subscribed to.
 *
 * FIX: the previous implementation issued a single SSCAN with cursor 0 and
 * returned only the first page of results, silently dropping events on large
 * sets. SSCAN must be iterated until the server returns cursor "0" again.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} instanceId
 * @returns {Promise<Array<string>>} subscribed event names for this instance
 */
async function getAllEventsForInstance(connection, instanceId) {
    const key = `instance:${instanceId}:subscribedEvents`;
    console.log(`Get consumer group events : ${key}`);
    const events = [];
    let cursor = '0';
    do {
        const [nextCursor, batch] = await connection.sscan(key, cursor);
        events.push(...batch);
        cursor = String(nextCursor);
    } while (cursor !== '0');
    return events;
}

/**
 * Deletes the Redis set holding the instance's subscribed events.
 * NOTE(review): "Fro" is a typo for "For", but the name is kept because
 * callers (e.g. bootstrap) reference it as-is.
 *
 * @param {ioredis.Redis|ioredis.Cluster} connection
 * @param {string} instanceId
 * @returns {Promise<number>} number of keys removed (0 or 1)
 */
async function deleteAllEventsFroInstance(connection, instanceId) {
    const key = `instance:${instanceId}:subscribedEvents`;
    return connection.del(key);
}

/**
 * Resolves after the given number of seconds (fractions allowed).
 *
 * @param {number} [seconds=10] delay in seconds
 * @returns {Promise<void>}
 */
async function waitForSeconds(seconds = 10) {
    const ms = seconds * 1000;
    return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Start: run the cleanup and exit 0 on success, 1 on failure.
 */

(async () => {
    try {
        await bootstrap();
        process.exit(0);
    } catch (e) {
        console.error(e);
        process.exit(1);
    }
})();

Simple Example

import { Publisher, EventData } from '@jetit/publisher';

// Create an instance of the publisher
const publisher = new Publisher('Websockets');

// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};

await publisher.publish(eventData);

// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});

Possible use cases

  1. Microservices communication: If your system is composed of multiple microservices, the publisher can be used to facilitate communication between them by publishing and listening to events.

  2. Event sourcing and CQRS: In an event-sourced system, the publisher can be used to store and process events that represent the state changes of the system, enabling Command Query Responsibility Segregation (CQRS) by separating the read and write models.

  3. Task queues: The publisher can be used to create task queues for distributing workloads among different worker instances, ensuring that tasks are processed in the order they were created.

  4. Data streaming and processing: The publisher can be used to ingest and process large volumes of data in real-time, such as log files, clickstream data, or other event-based data.

  5. Distributed system coordination: In a distributed system, the publisher can be used for coordination between different components, such as managing leader election or maintaining configuration information.

  6. Real-time analytics and monitoring: The publisher can be used to collect and process real-time analytics data, such as user behavior, application performance metrics, or system monitoring information.

  7. Event-driven workflows: You can use the publisher to create event-driven workflows, where each step in the workflow is triggered by the completion of a previous step. This can be useful for orchestrating complex, multi-step processes.

  8. Message broadcasting: The publisher can be used to broadcast messages to multiple consumers or subscribers, allowing for efficient and scalable communication in applications with many components or services.

  9. Multicast Publishing: This is the existing PUB/SUB implementation but with the event data being stored into streams for additional processing

3.3.5

11 days ago

3.3.1

7 months ago

3.3.0

7 months ago

3.2.1

8 months ago

3.2.0

8 months ago

3.1.0

9 months ago

3.3.3

7 months ago

3.3.2

7 months ago

3.0.0

9 months ago

1.2.0

1 year ago

1.1.1

1 year ago

1.7.3

11 months ago

1.7.2

11 months ago

1.7.1

12 months ago

1.6.2

12 months ago

1.7.0

12 months ago

1.6.1

12 months ago

1.6.0

12 months ago

1.3.3

1 year ago

1.5.0

1 year ago

1.3.2

1 year ago

1.1.3

1 year ago

1.3.0

1 year ago

1.1.2

1 year ago

2.0.2

11 months ago

2.0.1

11 months ago

2.0.0

11 months ago

1.1.0

1 year ago

1.0.9

1 year ago

1.0.8

1 year ago

1.0.7

1 year ago

1.0.6

1 year ago

1.0.5

1 year ago

1.0.4

1 year ago

1.0.3

1 year ago

1.0.2

1 year ago

1.0.1

1 year ago

1.0.0

1 year ago