3.2.3 • Published 1 year ago

redis-x-stream v3.2.3

Weekly downloads
22
License
MIT
Repository
github
Last release
1 year ago

redis-x-stream

Create async iterables that emit Redis stream entries. Requires Redis 5 or greater.

release license

test

Getting Started

import { RedisStream } from 'redis-x-stream'
import Redis from 'ioredis'

// Name of the stream used throughout this example.
const myStream = 'my-stream'
// Write 1e5 entries to the stream before reading it back (see populate below).
await populate(myStream, 1e5)

// Counts every entry yielded by the async iterable.
let i = 0
// Each yielded item destructures as [streamName, [id, keyvals]]; this example
// only counts items and does not use the destructured values.
for await (const [streamName, [id, keyvals]] of new RedisStream(myStream)) {
  i++;
}
console.log(`read ${i} stream entries from ${myStream}`)

/**
 * Writes `count` entries to `stream` and then shuts the writer connection down.
 *
 * Every entry is added with `xadd(stream, '*', 'index', j)`, where `j` is the
 * entry's position (0 .. count - 1). All writes are issued at once on a client
 * created with `enableAutoPipelining: true` and awaited together.
 *
 * @param {string} stream - Name of the stream to write to.
 * @param {number} count - Number of entries to add.
 * @returns {Promise<void>} Resolves after the writer has emitted 'close'.
 *   Rejects if any write or the quit command fails.
 */
async function populate(stream, count) {
  const writer = new Redis({ enableAutoPipelining: true })
  await Promise.all(
    Array.from(Array(count), (_, j) => writer.xadd(stream, '*', 'index', j))
  )
  // Subscribe before quitting so the 'close' event cannot be missed.
  const closed = new Promise(resolve => writer.once('close', resolve))
  // Await quit() so a failure rejects populate() instead of surfacing as an
  // unhandled rejection while we wait for a 'close' that may never arrive.
  await writer.quit()
  await closed
  console.log(`wrote ${count} stream entries to ${stream}`)
}

Usage

See the API Docs for available options.

Advanced Usage

Task Processing

If you have a cluster of processes reading Redis stream entries, you will likely want to use Redis consumer groups.

A task processing application may look like the following:

// Placeholder for an application-defined event emitter; the handlers registered
// below subscribe to its 'new-source' and 'shutdown' events.
const control = {
  /* some control event emitter */
}
// Reader over 'my-stream' configured with a consumer group and consumer name.
const stream = new RedisStream({
  streams: ['my-stream'],
  group: 'my-group',
  // e.g. a k8s StatefulSet hostname, or a Cloud Foundry instance index
  consumer: 'tpc_' + process.env.SOME_ORDINAL_IDENTIFIER,
  block: Infinity,
  count: 10,
  deleteOnAck: true,
})
// NOTE(review): Semaphore is not imported in this snippet; presumably a counting
// semaphore limiting the number of in-flight tasks to 11 — confirm.
const lock = new Semaphore(11)
// Bound to `lock` so it can be passed directly as a callback.
const release = lock.release.bind(lock)

control.on('new-source', (streamName) => {
  // Add an additional source stream to a blocked stream.
  stream.addStream(streamName)
})
control.on('shutdown', async () => {
  // drain will process all claimed entries (the PEL) and stop iteration
  await stream.drain()
})

/**
 * Processes a single stream entry and then acknowledges it.
 *
 * The processing step is a placeholder. Because the ack is the last statement,
 * an entry whose processing throws is not acknowledged.
 *
 * @param {object} stream - Object exposing `ack(streamName, id)`.
 * @param {string} streamName - Name of the stream the entry came from.
 * @param {string} id - Id of the entry to acknowledge.
 * @param {*} entry - The entry payload to process.
 * @returns {Promise<void>}
 */
async function tryTask(stream, streamName, id, entry) {
  //...process entry...
  stream.ack(streamName, id)
}

for await (const [streamName, [id, keyvals]] of stream) {
  // Wait for a free slot before starting another task.
  await lock.acquire()
  // Deliberately not awaited so tasks can overlap. Failures are logged here
  // because a rejection on this detached promise would otherwise be unhandled;
  // the slot is released whether the task succeeded or failed.
  void tryTask(stream, streamName, id, keyvals)
    .catch((err) => console.error(`task failed for ${streamName} entry ${id}`, err))
    .finally(release)
}
1.4.1

2 years ago

1.4.0

2 years ago

1.3.1

2 years ago

1.3.0

2 years ago

2.0.3

1 year ago

2.0.2

1 year ago

2.1.0

1 year ago

2.0.1

2 years ago

2.0.0

2 years ago

3.2.2

1 year ago

3.1.3

1 year ago

3.2.1

1 year ago

3.1.2

1 year ago

3.2.0

1 year ago

3.1.1

1 year ago

3.1.0

1 year ago

3.1.5

1 year ago

3.2.3

1 year ago

3.1.4

1 year ago

3.0.0

1 year ago

1.2.1

3 years ago

1.2.0

3 years ago

1.1.1

3 years ago

1.1.0

3 years ago

1.0.0

3 years ago

1.0.0-alpha.4

3 years ago

1.0.0-alpha.2

3 years ago

1.0.0-alpha.1

3 years ago