1.0.1 • Published 1 year ago

@flowcore/sdk-transformer-client v1.0.1

Weekly downloads
-
License
MIT
Repository
github
Last release
1 year ago

sdk-transformer-client

An SDK that can handle the transport logic of chainable flowcore transformers

Installation

install with npm:

npm install @flowcore/sdk-transformer-client

or yarn:

yarn add @flowcore/sdk-transformer-client

Usage

Express Middleware

This SDK comes with a middleware that you can use to handle the transport logic of chainable flowcore transformers.

first install express:

npm install express body-parser
yarn add express body-parser

then you can use the middleware like this:

import express from 'express';
import bodyParser from 'body-parser';
import { 
  transformerMiddleware, 
  transformerHandlerFactory 
} from '@flowcore/sdk-transformer-client';

import z from 'zod'; // optional

const transformerId = 'transformer-id';

const app = express();

app.use(bodyParser.json());
app.use(
  transformerMiddleware("scenario name", transformerId, "strand name", {
    transformerCache: new InMemoryCache(), // or any other that implements SimpleCache interface
    transferHeaders: (req: Request) => { // optional if you want to transfer headers to the next transformers
      return {
        'x-custom-header': req.headers['x-custom-header'],
      };
    };
  }),
);

app.post(
  "/transform",
  transformerHandlerFactory(transformerId, async (req) => {
    console.log(`Transforming data for ${transformerId}`, req.body);
    
    const event = EventData<{
      hello: string;
    }>(
      req.body.event, 
      z.object({ hello: z.string() }) // optional zod schema
    );

    return {
      something: "else",
    };
  }),
);


app.listen(3000, () => {
  console.log('Server is running on port 3000');
});

Note: the default behaviour of the middleware is to send and forget the result to the next transformers, if you want to wait for the next transformers to finish before sending a response to the requester, you can use the waitForNext option:

app.post(
  "/transform",
  transformerHandlerFactory(transformerId, async (req) => {
    // logic
  }),{
    waitForNext: true,
  },
);

Custom Implementation

You can also use the SDK to create your own custom implementation:

import {
  Transformer,
  transformerDataDto,
} from '@flowcore/sdk-transformer-client';
import {TRANSFORMER_VERSION_HEADER} from "./constants";

async function handleRequest(req, res) {
  // validate that the request has the correct information
  const validRequest = transformerDataDto.parse(req.body);
  
  // initialize the transformer
  const transformer = new Transformer(
    "scenario name",
    "strand name",
    "transformer-id",
    validRequest.instructions,
    new InMemoryCache(), // or any other that implements SimpleCache interface
  );

  // set transformer version for response header
  res.setHeader(TRANSFORMER_VERSION_HEADER, req.transformer.getVersion());
  
  try {
    // register the event id for dependency tracking
    if (validRequest.sender) {
      await transformer.registerDependentCall(
        validRequest.sender,
        validRequest.event.eventId,
        validRequest.result,
      );
    }

    // check if this transformers dependencies are met
    const combinedResult = await transformer.dependenciesMet(
      validRequest.event.eventId,
    );
    if (combinedResult === false) {
      res.send({
        message: "Dependencies not met, waiting for next event",
        success: true,
      });
      return;
    }

    // process the logic
    const result = await processLogic(req, res);

    // get the urls of the next transformers in the chain
    const getNextUrls = transformer.getNextTransformerUrls();

    // send the result to the next transformers
    getNextUrls.map(async (url) => {
      await fetch(url, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          sender: transformerId,
          instructions: transformer?.getRawInstructions(),
          result: {
            ...validRequest.result,
            ...combinedResult,
            ...result,
          },
          event: parsedBody.event,
        }),
      });
    });

    // send an OK response to requester
    res.send({
      data: result,
      success: true,
    });
  } catch (e) {
    if (transformer.hasErrorTransformer()) {
      if (error instanceof Error) {
        await fetch(`${transformer.getErrorTransformerUrl()}/failed`, {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
          },
          body: JSON.stringify({
            ...req.body,
            error: error.message,
          }),
        });

        res.send({
          message: error.message,
          success: false,
        });
        return;
      }
    }
    console.error(`Error in transformerHandler: ${error}`);
    throw error;
  }
};

Utilities

The SDK also comes with some utilities that you can use when working with SourceEvents:

import { EventData } from '@flowcore/sdk-transformer-client';

// get the parsed and validated event payload
const event = EventData<{
  hello: string;
}>(req.body.event, z.object({ hello: z.string() }));

Cache

The SDK uses an interface for the cache, you can implement your own cache by implementing the SimpleCache interface:

import { SimpleCache } from '@flowcore/sdk-transformer-client';

class InMemoryCache implements SimpleCache {
  private cache: Record<string, any> = {};

  async get(key: string): Promise<any> {
    return this.cache[key];
  }

  async set(key: string, value: any): Promise<void> {
    this.cache[key] = value;
  }

  async delete(key: string): Promise<void> {
    delete this.cache[key];
  }
}

Note: This interface supports the ioredis client out of the box, you can use it by passing the client to the transformer constructor.

Development

yarn install

or with npm:

npm install
1.0.1

1 year ago

1.0.0

1 year ago