5.2.0 • Published 4 months ago

@lifeomic/test-tool-kinesis v5.2.0

Weekly downloads
-
License
MIT
Repository
github
Last release
4 months ago

@lifeomic/test-tool-kinesis

npm Build Status

Kinesis

Many services use Kinesis as a message processing system. Testing service code against a Kinesis service layer requires either mocking the Kinesis interface (using something like aws-sdk-mock) or pointing the test code at a provisioned Kinesis instance. LocalStack has published LocalStack Docker so that testing can be done without having to use real AWS resources, making testing even easier to do using tools like docker-compose and dockerode.

kinesis-tools supports both methods of integrating with Kinesis. For simple unit tests, the kinesis helper can be used to provision and link a Kinesis docker container with typical beforeAll, beforeEach, afterEach and afterAll hooks. The helper will define all relevant environment variables and will return a preconfigured Kinesis client test context. The setup and tear down will also create and destroy streams, as defined in a provided stream name array, between test cases. The helper will also automatically handle port binding differences between regular and nested Docker environments.

kinesisTestHooks

export const kinesisTestHooks = <StreamName extends string = string>(
  options: KinesisTestHooks | KinesisTestHooksWithDocker,
): KinesisHooksResults<StreamName> => {}

interface RootHooksConfig {
  streams: ModifiedCreateStreamInput[];
  useUniqueIdentifier?: boolean;
  useLocalStack?: boolean;
  kinesisClientConfig?: CreateKinesisClient;
  localStackConfig?: LocalstackConnectionOptions;
}

// Useful for running with a pre-configured instance of Kinesis
// already running, like inside docker-compose.
export interface KinesisTestHooks extends RootHooksConfig {
  useLocalStack?: false;
  localStackConfig?: undefined;
}

// Use when running unit tests and needing to configure a docker instance
// to run Kinesis.
export interface KinesisTestHooksWithDocker extends RootHooksConfig {
  useLocalKinesis: true;
  kinesisClientConfig?: undefined;
  localStackConfig?: LocalstackConnectionOptions;
}

export interface KinesisHooksState {
  streams: ModifiedCreateStreamInput[];
  kinesisClient: KinesisClient;
  uniqueIdentifier?: string;
}

export interface KinesisTestHooksContext<StreamNames extends string = string> extends KinesisHooksState {
  config: CreateKinesisClient,
  streamNamesMap: Record<StreamNames, string>;
}

export interface KinesisHooksResults<StreamName extends string = string> {
  beforeAll: () => Promise<void>;
  beforeEach: () => Promise<KinesisTestHooksContext<StreamName>>;
  afterEach: (context?: KinesisTestHooksContext<StreamName>) => Promise<void>;
  afterAll: () => Promise<void>;
}

The beforeEach test context includes the following attributes:

AttributeDescription/Type
kinesisClient@aws-sdk/client-kinesis { KinesisClient }
uniqueIdentifierThe unique identifier appended to each stream name. E.g. abcdef12345
streamsA copy of the input stream configuration array.
streamNamesMapMap of base stream name to uuid stream name. E.g. users to users-abcdef12345
configThe aws kinesis config object, useful for passing to AWS.Kinesis, or other kinesis wrappers.

If useUniqueIdentifier is true, dynamically generated stream names will be used, in the form of <streamNameProvided>-<ulid>. The unique stream name can be fetched from the streamNamesMap map. Otherwise, the stream name will be the default provided in the streams array. This allows tests to be run in parallel.

Kinesis Tools

kinesis.tools contains some simple kinesis tools to iterate a stream.

KinesisIterator

The KinesisIterator class provides a simple method to get the records from a stream by creating the stream iterator, and reusing it to get records.

Iterator config

AttributeDescription/Type
kinesisClientAWS.Kinesis
streamNamethe name of the kinesis stream

new KinesisIterator(config)

async init()

The init function creates the stream iterator so it can get records from the stream.

async next(limit)

Fetches the next batch of records from the stream. It auto updates it's position in the stream, and returns the KinesisIterator

AttributeDescription/Type
limitan optional limit to how many records to return. Max is 10,000

records

The array of records arrays returned from the last KinesisIterator.next() call.

response

The complete list of responses from the last KinesisIterator.next() call.

static async newIterator(config)

The static KinesisIterator.newIterator function creates a new KinesisIterator, and calls the init function.

getLambdaTriggerEvents

Will iterate through a kinesis stream, and return the events that can be sent to a Lambda handler.

export const getLambdaTriggerEvents = async (
  options: GetLambdaTriggerEvents,
): Promise<GetLambdaTriggerEventsResults> => {}

export interface GetLambdaTriggerEvents {
  kinesisIterator: KinesisIterator;
  limit?: number;
}

export interface GetLambdaTriggerEventsResults {
  processedRecordCount: number;
  lambdaEvents: KinesisStreamEvent[];
}