@streamerson/core v0.0.66

License
LAMC
Repository
github
Last release
1 year ago

@streamerson/core

For Doing Cool & Idiomatic Things with Streams, Maybe

Overview

This package was created as a core SDK for my smoother-brained endeavours, some of which are documented in the Streamerson Examples. Give it a read, or skip it: this package stands on its own and might be helpful.

Everything here is built for internal usage in the rest of the @streamerson packages. However, those higher-level packages mostly wrap this core code, so if my more use-case-driven tooling (the other monorepo packages) isn't of interest to you, maybe the low-level components here will help you build something too.

The idea for the exported code of this package is essentially to achieve the following:

  • wrap Redis clients behind interfaces
  • wrap reading/writing logic behind real Streams (Read/Writables)
  • provide a set of un-opinionated tools for manipulating these Read/Writables
  • instrument some basic utilities and helper functions to distribute to the rest of the monorepo
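To make the second goal a bit more concrete, here is a minimal sketch of hiding a batch-fetching read loop behind a real Node.js Readable in object mode. It does not use @streamerson/core or Redis at all: `StreamEvent`, `makeFetcher`, and `toReadable` are hypothetical names, and the in-memory backlog stands in for whatever a Redis stream read would return.

```typescript
import { Readable } from 'node:stream';

// Hypothetical event shape; not the package's real message type.
interface StreamEvent {
  id: string;
  payload: string;
}

// Stand-in for a Redis stream read: returns the next batch, or an empty
// array once the in-memory backlog is exhausted.
function makeFetcher(backlog: StreamEvent[], batchSize: number) {
  let cursor = 0;
  return async function fetchBatch(): Promise<StreamEvent[]> {
    const batch = backlog.slice(cursor, cursor + batchSize);
    cursor += batch.length;
    return batch;
  };
}

// Wrap the fetcher in an async generator and hand it to Readable.from(),
// which yields an object-mode Readable that only pulls when the consumer asks.
function toReadable(fetchBatch: () => Promise<StreamEvent[]>): Readable {
  async function* events() {
    while (true) {
      const batch = await fetchBatch();
      if (batch.length === 0) return; // a real reader would block and retry instead
      yield* batch;
    }
  }
  return Readable.from(events());
}

// Drain a Readable into an array, mostly to show that `for await` works on it.
async function collect(readable: Readable): Promise<StreamEvent[]> {
  const seen: StreamEvent[] = [];
  for await (const event of readable) seen.push(event as StreamEvent);
  return seen;
}
```

Because the result is an ordinary Readable, it can be iterated with `for await` or piped like any other Node.js stream, which is the point of wrapping the read logic this way.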

Notes

Some quick notes:

  • All TypeScript
  • Meant for Node.js v20+
  • All the current code relies on Redis at a version higher than 6.
  • In the future, the Topic interface will probably be extended to work with more than Redis streams. I have my sights on AWS SQS first, because it's cheap and fast and Kinesis sucks.
  • Unfinished: believe the sub-1.0 package version.
  • Lots of high-hanging fruit for low-level optimizations around asynchronicity, network connection pooling, ser/des, memory management, potential for hanging promises, etc. This is a fairly crunchy project and I've just been getting to these things as I have time.

Installation

  • Install the core SDK in your package of choice:
    yarn add @streamerson/core
  • Import some stuff, get streamin'. The following example connects to Redis and reads events from a stream Topic, logging each one as it arrives:

Example

readable-stream.example.ts

import {StreamingDataSource, Topic} from '@streamerson/core';

export const readChannel = new StreamingDataSource(/* optional options */);

await readChannel.connect();

for await (const event of readChannel.getReadStream({
    // ... optional options, like batch size and timeouts
    topic: new Topic('my-example-topic')
})) {
    readChannel.logger.info(event, 'Received event!')
    // Do something with my streamed event?
    // We could also `.pipe()` the Readable itself to a Writable.
}
  • And you're ... streaming with gas? Of course, this is relying on default connection settings and the existence of a Redis server. In this monorepo, there are tools for starting a Redis Docker image, and there are connection options built into the parameters of all the functions for connecting to a non-defaulted Redis instance.

API

RedisDataSource

  • a base implementation of a data-source, meaning any interface that exposes:
    • client (getter for a client for data connection)
    • control (getter for a client for orchestration)
    • connect() (connect to the datasource)
    • disconnect() (disconnect from the datasource)
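The shape described above could be sketched roughly like this. It is an in-memory illustration only, not the package's actual types: `KeyValueClient`, `DataSource`, and `InMemoryDataSource` are hypothetical names, and the real implementation wraps Redis clients rather than a Map.

```typescript
// Hypothetical minimal client surface; the real package wraps a Redis client.
interface KeyValueClient {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<void>;
}

// The four members listed above: two client getters plus connect/disconnect.
interface DataSource<C> {
  readonly client: C; // data connection
  readonly control: C; // orchestration connection
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

class InMemoryClient implements KeyValueClient {
  private store = new Map<string, string>();
  async get(key: string): Promise<string | null> {
    return this.store.get(key) ?? null;
  }
  async set(key: string, value: string): Promise<void> {
    this.store.set(key, value);
  }
}

class InMemoryDataSource implements DataSource<KeyValueClient> {
  private clients?: { client: KeyValueClient; control: KeyValueClient };

  get client(): KeyValueClient {
    return this.requireConnected().client;
  }
  get control(): KeyValueClient {
    return this.requireConnected().control;
  }

  async connect(): Promise<void> {
    // Two separate clients, mirroring the data/orchestration split.
    this.clients = { client: new InMemoryClient(), control: new InMemoryClient() };
  }
  async disconnect(): Promise<void> {
    this.clients = undefined;
  }

  private requireConnected() {
    if (!this.clients) throw new Error('Not connected; call connect() first');
    return this.clients;
  }
}
```

One plausible reason for keeping two clients (an assumption, not something stated here) is that a blocking read on a Redis connection ties up that connection, so orchestration commands are easier to run on a second one.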

StreamingDataSource

  • an extension of the RedisDataSource, which implements streaming protocols:
    • writeToStream({ ... }) (write to a stream using the data-source)
    • getReadStream({ ... }) (get a Readable for a stream)
    • getWriteStream({ ... }) (get a Writable for a stream)
    • iterateStream({ ... }) (get an Iterable that reads from a stream)
    • set() (set a key, for orchestration purposes)
    • get() (get a key, for orchestration purposes)
    • ... and more

Promise Tracker

  • a general purpose utility for using the await keyword to cede control until a future event has occurred on a stream. You could generally use .once('event'), but due to memory management concerns I have exposed a utility with an interface as follows:
    • tracker.promise('event')
    • tracker.cancel('event')
    • tracker.cancelAll()
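For a feel of how such an interface might behave, here is a small sketch built on a plain Node.js EventEmitter. It is not the package's implementation: `PromiseTrackerSketch` is a hypothetical class, and the choices to keep one pending promise per event name and to reject on cancel are assumptions made for the example.

```typescript
import { EventEmitter } from 'node:events';

interface Pending {
  promise: Promise<unknown>;
  listener: (value: unknown) => void;
  reject: (reason: Error) => void;
}

class PromiseTrackerSketch {
  private emitter: EventEmitter;
  private pending = new Map<string, Pending>();

  constructor(emitter: EventEmitter) {
    this.emitter = emitter;
  }

  // Resolve with the first value emitted for `event`.
  promise(event: string): Promise<unknown> {
    const existing = this.pending.get(event);
    if (existing) return existing.promise; // one pending promise per event name

    let listener!: (value: unknown) => void;
    let reject!: (reason: Error) => void;
    const promise = new Promise<unknown>((res, rej) => {
      listener = (value) => {
        this.pending.delete(event);
        res(value);
      };
      reject = rej;
    });
    this.emitter.once(event, listener);
    this.pending.set(event, { promise, listener, reject });
    return promise;
  }

  // Detach the listener so it cannot linger, then reject the waiting promise.
  cancel(event: string): void {
    const entry = this.pending.get(event);
    if (!entry) return;
    this.emitter.removeListener(event, entry.listener);
    this.pending.delete(event);
    entry.reject(new Error(`Cancelled: ${event}`));
  }

  cancelAll(): void {
    for (const event of [...this.pending.keys()]) this.cancel(event);
  }
}
```

The difference from a bare `.once('event')` in this sketch is that every outstanding listener is tracked, so `cancel` and `cancelAll` can remove listeners for events that never fire.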

Stream Awaiter

  • a general purpose utility for call-and-response along two streams. After a message with a given ID is dispatched to one stream, generate a promise that will resolve when the second stream receives a message with a matching ID, using the methods:
    • streamAwaiter.dispatch('some-id') (a promise for a response with 'some-id')
    • streamAwaiter.readResponseStream() (begin reading incoming responses)
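The call-and-response idea can be sketched with nothing but Node.js streams. This is an illustration under assumptions, not the package's code: `StreamAwaiterSketch`, the `Reply` shape, and its `messageId` field are hypothetical, and a PassThrough stands in for the incoming response stream. Timeouts and error handling are left out.

```typescript
import { PassThrough, Readable } from 'node:stream';

// Hypothetical reply shape; the real message format may differ.
interface Reply {
  messageId: string;
  payload: string;
}

class StreamAwaiterSketch {
  private responses: Readable;
  private waiting = new Map<string, (reply: Reply) => void>();

  constructor(responses: Readable) {
    this.responses = responses;
  }

  // Register interest in a reply carrying this ID.
  dispatch(messageId: string): Promise<Reply> {
    return new Promise((resolve) => this.waiting.set(messageId, resolve));
  }

  // Consume the response stream and settle whichever promise matches.
  async readResponseStream(): Promise<void> {
    for await (const chunk of this.responses) {
      const reply = chunk as Reply;
      const resolve = this.waiting.get(reply.messageId);
      if (!resolve) continue; // nobody is waiting for this ID
      this.waiting.delete(reply.messageId);
      resolve(reply);
    }
  }
}

async function demo(): Promise<Reply> {
  const responses = new PassThrough({ objectMode: true });
  const awaiter = new StreamAwaiterSketch(responses);
  const reading = awaiter.readResponseStream(); // start consuming in the background
  const pending = awaiter.dispatch('some-id');
  responses.write({ messageId: 'other-id', payload: 'ignored' });
  responses.write({ messageId: 'some-id', payload: 'pong' });
  const reply = await pending;
  responses.end();
  await reading;
  return reply;
}
```

In `demo()`, the reply for 'other-id' is skipped because nothing was dispatched for it, and the promise for 'some-id' resolves once the matching reply arrives.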

Utils

  • A collection of utilities for internal Streamerson use. Outside of that context, use them at your own peril:
    • ids
      • guuid() (generate a GUUID)
    • keys
      • keyGenerator() (generate keys with standard markup for stream & key identifiers)
      • shardDecorator() (decorate IDs with a shard for logical partitioning)
      • consumerProducerDecorator() (decorate IDs with a consumer group for group partitioning)
    • stream configuration
      • buildStreamConfiguration() (generate valid configurations for other builders)
    • time
      • MS_TO_SECONDS() (converts milliseconds to seconds)
      • SECONDS_TO_MS() (converts seconds to milliseconds)
      • HOURS_TO_MS() (converts hours to milliseconds)
    • topic
      • new Topic({ /* options */ }) (creates a Topic, which should probably be a first-class citizen of the core package but for now resides here)

API Reference

_API.md

:factory: StreamingDataSource

A remote source capable of retrieving stream records from a Redis instance.

Methods

:gear: writeToStream

A low-level implementation wrapping a Redis Stream Write operation

Method: writeToStream
Type: (outgoingStream: string, incomingStream: string, messageType: MessageType, messageId: string, message: string, sourceId: string, shard?: string) => Promise<string>

Parameters:

  • outgoingStream: The stream ID to target in Redis
  • incomingStream: Maybe, a stream ID to reply to
  • messageType: The type of the event
  • messageId: The ID of the message
  • message: The message payload
  • sourceId: The ID of the source
  • shard: Maybe, the shard to target
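As a rough picture of what "wrapping a Redis Stream Write operation" could involve, the sketch below maps the parameters above onto the argument list of a Redis XADD command. Only the XADD shape itself (key, entry ID, then field/value pairs) is real Redis; the field names, the `stream:shard` key format, and the `buildXaddArgs` helper are invented for illustration and are not how the package necessarily does it.

```typescript
type MessageType = string; // assumption: the real MessageType may be narrower

interface OutgoingMessage {
  outgoingStream: string;
  incomingStream: string;
  messageType: MessageType;
  messageId: string;
  message: string;
  sourceId: string;
  shard?: string;
}

// Build the arguments for XADD. '*' asks Redis to assign the entry ID;
// everything after it is a flat list of field/value pairs.
function buildXaddArgs(m: OutgoingMessage): string[] {
  // Hypothetical key format: append the shard when one is given.
  const key = m.shard ? `${m.outgoingStream}:${m.shard}` : m.outgoingStream;
  return [
    'XADD', key, '*',
    // Hypothetical field names:
    'replyTo', m.incomingStream,
    'messageType', m.messageType,
    'messageId', m.messageId,
    'sourceId', m.sourceId,
    'message', m.message,
  ];
}

const exampleArgs = buildXaddArgs({
  outgoingStream: 'orders',
  incomingStream: 'orders-replies',
  messageType: 'hello',
  messageId: 'msg-1',
  message: JSON.stringify({ greeting: 'hi' }),
  sourceId: 'svc-a',
  shard: '3',
});
```

XADD replies with the ID of the new entry, which would be one natural candidate for the `Promise<string>` in the signature, though that is a guess.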

:gear: setResponseType

Sets the MessageType field default for outgoing messages

Method: setResponseType
Type: (type: string) => void

Parameters:

  • type: The MessageType for outgoing messages

:gear: addStreamId

Adds a stream to the set for consumption

Method: addStreamId
Type: (streamId: string) => void

Parameters:

  • streamId: the key of the stream to ingest

:gear: hasStreamId

Checks whether a stream is set for consumption

Method: hasStreamId
Type: (streamId: string) => boolean

Parameters:

  • streamId: the key of the stream to check

:gear: removeStreamId

Removes a stream from the set for consumption

Method: removeStreamId
Type: (streamId: string) => void

Parameters:

  • streamId: the key of the stream to remove
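The three stream-ID methods above read like operations on a set of keys that a read loop consults on each pass. A minimal sketch of that idea, assuming a plain Set underneath (the class name `StreamIdRegistrySketch` and the `list()` helper are hypothetical):

```typescript
class StreamIdRegistrySketch {
  private ids = new Set<string>();

  addStreamId(streamId: string): void {
    this.ids.add(streamId); // adding the same key twice is a no-op
  }

  hasStreamId(streamId: string): boolean {
    return this.ids.has(streamId);
  }

  removeStreamId(streamId: string): void {
    this.ids.delete(streamId);
  }

  // Snapshot of the keys a reader loop would pass to its next read call.
  list(): string[] {
    return [...this.ids];
  }
}

const registry = new StreamIdRegistrySketch();
registry.addStreamId('orders');
registry.addStreamId('payments');
registry.addStreamId('orders');
registry.removeStreamId('payments');
```

With a design like this, adding or removing a key only affects reads that start afterwards; an in-flight read keeps the snapshot it began with.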

:gear: getReadStream

Method: getReadStream
Type: (options: { topic: Topic; shard?: string; } | GetReadStreamOptions) => Readable & { readableObjectMode: true; }

:gear: getWriteStream

Get a Writable stream, for which written objects will be written to the remote

Method: getWriteStream
Type: (options: { topic: Topic; shard?: string; } | { stream: string; responseChannel?: string; shard?: string; }) => Writable & { writableObjectMode: true; }

Parameters:

  • options: either a Topic or a raw stream key to publish messages to, with an optional shard
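To show what an object-mode Writable of this kind looks like from the outside, here is a sketch that swaps the remote for an in-memory array. It does not call @streamerson/core: `OutgoingEvent`, `makeWriteStream`, and `demo` are hypothetical names, and the only real APIs used are Node.js `Writable`, `Readable.from`, and `pipeline` from `node:stream/promises`.

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical outgoing event shape.
interface OutgoingEvent {
  messageType: string;
  payload: unknown;
}

// Stand-in for the remote: an array that records what was "written".
function makeWriteStream(sink: OutgoingEvent[]): Writable {
  return new Writable({
    objectMode: true,
    write(chunk: OutgoingEvent, _encoding, callback) {
      // A real implementation would finish the remote write before calling
      // back, which is what lets the stream apply backpressure upstream.
      sink.push(chunk);
      callback();
    },
  });
}

async function demo(): Promise<OutgoingEvent[]> {
  const sink: OutgoingEvent[] = [];
  await pipeline(
    Readable.from([
      { messageType: 'hello', payload: { n: 1 } },
      { messageType: 'hello', payload: { n: 2 } },
    ]),
    makeWriteStream(sink),
  );
  return sink;
}
```

`pipeline` resolves once the Writable has finished, so by the time `demo()` returns, every object has passed through `write`.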