Anabranch
A TypeScript monorepo for async utilities with first-class error handling. Compatible with Deno, Node.js, and, occasionally, browsers.

Adapters
Anabranch provides adapters for popular tools like Kafka, S3, RabbitMQ, PostgreSQL, MySQL, SQLite, and Google Cloud Storage out of the box. These adapters wrap common operations in Anabranch semantics (Streams and Tasks), so adapters behind the same abstraction are interchangeable. This is especially useful for testing, where an in-memory adapter can stand in for a real backend.
What do you do if your use case isn't covered by an existing adapter?
Simple wrapping
For straightforward cases, wrap promises or async iterables directly:
import { Source, Task } from 'anabranch'
// From an async iterable
const stream = Source.from(myAsyncIterable())
// From a promise
const task = Task.of(() => fetchData())
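Note that `Task.of` receives a function rather than a promise, presumably so the work can be deferred until `.run()` is called (as in the Storage example further down). The sketch below illustrates that idea with a hypothetical `LazyTask` class; it is not Anabranch's `Task` and says nothing about its actual internals.

```typescript
// Hypothetical stand-in for illustration only; this is NOT Anabranch's Task.
class LazyTask<T> {
  private readonly thunk: () => Promise<T>

  private constructor(thunk: () => Promise<T>) {
    this.thunk = thunk
  }

  // Store the function without calling it.
  static of<T>(thunk: () => Promise<T>): LazyTask<T> {
    return new LazyTask(thunk)
  }

  // Each call to run() invokes the stored function.
  run(): Promise<T> {
    return this.thunk()
  }
}

let calls = 0
const lazyTask = LazyTask.of(async () => {
  calls += 1
  return 'data'
})
// At this point the function has not been invoked; it runs only on lazyTask.run().
```

Under this assumption, a task can be built, passed around, and composed before any I/O happens.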
Custom adapters
To integrate with abstractions like Storage, and to get the interchangeability they bring, create a custom connector that produces adapters:
import type { StorageAdapter, StorageConnector } from '@anabranch/storage'
import { Storage } from '@anabranch/storage'
// 1. Implement the adapter interface
const myAdapter: StorageAdapter = {
  async put(key, body) {/* ... */},
  async get(key) {/* ... */},
  async delete(key) {/* ... */},
  async head(key) {/* ... */},
  async *list(prefix) {/* ... */},
  async close() {/* ... */},
}
// 2. Create a connector that produces adapters
function createMyStorage(): StorageConnector {
  return {
    connect() {
      return Promise.resolve(myAdapter)
    },
    end() {/* cleanup */},
  }
}
// 3. Use with the Storage wrapper for Task/Stream semantics
const storage = await Storage.connect(createMyStorage()).run()
await storage.put('file.txt', 'hello').run()
This pattern is consistent across all Anabranch adapters.
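As a rough illustration of what the method bodies in step 1 might contain, here is a minimal in-memory sketch backed by a `Map`. The `SketchStorageAdapter` interface is hypothetical: it mirrors only the method names shown above, and its parameter and return types are assumptions, so the real `StorageAdapter` type in `@anabranch/storage` may look different.

```typescript
// Hypothetical local interface: only the method names come from the example
// above. All parameter and return types here are assumptions.
interface SketchStorageAdapter {
  put(key: string, body: string): Promise<void>
  get(key: string): Promise<string | undefined>
  delete(key: string): Promise<void>
  head(key: string): Promise<{ size: number } | undefined>
  list(prefix: string): AsyncIterable<string>
  close(): Promise<void>
}

function createMemoryAdapter(): SketchStorageAdapter {
  // All objects live in this Map for the lifetime of the adapter.
  const objects = new Map<string, string>()
  return {
    async put(key, body) {
      objects.set(key, body)
    },
    async get(key) {
      return objects.get(key)
    },
    async delete(key) {
      objects.delete(key)
    },
    async head(key) {
      const body = objects.get(key)
      // Size is reported in UTF-16 code units, which is enough for a sketch.
      return body === undefined ? undefined : { size: body.length }
    },
    async *list(prefix) {
      // Yield every stored key that starts with the given prefix.
      for (const key of Array.from(objects.keys())) {
        if (key.startsWith(prefix)) yield key
      }
    },
    async close() {
      objects.clear()
    },
  }
}
```

An adapter like this keeps tests free of network access, and because callers only see the adapter methods, swapping it for a real backend should not require changes on the calling side.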
Scripts
# Bootstrap a new package
deno run -A scripts/bootstrap.ts new-package
# Bump versions (dry-run first)
deno run --allow-read --allow-write scripts/bump.ts --help
# Run checks
deno task check
Packages
| Package | Description | JSR | npm |
|---|---|---|---|
| anabranch | Async stream processing where errors are collected alongside values instead of stopping the pipeline. Built on Task and Channel primitives. | ||
| broken-link-checker | Crawl websites and find broken links. Uses web-client for robust HTTP and anabranch streams for concurrent processing with backpressure. | ||
| cache | Cache primitives with Task semantics for composable error handling | ||
| cache-redis | Redis adapter for @anabranch/cache using ioredis with native TTL support | ||
| check-runs | CI status reporting with line annotations. Report tests, lints, builds with error locations. | ||
| check-runs-github | GitHub API implementation for @anabranch/check-runs | ||
| db | Database abstraction with Task/Stream semantics. In-memory adapter for testing, adapters for PostgreSQL, MySQL, and SQLite. | ||
| db-mysql | MySQL database connector using mysql2 with connection pooling. | ||
| db-postgres | PostgreSQL database connector using pg (node-postgres) with connection pooling and cursor-based streaming for large result sets. | ||
| db-sqlite | SQLite database connector using Node.js built-in node:sqlite for in-memory or file-based databases. | ||
| eventlog | Event log with Task/Stream semantics. In-memory adapter for event-sourced systems with cursor-based consumption. | ||
| eventlog-kafka | Kafka adapter for eventlog using kafkajs with Task/Stream semantics | ||
| fs | Streaming file-system utilities for reading, walking, globbing, and watching files with composable error handling. | ||
| nosql | NoSQL document collection primitives with Task/Stream semantics | ||
| queue | Message queue with Task/Stream semantics. In-memory adapter with delayed messages, dead letter queues, and visibility timeout. | ||
| queue-rabbitmq | RabbitMQ adapter for @anabranch/queue using amqplib. Supports all queue features with RabbitMQ queues for persistent messaging. | ||
| queue-redis | Redis adapter for @anabranch/queue using ioredis. Supports all queue features with Redis streams for persistent messaging. | ||
| storage | Object storage primitives with Task/Stream semantics. In-memory adapter with generic interface for cloud providers. | ||
| storage-browser | Browser storage adapter using IndexedDB. Works in browsers and Web Workers. | ||
| storage-gcs | GCS adapter for @anabranch/storage using @google-cloud/storage. Supports signed URLs and all storage operations. | ||
| storage-s3 | S3 adapter for @anabranch/storage using @aws-sdk/client-s3. Supports presigned URLs, multipart uploads, and all storage operations. | ||
| web-client | Modern HTTP client built on fetch with automatic retries, timeouts, and rate-limit handling. Returns Task for composable error handling. | ||
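
The anabranch row above describes errors being collected alongside values instead of stopping the pipeline. The sketch below shows that general idea using plain async generators, independent of the library. `Outcome`, `mapCollectingErrors`, and `partition` are hypothetical names for this illustration and are not part of Anabranch's API.

```typescript
// Hypothetical result shape for this sketch; Anabranch's own types may differ.
type Outcome<T> =
  | { type: 'success'; value: T }
  | { type: 'error'; error: unknown }

// Apply `fn` to every item. A failure becomes an error entry in the output
// instead of aborting the iteration.
async function* mapCollectingErrors<A, B>(
  source: AsyncIterable<A> | Iterable<A>,
  fn: (item: A) => Promise<B> | B,
): AsyncGenerator<Outcome<B>> {
  for await (const item of source) {
    let outcome: Outcome<B>
    try {
      outcome = { type: 'success', value: await fn(item) }
    } catch (error) {
      outcome = { type: 'error', error }
    }
    yield outcome
  }
}

// Drain a stream of outcomes into separate arrays of values and errors.
async function partition<T>(
  results: AsyncIterable<Outcome<T>>,
): Promise<{ successes: T[]; errors: unknown[] }> {
  const successes: T[] = []
  const errors: unknown[] = []
  for await (const result of results) {
    if (result.type === 'success') successes.push(result.value)
    else errors.push(result.error)
  }
  return { successes, errors }
}
```

With a mapping function that throws for some inputs, `partition(mapCollectingErrors(items, fn))` still processes every item and reports the failures next to the successful values.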