@streamerson/core v0.0.66
@streamerson/core
For Doing Cool & Idiomatic Things with Streams, Maybe
Overview
This package was created as a core SDK for my smoother-brained endeavours, some of which are documented in the Streamerson Examples. Give them a read, or skip them; this package stands on its own and might be helpful.
Everything here is built for internal usage in the rest of the @streamerson packages. However, those higher-level packages mostly wrap this core code, meaning that if my more use-case-driven tooling (the other monorepo packages) isn't of interest to you, maybe the low-level components here will help you build something too.
The idea for the exported code of this package is essentially to achieve the following:
- wrap Redis clients behind interfaces
- wrap reading/writing logic behind real Streams (Read/Writables)
- provide a set of un-opinionated tools for manipulating these Read/Writables
- instrument some basic utilities and helper functions to distribute to the rest of the monorepo
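To make the second goal a little more concrete, here is a minimal sketch of hiding a batch-reading loop behind a real Node `Readable`. It is not this package's implementation: the `FakeStreamClient` interface, its `readBatch` method, and the `inMemoryClient` and `toReadable` helpers are all hypothetical stand-ins so the example runs without a Redis server.

```typescript
import { Readable } from 'node:stream';

// Hypothetical stand-in for a Redis client: returns up to `count` entries after `cursor`.
interface FakeStreamClient {
  readBatch(cursor: number, count: number): Promise<string[]>;
}

// In-memory client so the sketch runs without a Redis server.
function inMemoryClient(entries: string[]): FakeStreamClient {
  return {
    async readBatch(cursor, count) {
      return entries.slice(cursor, cursor + count);
    },
  };
}

// Wrap the batch-reading loop in an object-mode Readable.
function toReadable(client: FakeStreamClient, batchSize = 2): Readable {
  async function* read() {
    let cursor = 0;
    while (true) {
      const batch = await client.readBatch(cursor, batchSize);
      if (batch.length === 0) return; // a live stream would keep polling instead of ending
      cursor += batch.length;
      yield* batch;
    }
  }
  // Readable.from() produces an object-mode stream by default.
  return Readable.from(read());
}
```

Because the result is an ordinary `Readable`, callers can consume it with `for await`, `.pipe()`, or `pipeline()` without knowing anything about the client underneath.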
Notes
Some quick notes:
- All TypeScript
- Meant for node v20+
- All the current code relies on redis at a version higher than 6.
- In the future, the Topic interface will probably be extended to work with more than Redis streams. I have my sights on AWS:SQS first, because cheap and fast and Kinesis sucks.
- Unfinished -- believe the package version being sub-1.
- Lots of high-hanging fruit for low-level optimizations around asynchronicity, network connection pooling, ser/des, memory management, potential for hanging promises, etc. This is a fairly crunchy project and I've just been getting to these things as I have time.
Installation
- Install the core SDK in your package of choice:
yarn add @streamerson/core
- Import some stuff, get streamin'. The following example will connect to Redis and begin listening for events on a stream Topic, logging each one as it arrives:
Example
import {StreamingDataSource, Topic} from '@streamerson/core';

export const readChannel = new StreamingDataSource(/* optional options */);
await readChannel.connect();

for await (const event of readChannel.getReadStream({
    // ... optional options, like batch size and timeouts
    topic: new Topic('my-example-topic')
})) {
    readChannel.logger.info(event, 'Received event!');
    // Do something with my streamed event?
    // We could even `.pipe()` the read stream to a Writable instead.
}
- And you're ... streaming with gas? Of course, this is relying on default connection settings and the existence of a Redis server. In this monorepo, there are tools for starting a Redis Docker image, and there are connection options built into the parameters of all the functions for connecting to a non-defaulted Redis instance.
API
RedisDataSource
- a base implementation of a data-source (any interface capable of):
  - client (getter for a client for data connection)
  - control (getter for a client for orchestration)
  - connect() (connect to the datasource)
  - disconnect() (disconnect from the datasource)
StreamingDataSource
- an extension of the RedisDataSource, which implements streaming protocols:
  - writeToStream({ ... }) (write to a stream using the data-source)
  - getReadStream({ ... }) (get a Readable for a stream)
  - getWriteStream({ ... }) (get a Writable for a stream)
  - iterateStream({ ... }) (get an Iterable that reads from a stream)
  - set() (set a key, for orchestration purposes)
  - get() (get a key, for orchestration purposes)
  - ... and more
Promise Tracker
- a general purpose utility for using the await keyword to cede control until a future event has occurred on a stream. You could generally use .once('event'), but due to memory management concerns I have exposed a utility with an interface as follows:
  - tracker.promise('event')
  - tracker.cancel('event')
  - tracker.cancelAll()
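For a feel of why a tracker helps with the memory concern, here is a rough sketch of the same three-method shape over a plain `EventEmitter`. Only the method names come from the list above. The `PromiseTrackerSketch` class, the choice to reject on cancel, and the reuse of one promise per event name are assumptions of this example, not the package's actual behaviour.

```typescript
import { EventEmitter } from 'node:events';

type Pending = {
  promise: Promise<unknown>;
  listener: (value: unknown) => void;
  reject: (reason: Error) => void;
};

// Hypothetical sketch: at most one pending promise per event name, with explicit cleanup.
class PromiseTrackerSketch {
  private readonly emitter: EventEmitter;
  private readonly pending = new Map<string, Pending>();

  constructor(emitter: EventEmitter) {
    this.emitter = emitter;
  }

  // Resolve with the first value emitted for `event`.
  promise(event: string): Promise<unknown> {
    const existing = this.pending.get(event);
    if (existing) return existing.promise;
    const entry = {} as Pending;
    entry.promise = new Promise<unknown>((resolve, reject) => {
      entry.listener = (value) => {
        this.pending.delete(event);
        resolve(value);
      };
      entry.reject = reject;
    });
    this.emitter.once(event, entry.listener);
    this.pending.set(event, entry);
    return entry.promise;
  }

  // Detach the listener so it cannot leak, then reject the waiter (an assumption of this sketch).
  cancel(event: string): void {
    const entry = this.pending.get(event);
    if (!entry) return;
    this.emitter.off(event, entry.listener);
    this.pending.delete(event);
    entry.reject(new Error(`cancelled: ${event}`));
  }

  cancelAll(): void {
    for (const event of [...this.pending.keys()]) this.cancel(event);
  }
}
```

The point of the design is that a bare `.once('event')` leaves its listener attached forever if the event never fires, whereas `cancel()` and `cancelAll()` give the caller a way to remove it.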
Stream Awaiter
- a general purpose utility for call-and-response along two streams. After a message with a given ID is dispatched to one stream, generate a promise that will resolve when the second stream receives a message with a matching ID, using the methods:
  - streamAwaiter.dispatch('some-id') (a promise for a response with 'some-id')
  - streamAwaiter.readResponseStream() (begin reading incoming responses)
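The ID-matching idea can be sketched in a few lines. This is an illustration only: the `StreamAwaiterSketch` class and the `ResponseMessage` shape are hypothetical, and passing the response stream in as an `AsyncIterable` is an assumption made so the example runs without Redis (the real `readResponseStream()` is listed above without arguments).

```typescript
// Hypothetical message shape for this sketch.
type ResponseMessage = { id: string; payload: unknown };

class StreamAwaiterSketch {
  // Resolvers keyed by the message ID they are waiting for.
  private readonly waiting = new Map<string, (payload: unknown) => void>();

  // Returns a promise for a response carrying `id`.
  dispatch(id: string): Promise<unknown> {
    return new Promise((resolve) => {
      this.waiting.set(id, resolve);
    });
  }

  // Consume incoming responses and settle the promises whose IDs match.
  async readResponseStream(responses: AsyncIterable<ResponseMessage>): Promise<void> {
    for await (const { id, payload } of responses) {
      const resolve = this.waiting.get(id);
      if (!resolve) continue; // nobody is waiting on this ID
      this.waiting.delete(id);
      resolve(payload);
    }
  }
}
```

A production version would also want a timeout per pending ID so that a response which never arrives cannot hold its promise open forever.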
Utils
- A collection of utilities for internal Streamerson use. Outside of that context, use them at your own peril:
  - ids
    - guuid() (generate a GUUID)
  - keys
    - keyGenerator() (generate keys with standard markup for stream & key identifiers)
    - shardDecorator() (decorate IDs with a shard for logical partitioning)
    - consumerProducerDecorator() (decorate IDs with a consumer group for group partitioning)
  - stream configuration
    - buildStreamConfiguration() (generate valid configurations for other builders)
  - time
    - MS_TO_SECONDS() (converts milliseconds to seconds)
    - SECONDS_TO_MS() (converts seconds to milliseconds)
    - HOURS_TO_MS() (converts hours to milliseconds)
  - topic
    - new Topic({ /* options */ }) (creates a Topic, which should probably be a first-class citizen of the core package but for now resides here)
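The time helpers are the easiest of these to picture. The following is a hypothetical re-implementation for illustration: only the three names come from the list above, and whether the real helpers round their results is not something this sketch can confirm.

```typescript
// Hypothetical equivalents of the time helpers; no rounding is applied here.
const MS_TO_SECONDS = (ms: number): number => ms / 1000;
const SECONDS_TO_MS = (seconds: number): number => seconds * 1000;
const HOURS_TO_MS = (hours: number): number => hours * 60 * 60 * 1000;

// Example: a two-hour timeout expressed in milliseconds, and back in seconds.
const timeoutMs = HOURS_TO_MS(2);
const timeoutSeconds = MS_TO_SECONDS(timeoutMs);
```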
API Reference
:factory: StreamingDataSource
A remote source capable of retrieving stream records from a Redis instance.
Methods
:gear: writeToStream
A low-level implementation wrapping a Redis Stream Write operation
Method | Type |
---|---|
writeToStream | (outgoingStream: string, incomingStream: string, messageType: MessageType, messageId: string, message: string, sourceId: string, shard?: string) => Promise<string> |
Parameters:
- outgoingStream: The stream ID to target in Redis
- incomingStream: Maybe, a stream ID to reply to
- messageType: The type of the event
- messageId: The ID of the message
- message: The message payload
- sourceId: The ID of the source
- shard: Maybe, the shard to target
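With seven positional parameters, the argument order is easy to get wrong, so a sketch of a call may help. The `WriteToStream` type below mirrors the signature in the table, with `MessageType` narrowed to `string` for the sake of a self-contained example. `fakeWriteToStream`, `sendHello`, the topic names, and the returned ID format are all hypothetical; a real call would go through a connected `StreamingDataSource`.

```typescript
// Mirrors the documented signature; MessageType is narrowed to string in this sketch.
type WriteToStream = (
  outgoingStream: string,
  incomingStream: string,
  messageType: string,
  messageId: string,
  message: string,
  sourceId: string,
  shard?: string,
) => Promise<string>;

// Hypothetical in-memory stand-in so the call can run without Redis.
const written: Record<string, string>[] = [];
const fakeWriteToStream: WriteToStream = async (
  outgoingStream, incomingStream, messageType, messageId, message, sourceId, shard,
) => {
  written.push({ outgoingStream, incomingStream, messageType, messageId, message, sourceId, shard: shard ?? '' });
  return `${written.length}-0`; // made-up ID for the sketch, not the package's return value
};

// The payload is a string, so structured data is serialized before writing.
async function sendHello(write: WriteToStream): Promise<string> {
  return write(
    'my-example-topic',                 // outgoingStream
    'my-reply-topic',                   // incomingStream
    'hello',                            // messageType
    'message-1',                        // messageId
    JSON.stringify({ greeting: 'hi' }), // message
    'example-service',                  // sourceId
  );
}
```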
:gear: setResponseType
Sets the MessageType field default for outgoing messages
Method | Type |
---|---|
setResponseType | (type: string) => void |
Parameters:
- type: The MessageType for outgoing messages
:gear: addStreamId
Adds a stream to the set for consumption
Method | Type |
---|---|
addStreamId | (streamId: string) => void |
Parameters:
- streamId: the key of the stream to ingest
:gear: hasStreamId
Checks whether a stream is set for consumption
Method | Type |
---|---|
hasStreamId | (streamId: string) => boolean |
Parameters:
- streamId: the key of the stream to check
:gear: removeStreamId
Removes a stream from the set for consumption
Method | Type |
---|---|
removeStreamId | (streamId: string) => void |
Parameters:
- streamId: the key of the stream to remove
:gear: getReadStream
Method | Type |
---|---|
getReadStream | (options: { topic: Topic; shard?: string; } or GetReadStreamOptions) => Readable and { readableObjectMode: true; } |
:gear: getWriteStream
Get a Writable stream; objects written to it are written to the remote
Method | Type |
---|---|
getWriteStream | (options: { topic: Topic; shard?: string; } or { stream: string; responseChannel?: string; shard?: string; }) => Writable and { writableObjectMode: true; } |
Parameters:
- options: The Topic to publish messages to
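To illustrate what "written objects will be written to the remote" can look like mechanically, here is a small sketch of an object-mode `Writable` that forwards each object to an async sink. It is not how `getWriteStream` is implemented: the `Sink` type and `toWritable` helper are hypothetical, and the sink here is just an in-memory array standing in for Redis.

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical stand-in for "the remote": any async function that persists one record.
type Sink = (record: unknown) => Promise<void>;

// Build an object-mode Writable that forwards each written object to the sink.
function toWritable(sink: Sink): Writable {
  return new Writable({
    objectMode: true,
    write(record, _encoding, callback) {
      // Report completion or failure so backpressure and errors propagate.
      sink(record).then(() => callback(), (error: Error) => callback(error));
    },
  });
}

// Usage: pipe any object-mode Readable into the Writable.
const stored: unknown[] = [];
const done = pipeline(
  Readable.from([{ type: 'hello' }, { type: 'goodbye' }]),
  toWritable(async (record) => {
    stored.push(record);
  }),
);
```

Calling the `write` callback only after the sink settles means the stream applies backpressure when the remote is slow, instead of buffering without bound.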