0.4.0 • Published 9 months ago

@yingyeothon/actor-system v0.4.0

Actor system

A basic actor system that uses only a queue and a lock.
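
To make the "queue and lock" idea concrete, here is a minimal, synchronous sketch. `MiniActor` and everything inside it are hypothetical names for illustration and are not this library's API; the real library works with asynchronous, pluggable queue and lock implementations, which this sketch does not model.

```typescript
// Illustrative sketch only: `MiniActor` is a hypothetical class, not part of the library.
class MiniActor<T> {
  private readonly queue: T[] = [];
  private locked = false;

  constructor(private readonly onMessage: (message: T) => void) {}

  // Enqueue a message, then try to become the consumer.
  // Returns true if this call held the lock and drained the queue.
  public send(message: T): boolean {
    this.queue.push(message);
    if (this.locked) {
      // Someone else holds the lock; that holder will process this message, too.
      return false;
    }
    this.locked = true;
    try {
      while (this.queue.length > 0) {
        this.onMessage(this.queue.shift() as T);
      }
    } finally {
      this.locked = false;
    }
    return true;
  }
}

let total = 0;
const processed: number[] = [];
const actor: MiniActor<number> = new MiniActor<number>((delta) => {
  total += delta;
  processed.push(delta);
  // A send issued while the lock is held only enqueues its message.
  if (delta === 1) {
    actor.send(10);
  }
});
actor.send(1);
actor.send(2);
```

The message sent from inside the handler is not processed recursively. It is appended to the queue and picked up by the caller that already holds the lock, so messages are still handled one at a time.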

Usage

The simplest case

import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";

const subsys = {
  queue: new InMemorySupport.InMemoryQueue(),
  lock: new InMemorySupport.InMemoryLock(),
  awaiter: new InMemorySupport.InMemoryAwaiter(),
};

class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  public onMessage = (message: { delta: number }) => {
    this.value += message.delta;
  };
}

const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };

// `send` pushes a message to an actor's queue and tries to process it if possible.
// If another thread is already attached to this actor, it will process my message, too.
await Actor.send(env, { item: { delta: 1 } });
await Actor.send(env, { item: { delta: 2 } });
await Actor.send(env, { item: { delta: 3 } });
await Actor.send(env, { item: { delta: 4 } });

// The `adder-1` actor processes all messages sequentially in the background.

Await policy

send supports an awaitPolicy option that determines how long the caller waits. Forget is the default and means the caller does not wait at all. Choose Act to wait until onMessage has been called, or Commit to wait until onCommit has been called. In those two cases, awaitTimeoutMillis sets the timeout for the wait.

Actor.send(env, {
  item: { delta: 10 },
  awaitPolicy: Actor.AwaitPolicy.Act,
  awaitTimeoutMillis: 100,
})
  .then(/* HAPPY */) // Called after `onMessage`.
  .catch(/* SAD */);
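
From the caller's point of view, a bounded wait like the one above behaves roughly like racing the work against a timer. The sketch below shows that pattern with a hypothetical `withTimeout` helper; it is not taken from the library, and how the library implements `awaitTimeoutMillis` internally is not shown here.

```typescript
// Hypothetical helper for illustration; not part of the library.
function withTimeout<T>(work: Promise<T>, timeoutMillis: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMillis}ms`)),
      timeoutMillis
    );
  });
  // Whichever settles first wins; the timer is cleared either way.
  return Promise.race([work, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (error) => {
      clearTimeout(timer);
      throw error;
    }
  );
}

const delay = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// The work finishes well within the timeout, so this resolves with its value.
const fast = withTimeout(delay(5).then(() => "processed"), 200);

// The work is slower than the timeout, so this rejects; we keep only the message.
const slow = withTimeout(delay(200).then(() => "processed"), 20).catch(
  (error: Error) => error.message
);
```

In the sketch, a timeout only stops the caller from waiting. The underlying work keeps running.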

With prepare and commit

It is costly for an actor to load its context every time it processes onMessage. When many messages are waiting, this leads to high latency. To overcome this, the library supports onPrepare and onCommit, which form a processing cycle: onPrepare -> a loop of onMessage until the queue is empty -> onCommit.

class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  public onPrepare = async () => {
    // Load context from the outer storage.
  };
  public onCommit = () => {
    // Store context to the outer storage.
  };

  public onMessage = (message: { delta: number }) => {
    // Modify context in memory.
    this.value += message.delta;
  };
}

const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.send(env, {
  item: { delta: 10 },
  awaitPolicy: Actor.AwaitPolicy.Commit,
  awaitTimeoutMillis: 1000,
})
  .then(/* HAPPY */) // Called after `onCommit`.
  .catch(/* SAD */);
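
The processing cycle described in this section can be sketched as follows. `CycleHandlers` and `runCycle` are hypothetical names used only for illustration, and the sketch is synchronous for brevity; it is an assumption about the shape of the cycle, not the library's implementation.

```typescript
// Hypothetical handler shape for illustration; not the library's type definitions.
interface CycleHandlers<T> {
  onPrepare?: () => void;
  onMessage: (message: T) => void;
  onCommit?: () => void;
}

// Run one cycle: prepare once, handle every queued message, commit once.
function runCycle<T>(queue: T[], handlers: CycleHandlers<T>): number {
  handlers.onPrepare?.();
  let count = 0;
  while (queue.length > 0) {
    handlers.onMessage(queue.shift() as T);
    count++;
  }
  handlers.onCommit?.();
  return count;
}

const calls: string[] = [];
let value = 0;
const handled = runCycle([1, 2, 3], {
  onPrepare: () => {
    calls.push("prepare"); // e.g. load context from outer storage
  },
  onMessage: (delta: number) => {
    calls.push("message");
    value += delta; // modify context in memory
  },
  onCommit: () => {
    calls.push("commit"); // e.g. store context to outer storage
  },
});
```

Here the context is loaded and stored once for three messages, not once per message, which is where the latency saving comes from.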

With fire-and-forget producer and dedicated consumer

Sometimes we want a fire-and-forget producer and a dedicated consumer to improve overall latency. In this case, a bulk message handler is better than a single message handler.

// To reduce code size, use an environment tailored to `post`.
await Actor.post(
  {
    id: `adder-1`,
    awaiter: {
      wait: subsys.awaiter.wait,
    },
    queue: {
      push: subsys.queue.push,
    },
    logger: subsys.logger, // optional
  },
  { item: { delta: 10 } }
)
  .then(/* HAPPY */)
  .catch(/* SAD */);

// Or use `enqueue`, which does not even need an `awaiter`.
Actor.enqueue(
  {
    id: `adder-1`,
    queue: {
      push: subsys.queue.push,
    },
    logger: subsys.logger, // optional
  },
  { item: { delta: 1 } }
)
  .then(/* HAPPY */)
  .catch(/* SAD */);

// The dedicated consumer handles messages in bulk.
class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  // It can process multiple messages at one time.
  public onMessages = (messages: { delta: number }[]) => {
    for (const message of messages) {
      this.value += message.delta;
    }
  };
}

// This `bulk` processor stays alive for 60 seconds.
const env = { ...Actor.bulkConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.tryToProcess(env, { aliveMillis: 60 * 1000 });
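
The difference between a bulk handler and a single message handler can be sketched like this. `drainInBulk` is a hypothetical helper for illustration, not a library function: it hands everything currently queued to the handler in one call.

```typescript
// Hypothetical helper for illustration; not part of the library.
function drainInBulk<T>(
  queue: T[],
  onMessages: (messages: T[]) => void
): number {
  // Take everything currently queued in one step.
  const batch = queue.splice(0, queue.length);
  if (batch.length > 0) {
    onMessages(batch);
  }
  return batch.length;
}

let handlerCalls = 0;
let sum = 0;
const backlog = [{ delta: 1 }, { delta: 2 }, { delta: 3 }];
const drained = drainInBulk(backlog, (messages) => {
  handlerCalls++;
  for (const message of messages) {
    sum += message.delta;
  }
});
```

With three messages waiting, the handler runs once, so any per-call overhead is paid once per batch and not once per message.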

Shift

To prevent a caller from becoming a victim that keeps processing messages indefinitely, the library supports the aliveMillis and shiftable options when processing messages from a queue. If the timeout elapses while tryToProcess is running, it stops processing and raises a shift event. This is useful in a container with a limited lifetime, such as AWS Lambda.

const subsysWithShift = {
  ...subsys,
  shift: async (actorId: string) => {
    // Invoke a new AWS Lambda to process remaining messages in this actor.
  },
};

const env = { ...Actor.singleConsumer, ...subsysWithShift, ...new Adder(`adder-shift`) };
Actor.send(
  env,
  {
    item: { delta: 10 },
  },
  { aliveMillis: 5 * 1000, shiftable: true } // A usual timeout of API Gateway
);
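
The idea behind shifting can be sketched with a deadline check inside the processing loop. `processUntilDeadline`, its option names, and the injected clock are assumptions made for this illustration and do not describe the library's internals.

```typescript
// Hypothetical sketch; the function and the injected clock are assumptions.
interface DeadlineOptions {
  aliveMillis: number;
  shiftable: boolean;
  now: () => number; // injected clock, so the example is deterministic
  shift: () => void; // e.g. invoke a new worker for the remaining messages
}

function processUntilDeadline<T>(
  queue: T[],
  onMessage: (message: T) => void,
  options: DeadlineOptions
): "drained" | "shifted" | "timed-out" {
  const deadline = options.now() + options.aliveMillis;
  while (queue.length > 0) {
    if (options.now() >= deadline) {
      // Out of time: leave the rest in the queue and hand over if allowed.
      if (options.shiftable) {
        options.shift();
        return "shifted";
      }
      return "timed-out";
    }
    onMessage(queue.shift() as T);
  }
  return "drained";
}

// Fake clock: each message "takes" 40 ms to process.
let clock = 0;
const pending = [1, 2, 3, 4, 5];
const done: number[] = [];
let shiftCalls = 0;
const outcome = processUntilDeadline(
  pending,
  (message) => {
    done.push(message);
    clock += 40;
  },
  {
    aliveMillis: 100,
    shiftable: true,
    now: () => clock,
    shift: () => {
      shiftCalls++;
    },
  }
);
```

With a 100 ms budget, three messages are processed before the deadline passes. The other two stay in the queue for whoever picks up the shift.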

It can easily be extended into a distributed actor system when both the queue and the lock work properly across shared instances.

EventLoop

We can set the awaiter aside for a while and focus on the more basic building blocks: enqueue and eventLoop. This is useful when we want to process other tasks in a loop, in addition to processing messages, while the actor's lock is held. For example, we could run the game logic by polling game messages from the actor queue.

import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";

// Hypothetical message shape for this example.
interface GameMessage {
  type: string;
}

class Game {
  // Set this to false to stop the loop.
  private running = true;

  constructor(public readonly id: string) {}

  public loop = async (poll: () => Promise<GameMessage[]>) => {
    while (this.running) {
      this.processMessages(await poll());
      this.tick();
    }
  };

  private processMessages(messages: GameMessage[]) {
    // Apply each polled message to the game state.
  }

  private tick() {
    // Advance the game state by one step.
  }
}

// No `awaiter` is needed here.
const subsys = {
  queue: new InMemorySupport.InMemoryQueue(),
  lock: new InMemorySupport.InMemoryLock(),
};

async function main() {
  const game = new Game("GAME_ID");
  await Actor.eventLoop<GameMessage>({
    ...subsys,
    ...game,
  });
}

async function sendMessage(gameId: string, message: GameMessage) {
  await Actor.enqueue(
    {
      ...subsys,
      id: gameId,
    },
    { item: message }
  );
}
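
The polling pattern above can be tried out without the library. In the following synchronous sketch, `inbox`, `poll`, and `loop` are hypothetical stand-ins for the actor queue and the event loop: each iteration drains whatever arrived since the previous poll and then does its own per-tick work.

```typescript
// Hypothetical stand-ins for the actor queue and the event loop.
const inbox: string[] = ["join:alice"];
const seen: string[][] = [];
let ticks = 0;

// `poll` drains whatever has been enqueued since the previous call.
const poll = (): string[] => inbox.splice(0, inbox.length);

function loop(pollMessages: () => string[], maxTicks: number): void {
  while (ticks < maxTicks) {
    // Handle the messages that arrived, then advance the game by one tick.
    seen.push(pollMessages());
    ticks++;
    // Simulate another caller enqueueing a message while the loop is running.
    if (ticks === 1) {
      inbox.push("move:alice");
    }
  }
}

loop(poll, 3);
```

A message enqueued while the loop is running is picked up by the next poll, and a tick with no new messages still runs.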

License

MIT

Versions

0.4.0: 9 months ago
0.3.1: 4 years ago
0.3.0: 4 years ago
0.2.4: 4 years ago
0.2.3: 4 years ago
0.2.2: 4 years ago
0.2.1: 4 years ago
0.2.0: 4 years ago
0.1.8: 5 years ago
0.1.7: 5 years ago
0.1.6: 5 years ago
0.1.5: 5 years ago
0.1.4: 5 years ago
0.1.3: 5 years ago
0.1.2: 5 years ago
0.1.1: 5 years ago
0.1.0: 5 years ago