@solid-primitives/async
A collection of primitives for handling asynchronous data as reactive memos — streaming responses, cancellable fetches, retrying, and aggregating incremental results:
fromStream- wraps a fetch request to support web streams in memos or optimistic signalsfromJSONStream- wraps a fetch request returning a web stream containing (incomplete) JSON for the use in memos or optimistic signalsmakeAbortable- sets up an AbortSignal with auto-abort on re-fetch or timeoutcreateAbortable- likemakeAbortable, but with automatic abort on cleanupmakeRetrying- wraps the fetcher to retry requests after a delaycreateAggregated- aggregates the values of an accessor
Installation
npm install @solid-primitives/async
# or
yarn add @solid-primitives/async
# or
pnpm add @solid-primitives/async
fromStream
Turns a function returning a Web Stream API ReadableStream (optionally wrapped in a Promise), or a streaming Response, into an async generator that buffers the stream and yields the accumulated text after every chunk. Node's stream/web ReadableStream is also accepted — useful during streaming SSR, since a plain (non-streaming) SSR pass only ever reads the final value anyway.
// definition (ReadableStream includes Node's `stream/web` variant)
fromStream<Args extends any[]>(
fetcher: (...args: Args) => Promise<Response | ReadableStream> | Response | ReadableStream
): (...args: Args) => AsyncGenerator<string, void, unknown>;
// on the client
const plainText = createMemo(fromStream(() => fetch(url())));
// on the server (streaming SSR only)
const readme = createMemo(fromStream(() => Readable.toWeb(createReadStream("README.md"))));
If the packets were very small and contained only a few words from lorem ipsum, the result would be (one line per update):
Lorem ipsum
Lorem ipsum dolor sit amet,
Lorem ipsum dolor sit amet, consetetur sadipscing
and so on. Usual HTTP packets can transmit ~1.4kb including headers, so expect multiple updates for larger data.
fromJSONStream
The same as fromStream, but it auto-closes a partial JSON string on every chunk so it parses successfully even mid-object, instead of buffering plain text.
// definition (ReadableStream includes Node's `stream/web` variant)
fromJSONStream<Args extends any[]>(
fetcher: (...args: Args) => Promise<Response | ReadableStream> | Response | ReadableStream
): (...args: Args) => AsyncGenerator<any, void, unknown>;
// usage — cast the result to the shape you expect, since the parsed JSON is untyped
const answer = createMemo(fromJSONStream(() => fetch(url()))) as Accessor<MyResponseShape>;
The result looks like this:
// current data
// parsed JSON
'[{"id":8429,"name":"fromStrea'
[{ id: 8429, name: "fromStrea" }]
'[{"id":8429,"name":"fromStream","description":"tu'
[{ id: 8429, name: "fromStream", description: "tu" }]
'[{"id":8429,"name":"fromStream","description":"turns web streams into'
[{ id: 8429, name: "fromStream", description: "turns web streams into" }]
'[{"id":8429,"name":"fromStream","description":"turns web streams into async iterator"},{"id":294'
[{ id: 8429, name: "fromStream", description: "turns web streams into async iterator" }, { id: 294 }]
'[{"id":8429,"name":"fromStream","description":"turns web streams into async iterator"},{"id":2947,"name":"fromJSONStream",'
[{ id: 8429, name: "fromStream", description: "turns web streams into async iterator" }, { id: 2947, name: "fromJSONStream }]
// and so on
makeAbortable
Orchestrates AbortController creation and aborting of abortable fetchers, either on refetch, after a timeout, or when a parent signal aborts — depending on configuration:
// definition
function makeAbortable(options?: {
autoAbort?: boolean; // default true
timeout?: number;
chainTo?: () => AbortSignal;
}): [
signal: () => AbortSignal,
abort: (reason?: string) => void,
filterAbortError: (err: any) => void,
];
// usage
const [signal, abort, filterAbortError] = makeAbortable();
const data = createMemo(
fromStream(() => fetch(url(), { signal: signal() }).catch(filterAbortError)),
);
// use `createAbortable` if you do not want manual cleanup:
onCleanup(abort);
signal()always returns a fresh, not-yet-abortedAbortSignal; unlessoptions.autoAbortis set tofalse, calling it also aborts the previously returned signal, if anyabort(reason?)aborts the current signal, regardless ofautoAbort- if
timeoutis set, the signal aborts itself automatically after that many milliseconds - if
chainTois set to anothermakeAbortable/createAbortablesignal accessor, this signal aborts whenever that parent signal does (for any reason — manualabort(),timeout, or anautoAbort'd retry) — handy for cascading an abort down a chain of dependent requests filterAbortError(err)returnsundefinedfor errors whose.nameis"AbortError"(whatfetchrejects with when its signal aborts) and re-throws everything else, so you can.catch(filterAbortError)without swallowing real failures
createAbortable
Takes the same options and returns the same tuple as makeAbortable, but also aborts the current signal automatically onCleanup — so it must be called within a reactive (owned) scope.
const [signal, abort, filterAbortError] = createAbortable();
const data = createMemo(
fromStream(() => fetch(url(), { signal: signal() }).catch(filterAbortError)),
);
// no need to call onCleanup(abort) yourself — it happens when the owning scope disposes
makeRetrying
Wraps a fetcher and can catch errors and retry after a delay:
// definition
const fetcher: () => AsyncGenerator<any, void, unknown> = makeRetrying(
() => fetch(url()).then(r => r.body),
{
delay: 1000, // number of Milliseconds to wait before retrying; default is 5s
retries: 1, // number of times the request should be retried before throwing the last error; default is 3 times
},
);
If you want to retry for an infinite number of times, you can set options.retries to Infinity.
createAggregated
Aggregates every value emitted by an accessor into a growing memo, instead of replacing the previous value with each update:
// definition
function createAggregated<R, I extends R | R[]>(
accessor: Accessor<R>,
initialValue?: I,
memoOptions?: MemoOptions<I | R | R[]>, // forwarded to the underlying createMemo
): Accessor<I | R | R[] | undefined>;
// usage
const pages = createAggregated(currentPage, []);
- if the aggregate so far is an Array, incoming values are appended to it
- if the aggregate so far is an Object, the incoming value is shallow-merged into it
- if the aggregate so far is a string, incoming string data is concatenated onto it
- otherwise the aggregate becomes an Array containing the previous and incoming values
null/undefinedvalues from the accessor are ignored and never overwrite an existing aggregate — so a still-pending accessor won't reset an already-started aggregation- objects and arrays are re-created (shallow-copied) on every update, but the individual values are left untouched, so
<For>works as expected