@dobbyai/collector
Telemetry collector for AI agents — Node.js / TypeScript port of
dobby-collector.
Capture every run, LLM call, tool invocation, and chain step from your AI agent running on Node.js. Stream them to the Dobby AI Control Plane for governance, compliance, and observability.
- Zero runtime dependencies (uses Node's built-in fetch + crypto).
- Manual API for any framework (or no framework).
- LangChain.js auto-instrumentation via callback handler.
- Mastra auto-instrumentation via agent proxy.
- Vercel AI SDK auto-instrumentation via call wrapper + callbacks.
- Google GenAI (Gemini) auto-instrumentation via client wrap.
- W3C Trace Context (traceparent) emitted per outbound batch; unlocks Dobby's tracing_enabled governance control automatically.
- Telemetry never breaks your agent: every error path swallows and logs; the buffer drops the oldest events instead of throwing on overflow.
The companion Python SDK is at https://pypi.org/project/dobby-collector/. It uses the same wire protocol and the same server-side normalizer, and it flips the same governance controls.
Install
npm install @dobbyai/collector
For framework auto-instrumentation, install the matching peer dependency:
npm install @dobbyai/collector @langchain/core # LangChain.js
npm install @dobbyai/collector @mastra/core # Mastra
npm install @dobbyai/collector ai # Vercel AI SDK
npm install @dobbyai/collector @google/genai # Google GenAI (Gemini)
The base install does NOT pull any framework transitively — npm install @dobbyai/collector on a vanilla project picks up only Node built-ins.
Quickstart — manual API
import { init, startRun, endRun, span, shutdown } from '@dobbyai/collector';
init({
  apiKey: process.env.DOBBY_API_KEY!, // dsdk_* token
  connectorId: process.env.DOBBY_CONNECTOR_ID!, // wc_* connector id
});
const run = startRun({
  name: 'weekly_report',
  inputs: { week: '2026-W19' },
});
// `retriever` and `llm` stand for your own retriever and model objects.
const docs = await span('retrieval', { kind: 'tool', inputs: { query: 'sales' } }, async () => {
  return await retriever.invoke('sales');
});
const summary = await span('summarize', { kind: 'llm' }, async () => {
  return await llm.invoke(docs);
});
endRun(run, { outputs: { summary }, status: 'success' });
// Best-effort flush via `beforeExit` is automatic. For long-running services,
// call shutdown() explicitly on SIGTERM / SIGINT.
await shutdown();
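For a long-running service, the explicit flush on SIGTERM / SIGINT mentioned above could be wired roughly as follows. This is a minimal sketch: `installShutdownHooks` is a hypothetical helper, not part of the package, and its `flush` parameter stands in for the collector's `shutdown()`. Exiting with code 0 after the flush is also an assumption of the sketch.

```typescript
// Hypothetical helper: `flush` stands in for the collector's shutdown().
type Flush = () => Promise<void>;

function installShutdownHooks(
  flush: Flush,
  signals: NodeJS.Signals[] = ['SIGTERM', 'SIGINT'],
): () => void {
  let shuttingDown = false;

  const registered = signals.map((signal) => {
    const handler = async (): Promise<void> => {
      if (shuttingDown) return; // ignore repeated signals while flushing
      shuttingDown = true;
      try {
        await flush(); // drain buffered telemetry before exiting
      } finally {
        process.exit(0); // assumption: a clean exit code after the flush
      }
    };
    process.once(signal, handler);
    return { signal, handler };
  });

  // Returns an uninstall function, mainly useful in tests.
  return () => {
    for (const { signal, handler } of registered) {
      process.removeListener(signal, handler);
    }
  };
}
```

In an application you would call `installShutdownHooks(shutdown)` once, right after `init()`.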
Quickstart — LangChain.js
import { init } from '@dobbyai/collector';
import { DobbyLangChainCallbackHandler } from '@dobbyai/collector/langchain';
init({ apiKey: '...', connectorId: '...' });
const handler = new DobbyLangChainCallbackHandler();
// Per-invoke (recommended)
const result = await agent.invoke(
  { input: 'What is the weather?' },
  { callbacks: [handler] },
);
// Every chain.start/end, llm.start/end, tool.start/end, agent.step lands as
// a Dobby SDK event automatically. One Dobby workload_run per top-level
// .invoke() call (when `autoRun: true`, the default).
Quickstart — Mastra
import { init } from '@dobbyai/collector';
import { wrapMastraAgent } from '@dobbyai/collector/mastra';
import { Agent } from '@mastra/core';
init({ apiKey: '...', connectorId: '...' });
const agent = wrapMastraAgent(new Agent({ /* ... */ }), {
  agentName: 'research-bot',
});
// Drop-in: same shape as the original Agent. Every .generate() and .stream()
// call becomes one Dobby workload_run; tool calls land as nested spans.
const result = await agent.generate('What is the weather?');
Quickstart — Vercel AI SDK
import { init } from '@dobbyai/collector';
import { trackVercelAiCall, vercelAiCallbacks } from '@dobbyai/collector/vercel-ai';
import { generateText, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
init({ apiKey: '...', connectorId: '...' });
// Non-streaming: wrap the generateText call
const result = await trackVercelAiCall(
  'weekly_summary',
  { inputs: { topic: 'AI safety' } },
  () => generateText({
    model: openai('gpt-4o-mini'),
    prompt: 'Summarize this week...',
    tools: { /* ... */ },
  }),
);
// Streaming: spread callbacks into streamText
const cbs = vercelAiCallbacks({ agentName: 'live_chat' });
const stream = streamText({
  model: openai('gpt-4o-mini'),
  prompt: '...',
  tools: { /* ... */ },
  ...cbs,
});
Quickstart — Google GenAI (Gemini)
The @google/genai SDK calls Gemini directly (ai.models.generateContent)
with no callback hook, so you instrument the client instance once, right after
constructing it. Every Gemini call then lands as a Dobby llm.start /
llm.completion pair inside the active run — no per-call wrapping.
import { GoogleGenAI } from '@google/genai';
import { init, startRun, endRun } from '@dobbyai/collector';
import { instrumentGoogleGenAI } from '@dobbyai/collector/google-genai';
init({ apiKey: '...', connectorId: '...' });
const ai = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY });
instrumentGoogleGenAI(ai); // patch once — covers generateContent + generateContentStream
const run = startRun({ name: 'weather', inputs: { q: 'forecast for Dan?' } });
const resp = await ai.models.generateContent({
  model: 'gemini-2.5-flash',
  contents: 'What is the weather in Kibbutz Dan?',
});
endRun(run, { outputs: { answer: resp.text }, status: 'success' });
// The run lands one workload_run with llm_calls[] = [{ model: 'gemini-2.5-flash',
// prompt, completion, prompt_tokens, completion_tokens }] — so the SOC 2
// logging-completeness control sees the model. `uninstrumentGoogleGenAI(ai)`
// restores the originals (e.g. in tests).
Unlike the Python SDK's zero-arg instrument_google_genai(), the Node API takes the client instance: @google/genai's public methods are instance-bound, so a global patch can't intercept them. One line, same effect.
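To picture what patching a client instance means, here is a generic sketch of wrapping one method on one object and restoring it later. `patchMethod` is a hypothetical helper written for illustration; it is not how instrumentGoogleGenAI is implemented, only the general technique of instance-level patching.

```typescript
type AnyFn = (...args: any[]) => any;

// Hypothetical helper: replaces target[key] with a wrapper that reports each
// call to `onCall`, then delegates to the original. Returns a restore function.
function patchMethod<T extends object>(
  target: T,
  key: keyof T & string,
  onCall: (method: string, args: unknown[]) => void,
): () => void {
  const original = target[key] as unknown as AnyFn;

  const wrapped: AnyFn = function (this: unknown, ...args: unknown[]) {
    try {
      onCall(key, args);
    } catch {
      // Observer failures are swallowed so the wrapped call still runs.
    }
    return original.apply(this, args);
  };

  (target as any)[key] = wrapped;
  return () => {
    (target as any)[key] = original; // undo the patch, e.g. in tests
  };
}

// Usage with a stand-in object (not the real @google/genai client):
const fakeModels = {
  generateContent(req: { model: string }) {
    return { text: `echo:${req.model}` };
  },
};
const seenCalls: string[] = [];
const restoreModels = patchMethod(fakeModels, 'generateContent', (method) => {
  seenCalls.push(method);
});
fakeModels.generateContent({ model: 'demo' }); // recorded, then delegated
restoreModels();
```

Because only the given object is modified, other instances of the same class stay untouched, which is why the instance has to be passed in.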
Configuration
init() accepts:
| Option | Default | Env var | Notes |
|---|---|---|---|
| apiKey | required | DOBBY_API_KEY | Workload connector token (dsdk_*) |
| connectorId | required | DOBBY_CONNECTOR_ID | Workload connector id (wc_*) |
| baseUrl | https://dobby-ai.com | DOBBY_BASE_URL | Useful for self-hosted / staging |
| flushIntervalMs | 10_000 (10s) | - | How often the background loop drains the buffer |
| maxBufferEvents | 10_000 | - | Hard cap; oldest events evicted on overflow |
| batchSize | 500 | - | Max events per POST |
| framework | (none) | - | Tag emitted in sdk_meta.framework |
| hostFingerprint | (none) | - | Tag emitted in sdk_meta.host_fingerprint |
| httpTimeoutMs | 30_000 (30s) | - | Per-attempt HTTP timeout |
| autoFlushOnExit | true | - | Auto-flush on process.on('beforeExit') |
| dlqPath | ~/.dobby-collector/dlq-node | DOBBY_DLQ_PATH | Dead-letter-queue directory (v0.4.0+) |
| dlqEnabled | true | DOBBY_DLQ_DISABLED=1 | Disable for tests / read-only filesystems |
| dlqMaxBatches | 1000 | - | DLQ cap; oldest batches evicted first |
| logger | console (warn+error) | - | Pass any {warn, error, info, debug} shaped object |
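The defaults and environment variables in the table can be read as a resolution step like the one sketched below. `CollectorOptions` and `resolveOptions` are hypothetical names, and two behaviors are assumptions of the sketch rather than documented facts: that an explicit option wins over its environment variable, and that a missing required value raises an error.

```typescript
import { homedir } from 'node:os';
import { join } from 'node:path';

// Hypothetical option shape mirroring the table; not the SDK's exported type.
interface CollectorOptions {
  apiKey?: string;
  connectorId?: string;
  baseUrl?: string;
  flushIntervalMs?: number;
  maxBufferEvents?: number;
  batchSize?: number;
  httpTimeoutMs?: number;
  autoFlushOnExit?: boolean;
  dlqPath?: string;
  dlqEnabled?: boolean;
  dlqMaxBatches?: number;
}

type Env = Record<string, string | undefined>;

function resolveOptions(
  opts: CollectorOptions,
  env: Env = process.env,
  home: string = homedir(),
): Required<CollectorOptions> {
  // Assumption: explicit options take precedence over environment variables.
  const apiKey = opts.apiKey ?? env.DOBBY_API_KEY;
  const connectorId = opts.connectorId ?? env.DOBBY_CONNECTOR_ID;
  if (!apiKey || !connectorId) {
    // Assumption: the real init() may handle this differently.
    throw new Error('apiKey and connectorId are required');
  }
  return {
    apiKey,
    connectorId,
    baseUrl: opts.baseUrl ?? env.DOBBY_BASE_URL ?? 'https://dobby-ai.com',
    flushIntervalMs: opts.flushIntervalMs ?? 10_000,
    maxBufferEvents: opts.maxBufferEvents ?? 10_000,
    batchSize: opts.batchSize ?? 500,
    httpTimeoutMs: opts.httpTimeoutMs ?? 30_000,
    autoFlushOnExit: opts.autoFlushOnExit ?? true,
    dlqPath: opts.dlqPath ?? env.DOBBY_DLQ_PATH ?? join(home, '.dobby-collector', 'dlq-node'),
    dlqEnabled: opts.dlqEnabled ?? (env.DOBBY_DLQ_DISABLED !== '1'),
    dlqMaxBatches: opts.dlqMaxBatches ?? 1000,
  };
}
```

Passing `env` and `home` as parameters keeps the function easy to exercise in tests without touching the real process environment.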
Observability — what lands in Dobby
Each init() call registers a singleton that streams events to:
POST {baseUrl}/api/v1/webhooks/workloads/{connectorId}
Authorization: Bearer {apiKey}
Content-Type: application/json
traceparent: 00-{32hex}-{16hex}-01 ← W3C Trace Context (per batch)
{ "events": [...], "sdk_meta": {...} }
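The request shape above can be assembled with Node built-ins only. The sketch below shows one way to generate a version-00 traceparent value and build the POST; `newTraceparent` and `buildBatchRequest` are hypothetical names, and the contents of `events` and `sdk_meta` are left opaque because their fields are not specified here.

```typescript
import { randomBytes } from 'node:crypto';

// W3C Trace Context, version 00: 16-byte trace id, 8-byte parent id,
// flags 01 (sampled). Random bytes make an all-zero (invalid) id
// practically impossible, so the sketch does not check for it.
function newTraceparent(): string {
  const traceId = randomBytes(16).toString('hex'); // 32 hex chars
  const parentId = randomBytes(8).toString('hex'); // 16 hex chars
  return `00-${traceId}-${parentId}-01`;
}

interface BatchRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

// Hypothetical builder for one outbound batch.
function buildBatchRequest(
  baseUrl: string,
  connectorId: string,
  apiKey: string,
  events: unknown[],
  sdkMeta: Record<string, unknown>,
): BatchRequest {
  return {
    url: `${baseUrl}/api/v1/webhooks/workloads/${encodeURIComponent(connectorId)}`,
    init: {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
        traceparent: newTraceparent(), // one fresh value per batch
      },
      body: JSON.stringify({ events, sdk_meta: sdkMeta }),
    },
  };
}
```

The result can be handed to the built-in `fetch(request.url, request.init)`.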
The server adapter buffers events by run_id and finalizes one
workload_runs row on each terminal event (run.completed / run.failed /
run.cancelled). That row carries the canonical
workload_runs.metadata_json shape that Policy Scanner + governance
dashboards consume:
- prompt + output (run-level)
- llm_calls[] (model, prompt, completion, tokens)
- tool_calls[] (tool name, args, output, duration)
- agent_steps[] (LangChain ReAct-style thought/action/observation)
- traceparent: flips the tracing_enabled governance control to configured
Telemetry-never-blocks-your-agent rule
The SDK uses a bounded ring buffer (default 10 000 events). When the
buffer is full, the oldest event is dropped and the agent's call to
emit() returns immediately. The dropped count surfaces in the next batch's
sdk_meta.dropped_events_since_last_batch — the server flags the affected
workload_run as "telemetry degraded" so customers can see the gap in the UI.
If init() is missing, track()/span()/startRun()/endRun() silently
no-op. No throws, no crashes. The agent runs; telemetry simply doesn't land.
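The drop-oldest behavior can be illustrated with a small bounded buffer. This is a sketch, not the SDK's internal class: `DropOldestBuffer` is a hypothetical name, and it uses a plain array for brevity where a true ring buffer would avoid the cost of shifting elements.

```typescript
// Bounded buffer that never throws on overflow: when full, the oldest
// item is evicted and a counter records the loss for the next batch.
class DropOldestBuffer<T> {
  private readonly capacity: number;
  private items: T[] = [];
  private dropped = 0;

  constructor(capacity: number) {
    this.capacity = Math.max(1, Math.floor(capacity));
  }

  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift(); // evict the oldest event
      this.dropped += 1;
    }
    this.items.push(item);
  }

  // Removes up to `max` events, oldest first, and reports how many events
  // were dropped since the previous drain (then resets that counter).
  drain(max: number): { events: T[]; droppedSinceLastBatch: number } {
    const events = this.items.splice(0, max);
    const droppedSinceLastBatch = this.dropped;
    this.dropped = 0;
    return { events, droppedSinceLastBatch };
  }

  get size(): number {
    return this.items.length;
  }
}
```

The `droppedSinceLastBatch` value plays the role of sdk_meta.dropped_events_since_last_batch described above: the gap is reported with the next batch instead of being raised to the caller.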
Offline survival — the dead-letter queue (v0.4.0+)
Batches that can't be delivered are persisted, not dropped:
- A batch that exhausts its retry schedule (5xx / network errors), or hits an unknown client-side POST exception, is written as a JSON file under dlqPath (default ~/.dobby-collector/dlq-node).
- If shutdown() times out while a POST is still in-flight (e.g. a slow server response in a short-lived script / CLI / cron agent), the in-flight batch and everything still buffered are rescued to the DLQ before the process exits.
- On every flush cycle, including the next process start, pending DLQ batches are redelivered first, oldest first. Delivery is at-least-once; the server dedups by event_id, so a redelivered duplicate is harmless.
- Bounded: dlqMaxBatches (default 1000, oldest evicted first), and a batch that fails 5 consecutive redeliveries is reaped so a permanently-broken payload can't block newer ones.
4xx responses still drop immediately — they indicate a customer-side bug (bad token, malformed event) that would fail redelivery forever.
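The rules in this section (drop on 4xx, persist on 5xx or network failure, cap the queue, replay oldest first) could be sketched with JSON files as follows. All function names and the file-naming scheme are hypothetical; treating 3xx responses as retryable is also an assumption of the sketch, and the per-batch redelivery counter is left out for brevity.

```typescript
import { existsSync, mkdirSync, readdirSync, readFileSync, rmSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';

type Disposition = 'delivered' | 'drop' | 'retry';

// `status` is null when the POST failed before any HTTP response arrived.
function classifyStatus(status: number | null): Disposition {
  if (status === null) return 'retry'; // network error
  if (status >= 200 && status < 300) return 'delivered';
  if (status >= 400 && status < 500) return 'drop'; // customer-side bug, never retried
  return 'retry'; // 5xx (and, in this sketch, anything else)
}

let dlqSequence = 0;

// Writes one batch as a JSON file and evicts the oldest files above the cap.
// Names sort lexicographically by creation order within one process.
function persistBatch(dir: string, batch: unknown, maxBatches: number): string {
  mkdirSync(dir, { recursive: true });
  const stamp = String(Date.now()).padStart(15, '0');
  const fileName = `${stamp}-${String(dlqSequence++).padStart(6, '0')}.json`;
  writeFileSync(join(dir, fileName), JSON.stringify(batch));
  const files = pendingBatches(dir);
  for (const stale of files.slice(0, Math.max(0, files.length - maxBatches))) {
    rmSync(join(dir, stale), { force: true });
  }
  return fileName;
}

// Lists queued batch files, oldest first.
function pendingBatches(dir: string): string[] {
  if (!existsSync(dir)) return [];
  return readdirSync(dir).filter((f) => f.endsWith('.json')).sort();
}

function readBatch(dir: string, fileName: string): unknown {
  return JSON.parse(readFileSync(join(dir, fileName), 'utf8'));
}
```

A flush loop built on this would first walk `pendingBatches(dir)`, re-POST each file, delete it on 'delivered' or 'drop', and only then send fresh events.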
Comparison with the Python SDK
| Aspect | dobby-collector (Python) | @dobbyai/collector (Node) |
|---|---|---|
| Public API | track / span / start_run / end_run | track / span / startRun / endRun |
| Framework adapters | LangChain, CrewAI, AutoGen, OpenAI Assistants, Google GenAI (Gemini) | LangChain.js, Mastra, Vercel AI SDK, Google GenAI (Gemini) |
| Async context propagation | threading.Lock + _current_run_id | AsyncLocalStorage (Node async_hooks) |
| W3C traceparent | v0.4.0+ | v0.1.0+ (parity from day 1) |
| DLQ on retry-exhaustion | v0.2.0+ (SQLite) | v0.4.0+ (JSON files, zero deps) |
| @track decorator | stdlib decorator | HOF wrapper (TS decorators experimental) |
| Wire protocol | ⟵ identical ⟶ | |
| Server-side normalizer | ⟵ shared ⟶ | |
| Governance controls | ⟵ both flip tracing_enabled → configured automatically ⟶ | |
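Two Node-side rows of the table, AsyncLocalStorage for context propagation and a higher-order-function wrapper in place of a decorator, can be illustrated together. The sketch below uses hypothetical names (`withRun`, `currentRunId`, `tracked`, `SpanRecord`) and is not the package's implementation; it only shows how a run id can follow a call across awaits without being passed explicitly.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

interface RunContext {
  runId: string;
}

const runStore = new AsyncLocalStorage<RunContext>();

// Runs `fn` with `runId` visible to everything it calls, sync or async.
function withRun<T>(runId: string, fn: () => T): T {
  return runStore.run({ runId }, fn);
}

function currentRunId(): string | undefined {
  return runStore.getStore()?.runId;
}

interface SpanRecord {
  label: string;
  runId: string | undefined;
  ok: boolean;
  durationMs: number;
}

// Higher-order wrapper: returns a function with the same signature that
// reports one record per call and never lets the sink break the call.
function tracked<A extends unknown[], R>(
  label: string,
  fn: (...args: A) => Promise<R>,
  sink: (record: SpanRecord) => void,
): (...args: A) => Promise<R> {
  return async (...args: A): Promise<R> => {
    const startedAt = Date.now();
    const report = (ok: boolean): void => {
      try {
        sink({ label, runId: currentRunId(), ok, durationMs: Date.now() - startedAt });
      } catch {
        // Telemetry failures are swallowed.
      }
    };
    try {
      const result = await fn(...args);
      report(true);
      return result;
    } catch (err) {
      report(false);
      throw err; // the caller still sees the original error
    }
  };
}
```

Because the store is scoped per asynchronous call chain, two runs executing concurrently each see their own run id, which is the property a lock plus a shared variable provides on the Python side.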
License
MIT © 2026 Dobby AI, Inc.