0.0.66 • Published 10 months ago

@streamerson/core v0.0.66

Weekly downloads
-
License
LAMC
Repository
github
Last release
10 months ago

@streamerson/core

For Doing Cool & Idiomatic Things with Streams, Maybe

Overview

This package was created as a core SDK for my smoother-brained endeavours, some of which is documented in the Streamerson Examples. Give it a read, or skip it-- this package exists on its own and might be helpful.

Everything here is built for internal usage in the rest of the @streamerson packages. However, those higher-level packages mostly wrap this core code, meaning that if my more use-case-driven tooling (the other monorepo packages) aren't of interest to you, maybe the low-level components here will help you build something too.

The idea for the exported code of this package is essentially to achieve the following:

  • wrap Redis clients behind interfaces
  • wrap reading/writing logic behind real Streams (Read/Writables)
  • provide a set of un-opinionated tools for manipulating these Read/Writables
  • instrument some basic utilities and helper functions to distribute to the rest of the monorepo

Table of Contents

Notes

Some quick notes:

  • All Typescript
  • Meant for node v20+
  • All the current code relies on redis at a version higher than 6.
  • In the future, the Topic interface will be extended to work with more than Redis streams probably. I have my sights on AWS:SQS first, because cheap and fast and Kinesis sucks.
  • Unfinished -- believe the package version being sub-1.
  • Lots of high-hanging fruit for low-level optimizations around asynchronicity, network connection pooling, ser/des, memory management, potential for hanging promises, etc. This is a fairly crunchy project and I've just been getting to these things as I have time.

Installation

  • Install the core SDK in your package of choice:
yarn add @streamerson/core
  • Import some stuff, get streamin'. The following example will connect to Redis and begin listening for events with a type hello on a stream Topic, responding in kind with some JSON:

Example

readable-stream.example.ts

import {StreamingDataSource, Topic} from '@streamerson/core';

export const readChannel = new StreamingDataSource(/* optional options */);

await readChannel.connect();

for await (const event of readChannel.getReadStream({
    // ... optional options, like batch size and timeouts
    topic: new Topic('my-example-topic')
})) {
    readChannel.logger.info(event, 'Received event!')
    // Do something with my streamed event?
    // We could even `.pipe()` this event to a Writable.
}
  • And you're ... streaming with gas? Of course, this is relying on default connection settings and the existence of a Redis server. In this monorepo, there are tools for starting a Redis Docker image, and there are connection options built into the parameters of all the functions for connecting to a non-defaulted Redis instance.

API

RedisDataSource

  • a base implementation of a data-source (any interface capable of)
    • client (getter for a client for data connection)
    • control (getter for a client for orchestration)
    • connect() (connect to the datasource)
    • disconnect() (disconnect from the datasource)

StreamingDataSource

  • an extension of the RedisDataSource, which implements streaming protocols:
    • writeToStream({ ... }) (write to a stream using the data-source)
    • getReadStream({ ... }) (get a Readable for a stream)
    • getWriteStream({ ... }) (get a Writable for a stream)
    • iterateStream({ ... }) (get an Iterable that reads from a stream)
    • set() (set a key, for orchestration purposes)
    • get() (get a key, for orchestration purposes)
    • ... and more

Promise Tracker

  • a general purpose utility for using the await keyword to cede control until a future event has occurred on a stream. You could generally use .once('event'), but due to memory management concerns I have exposed a utility with an interface as follows:
    • tracker.promise('event')
    • tracker.cancel('event')
    • tracker.cancelAll()

Stream Awaiter

  • a general purpose utility for call-and-response along two streams. After a message with a given ID is dispatched to one stream, generate a promise that will resolve when the second stream receives a message with a matching ID, using the methods:
    • streamAwaiter.dispatch('some-id') (a promise for a response with 'some-id')
    • streamAwaiter.readResponseStream() (begin reading incoming responses)

