2.4.2 • Published 1 month ago

@hazae41/plume v2.4.2

License
MIT

Plume

Typed async events with sequenced and parallel dispatching

npm i @hazae41/plume

Node Package 📦

Features

Current features

  • 100% TypeScript and ESM
  • No external dependency
  • Rust-like patterns
  • Type-safe event dispatching and listening
  • Event listeners can return values
  • Sequenced and parallel dispatching
  • Wait for events with composition

Usage

Emitters

import { SuperEventTarget } from "@hazae41/plume"

/**
 * Events are described as functions that can accept multiple parameters and return something
 */
type MyEvents = {
  /**
   * This will handle a request and return a response
   */
  request: (data: string) => string,

  /**
   * This will handle a close and return nothing
   */
  close: (reason?: unknown) => void,

  /**
   * This will handle an error and return nothing
   */
  error: (reason?: unknown) => void,
}

class MyObject {
  /**
   * Composition over inheritance
   */
  readonly events = new SuperEventTarget<MyEvents>()

  /**
   * Dispatch an "error" event with a reason
   **/
  async onError(reason?: unknown) {
    await this.events.emit("error", reason)
  }

  /**
   * Dispatch a "close" event without a reason
   **/
  async onClose() {
    await this.events.emit("close")
  }

  /**
   * Dispatch a "request" event and return the returned response
   */
  async request(data: string): Promise<string> {
    const response = await this.events.emit("request", data)

    /**
     * When a listener has returned something
     */
    if (response.isSome())
      return response.get()

    /**
     * When no listener has returned
     */
    throw new Error(`Unhandled`)
  }

}
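
To illustrate how describing events as functions can drive type checking, here is a minimal self-contained sketch. It is not Plume's implementation: `TinyTypedTarget` and `AnyFn` are hypothetical names, and unlike Plume this sketch is synchronous and simply collects every listener's return value.

```typescript
// Hypothetical helper type: any function can describe an event
type AnyFn = (...params: any[]) => unknown

// Hypothetical typed target; M maps event names to function signatures
class TinyTypedTarget<M extends Record<keyof M, AnyFn>> {
  private readonly listeners = new Map<keyof M, AnyFn[]>()

  // The listener must match the signature declared for this event
  on<K extends keyof M>(type: K, listener: M[K]): void {
    const current = this.listeners.get(type) ?? []
    current.push(listener)
    this.listeners.set(type, current)
  }

  // The parameters and the returned values are derived from the event signature
  emit<K extends keyof M>(type: K, ...params: Parameters<M[K]>): ReturnType<M[K]>[] {
    const current = this.listeners.get(type) ?? []
    return current.map(listener => listener(...params) as ReturnType<M[K]>)
  }
}

type TinyEvents = {
  request: (data: string) => string,
  close: (reason?: unknown) => void,
}

const typedTarget = new TinyTypedTarget<TinyEvents>()

// "data" is inferred as string from the event map
typedTarget.on("request", data => data.toUpperCase())

const typedResponses = typedTarget.emit("request", "hello")
```

With this shape, a call such as `typedTarget.emit("request", 123)` is rejected by the compiler because the event map declares a string parameter.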

Listeners

const object = new MyObject()

object.events.on("request", (request: string) => {
  if (request === "hello")
    /**
     * Return something and skip next listeners
     */
    return new Some("world")

  /**
   * Unhandled by this listener
   */
  return new None()
})

object.events.on("request", (request: string) => {
  if (request === "it")
    /**
     * Return something and skip next listeners
     */
    return new Some("works")

  /**
   * Unhandled by this listener
   */
  return new None()
})

object.events.on("request", (request: string) => {
  if (request === "have")
    /**
     * Return something and skip next listeners
     */
    return new Some("fun")

  /**
   * Unhandled by this listener
   */
  return new None()
})

Sequenced dispatching (default)

Listeners registered with passive: false (or with passive left undefined) are dispatched in sequence

The listeners are called one after the other, each one awaited before the next

When a listener returns Some, all remaining listeners are skipped

for (const listener of listeners) {
  const returned = await listener(...)

  if (returned.isSome())
    return returned

  continue
}

return new None()

/**
 * This listener will be called first
 */
myObject.events.on("message", async (message: string) => {
  await doSomething(message)

  return new Some(1)
}, { passive: false })

/**
 * This listener will be skipped
 */
myObject.events.on("message", async (message: string) => {
  await doSomething2(message)

  return new Some(2)
}, { passive: false })

/**
 * Some(1)
 */
console.log(await myObject.events.emit("message", "hello world"))
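
The loop above can be tried without the library. The following is a minimal sketch under stated assumptions: `Some`, `None`, `Listener` and `dispatchSequenced` are simplified stand-ins written for this example, not Plume's own classes or internals.

```typescript
// Hypothetical stand-ins for the option types; not the library's own classes
class Some<T> {
  readonly value: T
  constructor(value: T) { this.value = value }
  isSome(): boolean { return true }
}

class None {
  isSome(): boolean { return false }
}

type Listener<T> = (message: string) => Promise<Some<T> | None>

// Call listeners one after the other and stop at the first one that returns Some
async function dispatchSequenced<T>(listeners: Listener<T>[], message: string): Promise<Some<T> | None> {
  for (const listener of listeners) {
    const returned = await listener(message)

    if (returned.isSome())
      return returned
  }

  return new None()
}

// Records which listeners were actually called
const calls: string[] = []

const sequencedListeners: Listener<number>[] = [
  async () => { calls.push("first"); return new None() },
  async () => { calls.push("second"); return new Some(1) },
  async () => { calls.push("third"); return new Some(2) },
]

// Settles with Some(1); the third listener is never called
const sequenced = dispatchSequenced(sequencedListeners, "hello world")
```

Once `sequenced` settles, `calls` contains only "first" and "second", which shows that a returned Some stops the dispatch.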

Parallel dispatching

Listeners registered with passive: true are dispatched in parallel

All listeners are called at the same time

Their results are retrieved with Promise.all

const promises = new Array<Promise<...>>()

for (const listener of listeners)
  promises.push(listener(...))

const returneds = await Promise.all(promises)

for (const returned of returneds)
  if (returned.isSome())
    return returned

return new None()

/**
 * This listener will be called first
 */
myObject.events.on("message", async (message: string) => {
  await doSomething(message)

  return new Some(1)
}, { passive: true })

/**
 * This listener will be called too
 */
myObject.events.on("message", async (message: string) => {
  await doSomething2(message)

  return new Some(2)
}, { passive: true })

/**
 * Some(1), the first Some in listener order
 */
console.log(await myObject.events.emit("message", "hello world"))
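
A self-contained sketch of the Promise.all loop above may help show one consequence: the result follows listener order, not completion order. `Some`, `None`, `Listener`, `sleep` and `dispatchParallel` are hypothetical stand-ins for this example, not Plume's internals.

```typescript
// Hypothetical stand-ins for the option types; not the library's own classes
class Some<T> {
  readonly value: T
  constructor(value: T) { this.value = value }
  isSome(): boolean { return true }
}

class None {
  isSome(): boolean { return false }
}

type Listener<T> = (message: string) => Promise<Some<T> | None>

// Start every listener at once, wait for all of them, then pick the first Some in listener order
async function dispatchParallel<T>(listeners: Listener<T>[], message: string): Promise<Some<T> | None> {
  const promises = listeners.map(listener => listener(message))
  const returneds = await Promise.all(promises)

  for (const returned of returneds)
    if (returned.isSome())
      return returned

  return new None()
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

// Records the order in which listeners finish
const finished: string[] = []

const parallelListeners: Listener<number>[] = [
  async () => { await sleep(20); finished.push("slow"); return new Some(1) },
  async () => { await sleep(5); finished.push("fast"); return new Some(2) },
]

// Settles with Some(1) even though the second listener finishes earlier
const parallel = dispatchParallel(parallelListeners, "hello world")
```

Both listeners run to completion here, unlike in sequenced dispatching where a returned Some prevents later listeners from being called.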

Waiting for an event

In this example we have a target with a send() method and a message event

We want to send a message with some ID and wait for a reply with the same ID, skipping replies with other IDs

Waiting is always done using passive: true

import { Future } from "@hazae41/future"

async function requestAndWait(id: number, request: string): Promise<string> {
  const socket = new MySocket()

  socket.send({ id, text: request })

  const response = await socket.wait("message", async (future: Future<string>, message) => {
    /**
     * Only wait for a message with the same id
     */
    if (message.id === id) {
      /**
       * Resolve with the text
       */
      future.resolve(message.text)

      /**
       * Do not skip other listeners
       */
      return new None()
    }

    /**
     * Do not skip other listeners
     */
    return new None()
  })

  return response
}
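
The idea behind waiting can be sketched without the library: register a temporary listener, resolve a promise when a matching message arrives, and remove the listener afterwards. `TinyTarget`, `Message` and `waitForId` are hypothetical names for this example and do not reflect how Plume implements wait().

```typescript
type Message = { id: number, text: string }
type MessageListener = (message: Message) => void

// Hypothetical minimal target with a single "message" event
class TinyTarget {
  listeners: MessageListener[] = []

  on(listener: MessageListener) {
    this.listeners.push(listener)
  }

  off(listener: MessageListener) {
    this.listeners = this.listeners.filter(it => it !== listener)
  }

  emit(message: Message) {
    // Iterate over a copy so listeners can remove themselves while dispatching
    for (const listener of this.listeners.slice())
      listener(message)
  }
}

// Resolve with the text of the first message carrying the expected id
function waitForId(target: TinyTarget, id: number): Promise<string> {
  return new Promise<string>(resolve => {
    const listener = (message: Message) => {
      // Skip replies with other ids and keep listening
      if (message.id !== id)
        return

      target.off(listener)
      resolve(message.text)
    }

    target.on(listener)
  })
}

const waitTarget = new TinyTarget()
const reply = waitForId(waitTarget, 2)

waitTarget.emit({ id: 1, text: "ignored" })
waitTarget.emit({ id: 2, text: "matched" })
```

Here `reply` resolves with "matched", and the temporary listener is removed as soon as it has done its job.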

Composing waiters with automatic disposal

Same as above, but this time the event is raced with other events in a composable way

When one of them resolves or rejects, the function returns and the remaining listeners are removed (each waiter is disposed by the using keyword when it goes out of scope)

import { Future } from "@hazae41/future"

async function requestAndWaitOrClose(id: number, request: string): Promise<string> {
  const socket = new MySocket()

  socket.send({ id, text: request })

  /**
   * Resolve on message
   */
  using event = socket.wait("message", async (future: Future<string>, message) => {
    if (message.id === id) {
      future.resolve(message.text)
      return new None()
    }

    return new None()
  })

  /**
   * Reject on close
   */
  using close = socket.wait("close", (future: Future<never>) => {
    future.reject(new Error("Closed"))
    return new None()
  })
  
  return await Promise.race([event, close])
}
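
A using declaration (TypeScript 5.2 and later) calls the value's `[Symbol.dispose]()` method when the enclosing scope exits. The sketch below spells out the same cleanup with try/finally so that it runs without that syntax. `TinyTarget` and `waitMessageOrClose` are hypothetical names for this example, not part of Plume.

```typescript
type Listener = (data?: string) => void

// Hypothetical minimal target; on() returns a function that removes the listener
class TinyTarget {
  private readonly listeners: Record<string, Listener[]> = {}

  on(type: string, listener: Listener): () => void {
    const current = this.listeners[type] ?? []
    this.listeners[type] = current.concat(listener)

    return () => {
      this.listeners[type] = (this.listeners[type] ?? []).filter(it => it !== listener)
    }
  }

  emit(type: string, data?: string) {
    for (const listener of (this.listeners[type] ?? []).slice())
      listener(data)
  }

  count(type: string): number {
    return (this.listeners[type] ?? []).length
  }
}

// Resolve on "message", reject on "close", and always remove both listeners
async function waitMessageOrClose(target: TinyTarget): Promise<string> {
  let offMessage = () => {}
  let offClose = () => {}

  const message = new Promise<string>(resolve => {
    offMessage = target.on("message", data => resolve(data ?? ""))
  })

  const close = new Promise<never>((_, reject) => {
    offClose = target.on("close", () => reject(new Error("Closed")))
  })

  try {
    return await Promise.race([message, close])
  } finally {
    // This is the cleanup that a using declaration would perform automatically
    offMessage()
    offClose()
  }
}

const raceTarget = new TinyTarget()
const outcome = waitMessageOrClose(raceTarget)

raceTarget.emit("message", "hello")
```

After `outcome` settles, `raceTarget.count("message")` and `raceTarget.count("close")` are both 0, so the losing listener does not linger.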

Plume provides some helper functions for doing this with fewer lines of code

import { Future } from "@hazae41/future"

async function requestAndWaitOrCloseOrErrorOrSignal(id: number, request: string, signal: AbortSignal): Promise<string> {
  const socket = new MySocket()

  socket.send({ id, text: request })

  /**
   * Resolve on message
   */
  using event = socket.wait("message", async (future: Future<string>, message) => {
    if (message.id === id) {
      future.resolve(message.text)
      return new None()
    }

    return new None()
  })

  /**
   * Reject on signal
   */
  using abort = Plume.AbortedError.waitOrThrow(signal)

  /**
   * Reject on error (only if the target has an "error" event)
   */
  using error = Plume.ErroredError.waitOrThrow(socket)

  /**
   * Reject on close (only if the target has a "close" event)
   */
  using close = Plume.ClosedError.waitOrThrow(socket)

  return await Promise.race([event, close, error, abort])
}

And it provides helpers for common error-close-signal patterns

import { Future } from "@hazae41/future"

async function requestAndWaitOrCloseOrErrorOrSignal(id: number, request: string, signal: AbortSignal): Promise<string> {
  const socket = new MySocket()

  socket.send({ id, text: request })

  const response = await Plume.waitOrCloseOrErrorOrSignal(socket, "message", async (future: Future<string>, message) => {
    if (message.id === id) {
      future.resolve(message.text)
      return new None()
    }

    return new None()
  }, signal)

  return response
}