@flowcore/sdk-data-pump-client v1.11.2
Flowcore SDK Module - Data Pump Client
A Flowcore SDK module that provides a lightweight client for fetching events from the Flowcore platform.
Installation
Install with npm:
npm install @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
or yarn:
yarn add @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
Usage
Create a new instance of the Data Pump client:
import {DataPump} from '@flowcore/sdk-data-pump-client';
import {OidcClient} from "@flowcore/sdk-oidc-client";
const client = new OidcClient("your client id", "your client secret", "well known endpoint");
const dataPump = new DataPump("https://graph.api.flowcore.io/graphql", client);
You can configure the page size in the last argument of the constructor; the default is 1000.
Then create an RxJS observable to listen to the events:
import {Subject} from 'rxjs';
import {SourceEvent} from "@flowcore/sdk-data-pump-client";
const subject = new Subject<SourceEvent>();
subject.subscribe({
  next: (event) => {
    console.log(event);
  },
  complete: () => {
    console.log("completed");
  },
});
Then you can fetch all events with the fetchAllEvents method:
await dataPump.fetchAllEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
});
This will loop through all the events for the specified event types and push them to the observable.
You can specify how many time buckets should be run in parallel with the parallel argument; the default is 1.
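To illustrate what running several time buckets in parallel means, here is a minimal sketch of a bounded-concurrency runner. It is not the SDK's implementation: `processInParallel` and the `worker` callback are hypothetical names, and the worker stands in for whatever handles a single time bucket.

```typescript
// Hypothetical helper, not part of the SDK: runs `worker` over `timeBuckets`
// with at most `parallel` workers active at any moment.
async function processInParallel<T>(
  timeBuckets: string[],
  parallel: number,
  worker: (timeBucket: string) => Promise<T>,
): Promise<T[]> {
  const results: T[] = new Array(timeBuckets.length);
  let next = 0;

  // Each lane keeps taking the next unprocessed bucket until none remain.
  async function lane(): Promise<void> {
    while (next < timeBuckets.length) {
      const index = next++;
      results[index] = await worker(timeBuckets[index]);
    }
  }

  const laneCount = Math.max(1, Math.min(parallel, timeBuckets.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results; // same order as the input buckets
}
```

With `parallel` set to 1 the buckets are handled strictly one after another; higher values trade ordering across buckets for throughput.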
To fetch events for a specific time bucket you can use the fetchEvents method:
await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
});
This will fetch all events for the specified time bucket and push them to the observable.
To close the observable, set the last argument of the fetchEvents method to true.
Limits
You can specify from and to event IDs to fetch events within a specific range:
await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
  afterEventId: "your from event id",
  beforeEventId: "your to event id",
});
These are both exclusive, meaning that the events with the specified IDs will not be included in the result. Either or both can be omitted.
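The exclusive semantics can be mimicked on an in-memory list, as in the sketch below. This only illustrates the resulting behaviour; the actual filtering presumably happens on the platform side. `ExampleEvent` and `betweenExclusive` are hypothetical names, and the sketch assumes the events are already ordered and that an unknown bound is simply ignored.

```typescript
// Hypothetical, simplified event shape for illustration only.
interface ExampleEvent {
  eventId: string;
}

// Returns the events strictly between `afterEventId` and `beforeEventId`.
// Assumes `events` is already ordered; either bound may be omitted.
function betweenExclusive(
  events: ExampleEvent[],
  afterEventId?: string,
  beforeEventId?: string,
): ExampleEvent[] {
  let start = 0;
  let end = events.length;
  if (afterEventId !== undefined) {
    const i = events.findIndex((e) => e.eventId === afterEventId);
    if (i >= 0) start = i + 1; // skip the boundary event itself
  }
  if (beforeEventId !== undefined) {
    const i = events.findIndex((e) => e.eventId === beforeEventId);
    if (i >= 0) end = i; // stop before the boundary event
  }
  return events.slice(start, end);
}

const sample = ["e1", "e2", "e3", "e4", "e5"].map((eventId) => ({ eventId }));
console.log(betweenExclusive(sample, "e1", "e4").map((e) => e.eventId)); // [ 'e2', 'e3' ]
```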
Indexes
You can fetch time buckets for a specific event type with the fetchIndexes method:
await dataPump.fetchIndexes({
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventType: "your event type",
});
This will return a list of time buckets for the specified event type.
You can also specify the from and to time buckets with the from and to arguments; it will then return the time buckets within the specified range.
Pumping Events
You can also pump events to a destination with the pumpEvents method:
const abortController = new AbortController();
await dataPump.pumpEvents(
  "cache-key",
  "observable",
  {
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
  },
  abortController
);
This will pump all events using backfilling, then switch to live mode and pump all new events. The cache-key is used to store the last event ID in the cache, so that the client can resume from that event if it is restarted. The abort controller can be used to stop the pumping.
Note: the default cache is in memory. You can implement your own cache by extending the SimpleCache class and passing it to the DataPump constructor via the options object.
Note: You can also specify a starting time bucket with the from argument and control the backfilling parallelism with the parallel argument.
Note: When no abort controller is passed, the data pump runs once, until it has fetched all events currently present in the data container.
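The idea of resuming from a cached event ID and stopping on abort can be sketched without the SDK. Everything here is hypothetical: `LastEventStore` is a stand-in written for this example and does not reflect the actual SimpleCache interface, and `resumeFrom` works on a plain array rather than a live event stream.

```typescript
// Hypothetical in-memory store; the SDK's SimpleCache may look different.
class LastEventStore {
  private readonly entries = new Map<string, string>();
  get(key: string): string | undefined {
    return this.entries.get(key);
  }
  set(key: string, eventId: string): void {
    this.entries.set(key, eventId);
  }
}

// Delivers the events that come after the stored position and records
// progress after each one. Stops early once the signal has been aborted.
// If the stored ID is not found, delivery starts from the beginning.
function resumeFrom(
  cacheKey: string,
  store: LastEventStore,
  eventIds: string[],
  handle: (eventId: string) => void,
  signal?: AbortSignal,
): void {
  const last = store.get(cacheKey);
  const start = last === undefined ? 0 : eventIds.indexOf(last) + 1;
  for (const eventId of eventIds.slice(start)) {
    if (signal?.aborted) return;
    handle(eventId);
    store.set(cacheKey, eventId);
  }
}
```

Because progress is written after each handled event, a second call with the same cache key continues with the first event that was not yet delivered.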
Resetting the data pump
You can reset the data pump with the reset method:
await dataPump.reset("cache-key");
Note: it is only possible to reset the data pump if it is not currently running. You can check whether the data pump is running with the isRunning method. To stop the data pump, you can use the abort controller.
Creating your own pump
You can create your own pump by manually calling the pumpPage method:
let cursor: string | undefined = undefined;
do {
  const result = await dataPump.pumpPage({
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
    timeBucket: "your time bucket",
    afterEventId: "your from event id", // optional, may be undefined
    beforeEventId: "your to event id", // optional, may be undefined
  }, cursor);
  for (const event of result.events) {
    console.log(event);
  }
  cursor = result.cursor;
} while (cursor);
This will allow you to control the flow of events and prevent the pump from pumping too many events at once.
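One way to make that flow control explicit is to wrap the cursor loop in a small driver that awaits each handler and stops after a fixed budget. The sketch below is an assumption-laden illustration: `Page`, `drainPages`, and `maxEvents` are hypothetical names, and `fetchPage` stands in for a call such as `dataPump.pumpPage(query, cursor)`.

```typescript
// Shape mirrors the `events` and `cursor` fields used in the loop above;
// the real result type may carry more fields.
interface Page<E> {
  events: E[];
  cursor?: string;
}

// Hypothetical driver: pulls pages until the cursor runs out or
// `maxEvents` events have been handled. Returns the number handled.
async function drainPages<E>(
  fetchPage: (cursor?: string) => Promise<Page<E>>,
  handle: (event: E) => Promise<void>,
  maxEvents: number,
): Promise<number> {
  let cursor: string | undefined = undefined;
  let handled = 0;
  do {
    const page: Page<E> = await fetchPage(cursor);
    for (const event of page.events) {
      if (handled >= maxEvents) return handled;
      // Later events and pages wait until this handler has finished.
      await handle(event);
      handled++;
    }
    cursor = page.cursor;
  } while (cursor);
  return handled;
}
```

Since the next page is only requested after every event of the current page has been handled, a slow consumer naturally slows down fetching.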
Development
yarn install
or with npm:
npm install