3.0.1 • Published 2 years ago
express-long-polling v3.0.1
express-long-polling
Library to implement task dispatching with long-polling approach on express server.
Installation
npm i express-long-pollingUsage Example
See complete demo in server.ts, process-task.ts and submit-task.ts.
import express from 'express'
import cors from 'cors'
import { print } from 'listening-on'
import { LongPollingTaskQueue } from 'express-long-polling'
let app = express()
app.use(cors())
app.use(express.json())
let taskQueue = new LongPollingTaskQueue({ pollingInterval: 30 * 1000 })
// client creates pending task
app.post('/task', (req, res) => {
  let { id } = taskQueue.addTask({
    input: req.body,
  })
  res.json({ id })
})
// worker polls pending task
app.get('/task', (req, res) => {
  taskQueue.getOrWaitTask('random', req, ({ id, input }) =>
    res.json({ id, input }),
  )
})
// worker submits task result
app.post('/task/result', (req, res) => {
  let { id, output } = req.body
  let found = taskQueue.dispatchResult(id, output)
  res.status(201)
  res.end()
})
// client gets task result
app.get('/task/result', (req, res) => {
  let { id } = req.query
  taskQueue.getOrWaitResult(
    id,
    req,
    output => res.json({ status: 'completed', output }),
    /* optional, default will redirect with 307 for the same url */
    timeout => res.json({ status: 'pending' }),
  )
})
// client delete completed task
app.delete('/task', (req, res) => {
  let { id } = req.query
  taskQueue.deleteTask(id)
  res.end()
})
let PORT = 8100
app.listen(PORT, () => {
  print(PORT)
})Typescript Signature
import type { Request } from 'express'
/** @description redirect with 307 to let client retry */
function defaultOnTimeout(req: Request): void {
  req.res?.redirect(307, req.url)
}
export class LongPollingTaskQueue<Input, Output> {
  constructor(options?: {
    /** @default 30 seconds */
    pollingInterval?: number
  })
  /**
   * @description create task from client
   */
  addTask(options: {
    /** @default randomUUID */
    id?: string | (() => string)
    input: Input
  }): {
    id: string
  }
  /**
   * @description get task from worker
   */
  getOrWaitTask(
    getTask: 'first' | 'random',
    req: Request,
    onTask: (task: { id: string; input: Input }) => void,
    onTimeout?: typeof defaultOnTimeout,
  ): void
  /**
   * @description dispatch result from worker
   * @returns true if the task is found and deleted
   * @returns false if the task is not found (maybe already deleted)
   */
  dispatchResult(id: string, output: Output): boolean
  /**
   * @description get result from client (dispatched from worker)
   */
  getOrWaitResult(
    id: string,
    req: Request,
    onOutput: (output: Output) => void,
    onTimeout?: typeof defaultOnTimeout,
  ): void
  /**
   * @description delete completed task from client (to release memory)
   * @returns true if the task is found and deleted
   * @returns false if the task is not found (maybe already deleted)
   */
  deleteTask(id: string): boolean
}License
This project is licensed with BSD-2-Clause
This is free, libre, and open-source software. It comes down to four essential freedoms [ref]:
- The freedom to run the program as you wish, for any purpose
 - The freedom to study how the program works, and change it so it does your computing as you wish
 - The freedom to redistribute copies so you can help others
 - The freedom to distribute copies of your modified versions to others