0.0.12 • Published 2 years ago

@sha1n/pipelines v0.0.12

License: MIT


Pipelines

A mini-framework for building state-driven pipelines. A pipeline can be a simple chain of responsibilities that runs in-process, but it really gets interesting when the process is distributed.

What's a Pipeline?

A pipeline is an abstract process that goes through several execution steps until it reaches a terminal state. A pipeline is stateful and defines several abstractions:

1. a StatefulPipelineEntity - carries the state of the process represented by the pipeline.
2. a StateRepository - used to persist the state of the pipeline after every state transition.
3. a Handler or a number of them - a handler is a function that transitions the process from one state to another.
4. a HandlerContext - any object that carries contextual volatile information throughout the process.
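To make the four abstractions more concrete, here is a minimal sketch of how they might fit together. All type shapes and names below (including runTransition and InMemoryRepository) are assumptions made for illustration; they are not the library's actual definitions, which may differ.

```typescript
// Hypothetical shapes for illustration only; not the library's real types.

// 1. The entity carries the current state of the process.
interface StatefulPipelineEntity<S> {
  state: S;
}

// 2. The repository persists the entity after every state transition.
interface StateRepository<T> {
  update(entity: T): Promise<T>;
}

// 4. The context carries volatile information that is not persisted.
type HandlerContext = Record<string, unknown>;

// 3. A handler does the work required to move the entity to its next state.
type Handler<T, C> = (entity: T, ctx: C) => Promise<T>;

// A toy repository that only records the states it was asked to persist.
class InMemoryRepository<T extends StatefulPipelineEntity<unknown>> implements StateRepository<T> {
  readonly history: Array<T['state']> = [];

  async update(entity: T): Promise<T> {
    this.history.push(entity.state);
    return entity;
  }
}

// One transition: run the handler, set the target state, persist the result.
// Assumption: the target state is applied by the caller, not by the handler.
async function runTransition<T extends StatefulPipelineEntity<unknown>, C extends HandlerContext>(
  entity: T,
  ctx: C,
  target: T['state'],
  handler: Handler<T, C>,
  repository: StateRepository<T>
): Promise<T> {
  const handled = await handler(entity, ctx);
  return repository.update({ ...handled, state: target });
}
```

In this sketch the entity is copied rather than mutated, so the caller's original object keeps its previous state.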

Why use Pipelines?

Pipelines break complex algorithms into smaller steps that are flexible on the one hand, yet share an identical interface on the other. This has many benefits:

  • Smaller consistent pieces are easy to test
  • Easy to add/remove/replace steps
  • Easy to understand the state graph (see example)
  • With consistent contextual information and data in all handlers, it is easier to monitor and create effective logs
  • A pipeline can be composed of steps that run local actions and remote actions. A pipeline can be driven by a mixture of HTTP/RPC requests, MQ messages, and in-process triggers, and still look and behave like one consistent flow.
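As a rough illustration of the testability point, a handler is just an async function of an entity and a context, so it can be exercised with a stubbed context. The sketch below assumes a hypothetical design in which the command runner is injected through the context as run; the types and the installHandler name are invented for this example.

```typescript
// Hypothetical types for illustration; not taken from the library or its examples.
interface BuildTask {
  id: string;
  state: string;
}

interface BuildContext {
  workspaceDir: string;
  // Assumption: the command runner is injected so that tests can replace it.
  run: (cmd: string, args: string[]) => Promise<void>;
}

// A single step with the same (entity, context) interface as every other step.
const installHandler = async (task: BuildTask, ctx: BuildContext): Promise<BuildTask> => {
  await ctx.run('yarn', ['install']);
  return task;
};

// A stub context that records commands instead of executing them.
function createRecordingContext(calls: string[]): BuildContext {
  return {
    workspaceDir: '/tmp/ws',
    run: async (cmd, args) => {
      calls.push([cmd, ...args].join(' '));
    }
  };
}
```

A test can then call installHandler with a recording context and assert on the recorded commands without touching the file system.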

Use Cases

In Memory

If the entire pipeline starts and ends in one call in your process, you need something to drive the entity through the pipeline. You can create a pump function using createPump, use a PipelineDriver, or create your own version. See the build pipeline example here and demo here.
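Conceptually, an in-memory driver keeps handing the entity back to the pipeline until it reaches a terminal state. The sketch below shows that idea only; it is not the implementation of createPump or PipelineDriver, and PipelineLike and createSimplePump are hypothetical names introduced here.

```typescript
// Illustrative only: NOT the implementation of createPump.
interface Entity<S> {
  state: S;
}

// Assumed minimal pipeline surface: handle runs the handler for the current state.
interface PipelineLike<T, C> {
  handle(entity: T, ctx: C): Promise<T>;
}

// Returns a function that drives an entity until its state is terminal.
function createSimplePump<S, T extends Entity<S>, C>(
  pipeline: PipelineLike<T, C>,
  terminalStates: ReadonlySet<S>
): (entity: T, ctx: C) => Promise<T> {
  return async (entity, ctx) => {
    let current = entity;
    while (!terminalStates.has(current.state)) {
      current = await pipeline.handle(current, ctx);
    }
    return current;
  };
}
```

A production driver would also need to guard against states that have no outgoing transition, since this loop would otherwise never terminate.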

Distributed

If at least one state transition depends on an asynchronous execution (usually on an external system), an in-memory driver is not what you need. In such cases, at least parts of the pipeline will have to be driven by external calls such as HTTP callbacks, MQ consumers, etc.

Example

Let's say you have a pipeline that runs a Kubernetes job. Since Kubernetes jobs can take time to schedule resources and execute, we don't want to do it synchronously. In this case, we would normally have to pause our pipeline and continue the execution when the job completes.

Here is what your pipeline state-machine might look like:

enum JobState {
  Initiated,
  Configured,
  Executed,
  Completed,
  Failed,
  Cancelled
}

// Here is what your pipeline definition might look like
const pipeline = createPipelineBuilder<Job, JobState, JobContext>()
  .withStateRepository(new YourPersistentRepository())
  // optional, this tells the pipeline to set the pipeline's state to this by default when a NonRecoverablePipelineError is caught
  .withFailedState(JobState.Failed)
  .withTransitionResolver(
    createTransitionResolverBuilder<Job, JobState, JobContext>()
      .withTerminalStates(JobState.Completed, JobState.Failed, JobState.Cancelled)
      .withTransition(JobState.Initiated, JobState.Configured, configHandler)
      .withTransition(JobState.Configured, JobState.Executed, executionHandler)
      .withTransition(JobState.Executed, JobState.Completed, completionHandler)
      .build()
  )
  .build();

The part we want to focus on here is the transition from Executed to Completed, which is realized by the completionHandler. But first, let's understand how we get to the Executed state and what it represents in our pipeline. Executed in this case represents the fact that we successfully requested Kubernetes to execute a job. Since we have lost contact with the flow at that point, we cannot make any state transitions until the job completes, either successfully or with an error. So what now? Well, we have several options, all of which are equally valid from the pipeline's perspective.

We can schedule a recurrent polling job to monitor the Kubernetes job's state. Once the polling job identifies that the Kubernetes job has completed, it pushes the pipeline to completion by calling Pipeline.handle(job, ctx). If the polling job is not part of the process that runs the pipeline, it makes sense for it to send a message or call an API that interacts with the pipeline. Either way, the job is in state Executed at this point, so calling Pipeline.handle(job, ctx) triggers the transition handler associated with that state, which in this case is completionHandler.

Alternatively, if we have control over the job's behavior, we can make it call an API or send an MQ message before it exits. In this case, the API controller or MQ message handler has to do the same thing.
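Whichever trigger is used, the entry point ends up doing the same three things: load the job, hand it to the pipeline, and keep the result. The sketch below illustrates this with a stand-in pipeline and an in-memory job store. PipelineLike, the jobs map, onJobFinished and the exitCode field are hypothetical names for this example and are not part of @sha1n/pipelines.

```typescript
// Hypothetical sketch; only the JobState values are taken from the example above.
enum JobState {
  Initiated,
  Configured,
  Executed,
  Completed,
  Failed,
  Cancelled
}

interface Job {
  id: string;
  state: JobState;
}

interface JobContext {
  exitCode: number; // assumed to be reported by the poller, API call or MQ message
}

interface PipelineLike {
  handle(job: Job, ctx: JobContext): Promise<Job>;
}

// Stand-in for the real pipeline: it only knows the Executed -> Completed step.
const pipeline: PipelineLike = {
  async handle(job) {
    return job.state === JobState.Executed ? { ...job, state: JobState.Completed } : job;
  }
};

// Stand-in for a persistent job store.
const jobs = new Map<string, Job>();

// Called by a polling job, an API controller or an MQ consumer once the
// Kubernetes job has finished.
async function onJobFinished(jobId: string, ctx: JobContext): Promise<Job> {
  const job = jobs.get(jobId);
  if (!job) {
    throw new Error(`Unknown job: ${jobId}`);
  }
  const updated = await pipeline.handle(job, ctx);
  jobs.set(jobId, updated);
  return updated;
}
```

Because the caller only knows the job id and the context, the same function can sit behind an HTTP route, a message consumer or a scheduled poller without changes.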

Simple Build Pipeline Example

See full example code here

// Building a pipeline for a task
const pipeline = createPipelineBuilder<BuildTask, BuildState, BuildContext>()
  .withStateRepository(new BuildTasksRepository())
  .withFailedState(BuildState.Failed)
  .withOnBeforeHandler(async (task, ctx) => {
    ctx.logger.info(`[elapsed: ${ctx.elapsed(TimeUnit.Seconds)}]: ${task.state}`);
    return task;
  })
  .withOnAfterHandler(async (task, ctx) => {
    ctx.logger.info(`[elapsed: ${ctx.elapsed(TimeUnit.Seconds)}]: State is now ${task.state}`);
  })
  .withTransitionResolver(
    createTransitionResolverBuilder<BuildTask, BuildState, BuildContext>()
      .withTerminalStates(BuildState.Completed, BuildState.Failed, BuildState.Cancelled)
      .withTransition(
        BuildState.Initiated,
        BuildState.WorkspaceSetup,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('mkdir', ['-p', ctx.workspaceDir]);
          await execute('git', ['clone', '--depth=1', task.repositoryUrl, ctx.workspaceDir]);

          return task;
        }
      )
      .withTransition(
        BuildState.WorkspaceSetup,
        BuildState.InstallCompleted,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('yarn', ['install'], ctx.workspaceDir);
          await execute('yarn', ['build'], ctx.workspaceDir);
          return task;
        }
      )
      .withTransition(
        BuildState.InstallCompleted,
        BuildState.TestCompleted,
        async (task: BuildTask, ctx: BuildContext) => {
          await execute('yarn', ['test'], ctx.workspaceDir);
          return task;
        }
      )
      .withTransition(BuildState.TestCompleted, BuildState.Completed, async (task: BuildTask, ctx: BuildContext) => {
        await execute('echo', ['🥳 🎉 Build pipeline finished successfully!']);
        await cleanup(ctx);
        return task;
      })
      .build()
  )
  .build();


// Creating an in-memory pipeline pump
const pump = createPump(pipeline);

// Using the pump to run a task
const task = new BuildTask('git@github.com:sha1n/fungus.git');
const wsBasePath = path.join(os.tmpdir(), 'build-pipelines');
const ctx = <BuildContext>{
  workspaceDir: path.join(wsBasePath, task.id),
  elapsed: stopwatch(),
  logger: createLogger(`build:${task.id}`)
};

pump(task, ctx).finally(() => {
  return retryAround(() => execute('rm', ['-rf', wsBasePath]), exponentialBackoffRetryPolicy(2));
});

Build Pipeline Demo

The demo code can be found here and below is how you can run it.

yarn install && yarn run demo

or using NPM

npm i && npm run demo

Install

yarn add @sha1n/pipelines

or using NPM

npm i @sha1n/pipelines