Utils

  • A collection of utilities for internal Streamerson use. Outside of that context, use them at your own peril:
    • ids
      • guuid() (generate a GUUID)
    • keys
      • keyGenerator() (generate keys with standard markup for stream & key identifiers)
      • shardDecorator() (decorate IDs with a shard for logical partitioning)
      • consumerProducerDecorator() (decorate IDs with a consumer group for group partioning)
    • stream configuration
      • buildStreamConfiguration() _(generate valid configurations for other builders)**
    • time
      • MS_TO_SECONDS() (converts milliseconds to seconds)
      • SECONDS_TO_MS() (converts seconds to milliseconds)
      • HOURS_TO_MS() (converts hours to milliseconds)
    • topic
      • new Topic({ /* options */ }) (creates a Topic, which should probably be a first-class citizen of the core package but for now resides here)

API Reference

_API.md

:factory: StreamingDataSource

A remote source capable of retrieving stream records from a Redis instance.

Methods

:gear: writeToStream

A low-level implementation wrapping a Redis Stream Write operation

MethodType
writeToStream(outgoingStream: string, incomingStream: string, messageType: MessageType, messageId: string, message: string, sourceId: string, shard?: string) => Promise<string>

Parameters:

  • outgoingStream: The stream ID to target in Redis
  • incomingStream: Maybe, a stream ID to reply to
  • messageType: The type of the event
  • messageId: The ID of the message
  • message: The message payload
  • sourceId: The ID of the source
  • shard: Maybe, the shard to target

:gear: setResponseType

Sets the MessageType field default for outgoing messages

MethodType
setResponseType(type: string) => void

Parameters:

  • type: The MessageType for outgoing messages

:gear: addStreamId

Adds a stream to the set for consumption

MethodType
addStreamId(streamId: string) => void

Parameters:

  • streamId: the key of the stream to ingest

:gear: hasStreamId

Checks whether a stream is set for consumption

MethodType
hasStreamId(streamId: string) => boolean

Parameters:

  • streamId: the key of the stream to check

:gear: removeStreamId

Removes a stream from the set for consumption

MethodType
removeStreamId(streamId: string) => void

Parameters:

  • streamId: the key of the stream to remove

:gear: getReadStream

MethodType
getReadStream(options: { topic: Topic; shard?: string; } or GetReadStreamOptions) => Readable and { readableObjectMode: true; }

:gear: getWriteStream

Get a Writable stream, for which written objects will be written to the remote

MethodType
getWriteStream(options: { topic: Topic; shard?: string; } or { stream: string; responseChannel?: string; shard?: string; }) => Writable and { writableObjectMode: true; }

Parameters:

  • options: : The Topic to publish messages to
0.0.62

10 months ago

0.0.63

10 months ago

0.0.64

10 months ago

0.0.65

10 months ago

0.0.66

10 months ago

0.0.60

10 months ago

0.0.61

10 months ago

0.0.59

10 months ago

0.0.51

10 months ago

0.0.52

10 months ago

0.0.53

10 months ago

0.0.54

10 months ago

0.0.55

10 months ago

0.0.56

10 months ago

0.0.57

10 months ago

0.0.50

10 months ago

0.0.48

10 months ago

0.0.49

10 months ago

0.0.42

2 years ago

0.0.43

2 years ago

0.0.44

2 years ago

0.0.45

2 years ago

0.0.46

2 years ago

0.0.47

2 years ago

0.0.41

2 years ago

0.0.40

2 years ago

0.0.39

2 years ago

0.0.38

2 years ago

0.0.37

2 years ago

0.0.36

2 years ago

0.0.35

2 years ago

0.0.33

2 years ago

0.0.32

2 years ago

0.0.31

2 years ago

0.0.30

2 years ago

0.0.29

2 years ago

0.0.28

2 years ago

0.0.25

2 years ago

0.0.24

2 years ago

0.0.22

2 years ago

0.0.21

2 years ago

0.0.20

2 years ago

0.0.19

2 years ago

0.0.18

2 years ago

0.0.17

2 years ago

0.0.16

2 years ago

0.0.15

2 years ago

0.0.14

2 years ago

0.0.13

2 years ago

0.0.12

2 years ago

0.0.11

2 years ago

0.0.10

2 years ago

0.0.9

2 years ago

0.0.8

2 years ago

0.0.7

2 years ago

0.0.6

2 years ago

0.0.5

2 years ago

0.0.3

2 years ago

0.0.2

2 years ago

0.0.1

2 years ago