0.0.3 • Published 5 years ago

pg-streams v0.0.3

Weekly downloads
13
License
-
Repository
-
Last release
5 years ago

Postgres Streams

NOTE: This library is at an early stage of development and is not ready for use. DO NOT USE IT!

Idea

<...>

Usage

1. Set up PostgreSQL

-- Event table: one row per event. Safe to re-run thanks to IF NOT EXISTS.
CREATE TABLE IF NOT EXISTS pg_streams_events (
  -- Auto-incrementing primary key; this value is published by the
  -- pg_streams_notify() trigger function on every insert.
  "position" bigserial PRIMARY KEY,
  -- Stream identifier (e.g. 'user-account' in the usage examples).
  "stream" text NOT NULL,
  "type" text NOT NULL,
  -- NOTE(review): `serial` attaches an auto-incrementing sequence as the
  -- column default. The producer example passes schemaVersion explicitly,
  -- so a plain `integer` may have been intended. Confirm before changing.
  "schemaVersion" serial NOT NULL,
  "data" jsonb NOT NULL,
  "meta" jsonb NOT NULL,
  -- NOTE(review): `timestamp` stores no time zone; `timestamptz` may be
  -- preferable if writers run in different zones. Confirm.
  "createdAt" timestamp NOT NULL DEFAULT NOW()
);

-- Index on the stream column; presumably to speed up per-stream reads.
CREATE INDEX IF NOT EXISTS idx_pg_streams_events_stream ON pg_streams_events ("stream");

-- Trigger function: publishes a notification on the 'pg_streams' channel for
-- the inserted row. The payload has the form 'NEW_EVENT <position> <stream>'.
-- Returns NEW unchanged.
CREATE OR REPLACE FUNCTION pg_streams_notify() RETURNS trigger AS $trigger$
  DECLARE
    payload text;
  BEGIN
    -- Build the message first so the notify call itself stays short.
    payload := 'NEW_EVENT ' || NEW."position" || ' ' || NEW."stream";
    PERFORM pg_notify('pg_streams', payload);
    RETURN NEW;
  END;
$trigger$ LANGUAGE plpgsql;

-- Recreate the trigger from scratch so this script can be re-run safely:
-- drop any existing definition, then attach pg_streams_notify() so it runs
-- once for every row inserted into pg_streams_events.
DROP TRIGGER IF EXISTS pg_streams_insert_trigger ON pg_streams_events;
CREATE TRIGGER pg_streams_insert_trigger AFTER INSERT ON pg_streams_events
FOR EACH ROW EXECUTE PROCEDURE pg_streams_notify();

2. Install the npm packages

$ npm i pg pg-streams

3. Producer

const { createProducer } = require('pg-streams');

// `pgOptions` is not defined in this snippet; presumably it holds the
// PostgreSQL connection settings. Confirm against the library before use.
const producer = createProducer({ pgOptions });

// Event to write. The top-level keys share their names with the columns of
// the pg_streams_events table (stream, type, schemaVersion, data, meta).
const event = {
  stream: 'user-account',
  type: 'created',
  schemaVersion: 2,
  data: {
    name: 'Bob',
    age: 33,
    // NOTE(review): properties set to `undefined` are dropped by JSON
    // serialization; confirm how the library stores this field.
    sex: undefined,
  },
  meta: {
    foo: 'bar',
  },
};

producer.write(event);

4. Consumer

const { createConsumer } = require('pg-streams');

// Position of the most recently handled event. Its initial value, '0', is
// what gets passed to the consumer as `fromPosition`.
let position = '0';

// Logs each received event, then remembers its position.
const handleEvent = async (event) => {
  console.log(event);
  position = event.position;
};

// `pgOptions` is not defined in this snippet; presumably it holds the
// PostgreSQL connection settings. Confirm against the library before use.
const consumer = createConsumer({
  pgOptions,
  streams: ['user-account', 'user-preferences'],
  fromPosition: position,
  handler: handleEvent,
});

// Errors are printed with console.error; warnings and informational
// messages are printed with console.log.
consumer.on('error', console.error);
consumer.on('warn', console.log);
consumer.on('info', console.log);

consumer.start();
0.0.3

5 years ago

0.0.2

6 years ago

0.0.1

6 years ago