@dobbyai/collector

Version 0.4.0 · License MIT · Dependencies 0 · Vulnerabilities 0 · Size 1.0 MB

Telemetry collector for AI agents — Node.js / TypeScript port of dobby-collector.

Capture every run, LLM call, tool invocation, and chain step from your AI agent running on Node.js. Stream them to the Dobby AI Control Plane for governance, compliance, and observability.

  • Zero runtime dependencies (uses Node's built-in fetch + crypto).
  • Manual API for any framework (or no framework).
  • LangChain.js auto-instrumentation via callback handler.
  • Mastra auto-instrumentation via agent proxy.
  • Vercel AI SDK auto-instrumentation via call wrapper + callbacks.
  • Google GenAI (Gemini) auto-instrumentation via client wrap.
  • W3C Trace Context (traceparent) emitted per outbound batch — unlocks Dobby's tracing_enabled governance control automatically.
  • Telemetry never breaks your agent — every error path swallows and logs; the buffer drops oldest events instead of throwing on overflow.

The companion Python SDK is at https://pypi.org/project/dobby-collector/. It uses the same wire protocol and the same server-side normalizer, and flips the same governance controls.

Install

npm install @dobbyai/collector

For framework auto-instrumentation, install the matching peer dependency:

npm install @dobbyai/collector @langchain/core      # LangChain.js
npm install @dobbyai/collector @mastra/core         # Mastra
npm install @dobbyai/collector ai                   # Vercel AI SDK
npm install @dobbyai/collector @google/genai        # Google GenAI (Gemini)

The base install does NOT pull any framework transitively — npm install @dobbyai/collector on a vanilla project picks up only Node built-ins.

Quickstart — manual API

import { init, startRun, endRun, span, shutdown } from '@dobbyai/collector';

init({
  apiKey: process.env.DOBBY_API_KEY!,        // dsdk_* token
  connectorId: process.env.DOBBY_CONNECTOR_ID!, // wc_* connector id
});

const run = startRun({
  name: 'weekly_report',
  inputs: { week: '2026-W19' },
});

// `retriever` and `llm` stand in for your own retrieval and model objects.
const docs = await span('retrieval', { kind: 'tool', inputs: { query: 'sales' } }, async () => {
  return await retriever.invoke('sales');
});

const summary = await span('summarize', { kind: 'llm' }, async () => {
  return await llm.invoke(docs);
});

endRun(run, { outputs: { summary }, status: 'success' });

// Best-effort flush via `beforeExit` is automatic — for long-running services,
// call shutdown() explicitly on SIGTERM / SIGINT.
await shutdown();
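The comment above recommends calling shutdown() explicitly in long-running services. A minimal sketch of that wiring follows, assuming only that shutdown() returns a Promise. installShutdownHooks is a hypothetical helper of our own, and the flush function is passed in as a parameter so the snippet runs without the package installed.

```typescript
// Hypothetical helper, not part of @dobbyai/collector: on SIGTERM / SIGINT it
// awaits the injected flush function once, then exits the process.
function installShutdownHooks(
  shutdown: () => Promise<void>,
  signals: NodeJS.Signals[] = ['SIGTERM', 'SIGINT'],
): () => void {
  let flushing = false;

  const onSignal = (signal: NodeJS.Signals): void => {
    if (flushing) return; // ignore repeated signals while the flush is running
    flushing = true;
    shutdown()
      .catch((err: unknown) => console.error(`telemetry flush failed on ${signal}:`, err))
      .finally(() => process.exit(0)); // exit code 0 is this sketch's choice
  };

  for (const sig of signals) process.on(sig, onSignal);

  // The returned uninstaller removes the listeners again (useful in tests).
  return () => {
    for (const sig of signals) process.off(sig, onSignal);
  };
}
```

In an application you would pass the `shutdown` exported by '@dobbyai/collector', once, right after `init()`. Exiting with code 0 after the flush is an assumption; use whatever your process supervisor expects.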

Quickstart — LangChain.js

import { init } from '@dobbyai/collector';
import { DobbyLangChainCallbackHandler } from '@dobbyai/collector/langchain';

init({ apiKey: '...', connectorId: '...' });
const handler = new DobbyLangChainCallbackHandler();

// Per-invoke (recommended)
const result = await agent.invoke(
  { input: 'What is the weather?' },
  { callbacks: [handler] },
);
// Every chain.start/end, llm.start/end, tool.start/end, agent.step lands as
// a Dobby SDK event automatically. One Dobby workload_run per top-level
// .invoke() call (when `autoRun: true`, the default).

Quickstart — Mastra

import { init } from '@dobbyai/collector';
import { wrapMastraAgent } from '@dobbyai/collector/mastra';
import { Agent } from '@mastra/core';

init({ apiKey: '...', connectorId: '...' });

const agent = wrapMastraAgent(new Agent({ /* ... */ }), {
  agentName: 'research-bot',
});

// Drop-in: same shape as the original Agent. Every .generate() and .stream()
// call becomes one Dobby workload_run; tool calls land as nested spans.
const result = await agent.generate('What is the weather?');
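The feature list describes the Mastra integration as an "agent proxy", and the comment above promises a drop-in object of the same shape. As a rough illustration of why a JavaScript Proxy gives that property, here is a generic wrapper that times selected async methods and passes everything else through untouched. It is not how wrapMastraAgent is implemented; wrapMethods, CallRecord and the fake agent used to exercise it are hypothetical.

```typescript
// Generic illustration, not the wrapMastraAgent implementation: a Proxy that
// keeps the wrapped object's shape and times selected async methods.
interface CallRecord {
  method: string;
  ms: number;
  ok: boolean;
}

function wrapMethods<T extends object>(
  target: T,
  methods: ReadonlySet<string>,
  onCall: (rec: CallRecord) => void,
): T {
  return new Proxy(target, {
    get(obj, prop, receiver) {
      const value = Reflect.get(obj, prop, receiver);
      // Anything not listed (fields, other methods) passes straight through.
      if (typeof prop !== 'string' || !methods.has(prop) || typeof value !== 'function') {
        return value;
      }
      const method = prop;
      // Assumes the listed methods are async (return a Promise).
      return async (...args: unknown[]) => {
        const start = Date.now();
        const report = (ok: boolean): void => {
          try {
            onCall({ method, ms: Date.now() - start, ok });
          } catch {
            // A failing telemetry callback must never break the agent call.
          }
        };
        try {
          const result = await value.apply(obj, args);
          report(true);
          return result;
        } catch (err) {
          report(false);
          throw err;
        }
      };
    },
  });
}
```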

Quickstart — Vercel AI SDK

import { init } from '@dobbyai/collector';
import { trackVercelAiCall, vercelAiCallbacks } from '@dobbyai/collector/vercel-ai';
import { generateText, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

init({ apiKey: '...', connectorId: '...' });

// Non-streaming: wrap the generateText call
const result = await trackVercelAiCall(
  'weekly_summary',
  { inputs: { topic: 'AI safety' } },
  () => generateText({
    model: openai('gpt-4o-mini'),
    prompt: 'Summarize this week...',
    tools: { /* ... */ },
  }),
);

// Streaming: spread callbacks into streamText
const cbs = vercelAiCallbacks({ agentName: 'live_chat' });
const stream = streamText({
  model: openai('gpt-4o-mini'),
  prompt: '...',
  tools: { /* ... */ },
  ...cbs,
});

Quickstart — Google GenAI (Gemini)

The @google/genai SDK calls Gemini directly (ai.models.generateContent) with no callback hook, so you instrument the client instance once, right after constructing it. Every Gemini call then lands as a Dobby llm.start / llm.completion pair inside the active run — no per-call wrapping.

import { GoogleGenAI } from '@google/genai';
import { init, startRun, endRun } from '@dobbyai/collector';
import { instrumentGoogleGenAI } from '@dobbyai/collector/google-genai';

init({ apiKey: '...', connectorId: '...' });

const ai = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY });
instrumentGoogleGenAI(ai); // patch once — covers generateContent + generateContentStream

const run = startRun({ name: 'weather', inputs: { q: 'forecast for Dan?' } });
const resp = await ai.models.generateContent({
  model: 'gemini-2.5-flash',
  contents: 'What is the weather in Kibbutz Dan?',
});
endRun(run, { outputs: { answer: resp.text }, status: 'success' });
// This run lands as one workload_run with llm_calls[] = [{ model: 'gemini-2.5-flash',
// prompt, completion, prompt_tokens, completion_tokens }] — so the SOC 2
// logging-completeness control sees the model. `uninstrumentGoogleGenAI(ai)`
// restores the originals (e.g. in tests).

Unlike the Python SDK's zero-arg instrument_google_genai(), the Node API takes the client instance — @google/genai's public methods are instance-bound, so a global patch can't intercept them. One line, same effect.
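Patching a client instance, as described above, comes down to replacing a method on that object and keeping the original around for the restore step. The sketch below shows that pattern on a stand-in class. patchMethod, Hooks and FakeModels are hypothetical names, and this is not the code behind instrumentGoogleGenAI or uninstrumentGoogleGenAI.

```typescript
// Generic sketch of instance-level patching with restore. Names are hypothetical;
// this is not the code behind instrumentGoogleGenAI / uninstrumentGoogleGenAI.
type AsyncFn = (...args: any[]) => Promise<unknown>;

interface Hooks {
  before: (args: unknown[]) => void;
  after: (result: unknown) => void;
}

// Telemetry hooks must never break the wrapped call.
function safely(fn: () => void): void {
  try {
    fn();
  } catch {
    // swallowed on purpose
  }
}

function patchMethod<O extends object, K extends keyof O>(obj: O, key: K, hooks: Hooks): () => void {
  const original = obj[key];
  if (typeof original !== 'function') throw new TypeError(`${String(key)} is not a method`);
  const hadOwn = Object.prototype.hasOwnProperty.call(obj, key);
  const fn = original as unknown as AsyncFn;

  const wrapped = async function (this: unknown, ...args: unknown[]): Promise<unknown> {
    safely(() => hooks.before(args));
    const result = await fn.apply(this, args);
    safely(() => hooks.after(result));
    return result;
  };
  // Shadow the (usually prototype) method with an own property on this instance only.
  obj[key] = wrapped as unknown as O[K];

  // Restore: put back an own property, or delete ours to re-expose the prototype method.
  return () => {
    if (hadOwn) obj[key] = original;
    else Reflect.deleteProperty(obj, key);
  };
}
```

Applied to the real client, the same pattern would target `ai.models`, with hooks that build the llm.start / llm.completion events. That event-building part is omitted here.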

Configuration

init() accepts:

| Option | Default | Env var | Notes |
|---|---|---|---|
| apiKey | required | DOBBY_API_KEY | Workload connector token (dsdk_*) |
| connectorId | required | DOBBY_CONNECTOR_ID | Workload connector id (wc_*) |
| baseUrl | https://dobby-ai.com | DOBBY_BASE_URL | Useful for self-hosted / staging |
| flushIntervalMs | 10_000 (10 s) | | How often the background loop drains the buffer |
| maxBufferEvents | 10_000 | | Hard cap; oldest events evicted on overflow |
| batchSize | 500 | | Max events per POST |
| framework | (none) | | Tag emitted in sdk_meta.framework |
| hostFingerprint | (none) | | Tag emitted in sdk_meta.host_fingerprint |
| httpTimeoutMs | 30_000 (30 s) | | Per-attempt HTTP timeout |
| autoFlushOnExit | true | | Auto-flush on process.on('beforeExit') |
| dlqPath | ~/.dobby-collector/dlq-node | DOBBY_DLQ_PATH | Dead-letter-queue directory (v0.4.0+) |
| dlqEnabled | true | DOBBY_DLQ_DISABLED=1 | Disable for tests / read-only filesystems |
| dlqMaxBatches | 1000 | | DLQ cap; oldest batches evicted first |
| logger | console (warn + error) | | Pass any object shaped {warn, error, info, debug} |
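The table lists both init() options and environment variables. Below is a minimal sketch of how such settings might be merged, assuming the precedence explicit option, then environment variable, then default (the table does not state the order). resolveConfig is hypothetical, covers only a subset of the options, and returns null instead of throwing when the required values are missing.

```typescript
import { homedir } from 'node:os';
import { join } from 'node:path';

// Hypothetical sketch: merges a subset of the options in the table.
// Assumed precedence: explicit option > environment variable > default.
interface Options {
  apiKey?: string;
  connectorId?: string;
  baseUrl?: string;
  flushIntervalMs?: number;
  batchSize?: number;
  dlqPath?: string;
  dlqEnabled?: boolean;
}
type ResolvedOptions = Required<Options>;

function resolveConfig(
  opts: Options,
  env: Record<string, string | undefined> = process.env,
): ResolvedOptions | null {
  const apiKey = opts.apiKey ?? env['DOBBY_API_KEY'];
  const connectorId = opts.connectorId ?? env['DOBBY_CONNECTOR_ID'];
  // Returning null (so the caller stays a no-op) instead of throwing is this
  // sketch's choice, in the spirit of "telemetry never breaks your agent".
  if (!apiKey || !connectorId) return null;

  return {
    apiKey,
    connectorId,
    baseUrl: opts.baseUrl ?? env['DOBBY_BASE_URL'] ?? 'https://dobby-ai.com',
    flushIntervalMs: opts.flushIntervalMs ?? 10_000,
    batchSize: opts.batchSize ?? 500,
    dlqPath: opts.dlqPath ?? env['DOBBY_DLQ_PATH'] ?? join(homedir(), '.dobby-collector', 'dlq-node'),
    // DOBBY_DLQ_DISABLED=1 turns the DLQ off unless the option says otherwise.
    dlqEnabled: opts.dlqEnabled ?? (env['DOBBY_DLQ_DISABLED'] !== '1'),
  };
}
```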

Observability — what lands in Dobby

Each init() call registers a singleton that streams events to:

POST {baseUrl}/api/v1/webhooks/workloads/{connectorId}
Authorization: Bearer {apiKey}
Content-Type: application/json
traceparent: 00-{32hex}-{16hex}-01    ← W3C Trace Context (per batch)
{ "events": [...], "sdk_meta": {...} }
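To make the request shape above concrete, here is a sketch that splits buffered events into POST bodies of at most batchSize events and generates a fresh W3C traceparent for each batch with Node's built-in crypto. The helper names are hypothetical and the sdk_meta contents are placeholders. Only the URL path, headers, and body keys come from the description above.

```typescript
import { randomBytes } from 'node:crypto';

// Sketch of building the POST described above. Helper names are hypothetical
// and the sdk_meta contents are placeholders.

// W3C traceparent: version 00, 16-byte trace-id, 8-byte parent-id, flags 01.
// (The spec forbids all-zero ids; with random bytes that is vanishingly unlikely.)
function makeTraceparent(): string {
  return `00-${randomBytes(16).toString('hex')}-${randomBytes(8).toString('hex')}-01`;
}

// Splits drained events into POST bodies of at most `batchSize` events.
function toBatches<T>(events: readonly T[], batchSize: number): T[][] {
  const size = Math.max(1, Math.floor(batchSize)); // guard against 0 / negatives
  const batches: T[][] = [];
  for (let i = 0; i < events.length; i += size) {
    batches.push(events.slice(i, i + size));
  }
  return batches;
}

interface BatchRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

function buildRequest(
  baseUrl: string,
  connectorId: string,
  apiKey: string,
  events: unknown[],
  sdkMeta: Record<string, unknown>,
): BatchRequest {
  return {
    url: `${baseUrl}/api/v1/webhooks/workloads/${encodeURIComponent(connectorId)}`,
    init: {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
        traceparent: makeTraceparent(), // a fresh value for every batch
      },
      body: JSON.stringify({ events, sdk_meta: sdkMeta }),
    },
  };
}
```

On Node 18+ the result can be sent with the built-in fetch as `await fetch(req.url, req.init)`.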

The server adapter buffers events by run_id and finalizes one workload_runs row on each terminal event (run.completed / run.failed / run.cancelled). That row carries the canonical workload_runs.metadata_json shape that Policy Scanner + governance dashboards consume:

  • prompt + output (run-level)
  • llm_calls[] (model, prompt, completion, tokens)
  • tool_calls[] (tool name, args, output, duration)
  • agent_steps[] (LangChain ReAct-style thought/action/observation)
  • traceparent, which flips the tracing_enabled governance control to "configured"
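The server-side behaviour described above (buffer by run_id, finalize one row on a terminal event) can be modelled in a few lines. This is an illustrative model only. The SdkEvent and WorkloadRunRow shapes, the use of llm.completion / tool.end events to fill llm_calls[] and tool_calls[], and the RunAssembler name are all assumptions, not Dobby's server code.

```typescript
// Illustrative model of the server-side grouping described above.
// Event/row shapes and names are assumptions, not Dobby's implementation.
interface SdkEvent {
  run_id: string;
  type: string;
  data?: Record<string, unknown>;
}

interface WorkloadRunRow {
  run_id: string;
  status: 'completed' | 'failed' | 'cancelled';
  llm_calls: Record<string, unknown>[];
  tool_calls: Record<string, unknown>[];
}

const TERMINAL = new Map<string, WorkloadRunRow['status']>([
  ['run.completed', 'completed'],
  ['run.failed', 'failed'],
  ['run.cancelled', 'cancelled'],
]);

class RunAssembler {
  private readonly pending = new Map<string, SdkEvent[]>();

  /** Buffers `ev`; returns a finished row when `ev` is terminal, otherwise null. */
  ingest(ev: SdkEvent): WorkloadRunRow | null {
    const events = this.pending.get(ev.run_id) ?? [];
    events.push(ev);
    this.pending.set(ev.run_id, events);

    const status = TERMINAL.get(ev.type);
    if (status === undefined) return null;

    this.pending.delete(ev.run_id); // one row per run, then forget it
    return {
      run_id: ev.run_id,
      status,
      llm_calls: events.filter((e) => e.type === 'llm.completion').map((e) => e.data ?? {}),
      tool_calls: events.filter((e) => e.type === 'tool.end').map((e) => e.data ?? {}),
    };
  }
}
```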

Telemetry-never-blocks-your-agent rule

The SDK uses a bounded ring buffer (default 10 000 events). When the buffer is full, the oldest event is dropped and the agent's call to emit() returns immediately. The dropped count surfaces in the next batch's sdk_meta.dropped_events_since_last_batch — the server flags the affected workload_run as "telemetry degraded" so customers can see the gap in the UI.
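The overflow policy described above (drop the oldest event, keep accepting new ones, report the count with the next batch) maps onto a fixed-size ring buffer. A minimal sketch follows; DropOldestBuffer and its drain() method are hypothetical names, not the SDK's internals.

```typescript
// Minimal sketch of a bounded buffer that never throws on overflow: it evicts the
// oldest item and counts the drop. DropOldestBuffer and drain() are hypothetical.
class DropOldestBuffer<T> {
  private readonly capacity: number;
  private readonly items: (T | undefined)[];
  private head = 0; // index of the oldest item
  private size = 0;
  private dropped = 0;

  constructor(capacity: number) {
    this.capacity = Math.max(1, Math.floor(capacity)); // clamp rather than throw
    this.items = new Array<T | undefined>(this.capacity);
  }

  /** O(1) and never throws: when full, the oldest slot is overwritten. */
  push(item: T): void {
    if (this.size === this.capacity) {
      this.items[this.head] = item; // newest lands where the oldest was
      this.head = (this.head + 1) % this.capacity;
      this.dropped++;
      return;
    }
    this.items[(this.head + this.size) % this.capacity] = item;
    this.size++;
  }

  /** Removes up to `max` oldest items and reports drops since the previous drain. */
  drain(max: number): { events: T[]; droppedSinceLast: number } {
    const n = Math.max(0, Math.min(max, this.size));
    const events: T[] = [];
    for (let i = 0; i < n; i++) {
      events.push(this.items[this.head] as T);
      this.items[this.head] = undefined; // release the reference
      this.head = (this.head + 1) % this.capacity;
    }
    this.size -= n;
    const droppedSinceLast = this.dropped;
    this.dropped = 0;
    return { events, droppedSinceLast };
  }
}
```

In this sketch a flush loop would call drain(batchSize), and droppedSinceLast is the number that would feed sdk_meta.dropped_events_since_last_batch.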

If init() is missing, track()/span()/startRun()/endRun() silently no-op. No throws, no crashes. The agent runs; telemetry simply doesn't land.
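The no-op behaviour amounts to a guard at the top of each public call plus a catch-all around the enqueue. Below is a small sketch of the pattern, using hypothetical names (setClient, safeTrack) so it is not mistaken for the SDK's own track().

```typescript
// Sketch of the "no init(), no telemetry, no crash" pattern. Names are
// hypothetical; this is not the SDK's internal structure.
interface TelemetryClient {
  enqueue(event: { name: string; payload?: unknown }): void;
}

let activeClient: TelemetryClient | null = null;

function setClient(client: TelemetryClient): void {
  activeClient = client;
}

function safeTrack(name: string, payload?: unknown): void {
  if (activeClient === null) return; // init() never ran: silently do nothing
  try {
    activeClient.enqueue({ name, payload });
  } catch {
    // Telemetry failures are swallowed so the agent keeps running.
  }
}
```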

Offline survival — the dead-letter queue (v0.4.0+)

Batches that can't be delivered are persisted, not dropped:

  • A batch that exhausts its retry schedule (5xx / network errors), or hits an unknown client-side POST exception, is written as a JSON file under dlqPath (default ~/.dobby-collector/dlq-node).
  • If shutdown() times out while a POST is still in-flight (e.g. a slow server response in a short-lived script / CLI / cron agent), the in-flight batch and everything still buffered are rescued to the DLQ before the process exits.
  • On every flush cycle — including the next process start — pending DLQ batches are redelivered first, oldest first. Delivery is at-least-once; the server dedups by event_id, so a redelivered duplicate is harmless.
  • Bounded: dlqMaxBatches (default 1000, oldest evicted first), and a batch that fails 5 consecutive redeliveries is reaped so a permanently-broken payload can't block newer ones.
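Here is a sketch of a JSON-file dead-letter queue with the properties listed above: write-then-rename persistence, oldest-first redelivery, a cap on stored batches, and reaping after repeated failures. The file-naming scheme, the record shape, and the JsonFileDlq class are our assumptions; the real SDK's on-disk format is not documented here.

```typescript
import { mkdirSync, readdirSync, readFileSync, renameSync, unlinkSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { randomUUID } from 'node:crypto';

// Sketch of a JSON-file dead-letter queue. File naming, the record shape and
// every name here are assumptions, not the SDK's on-disk format.
interface DlqRecord {
  failures: number; // consecutive failed redeliveries
  events: unknown[];
}

class JsonFileDlq {
  private seq = 0;

  constructor(
    private readonly dir: string,
    private readonly maxBatches = 1000,
    private readonly maxFailures = 5,
  ) {
    mkdirSync(dir, { recursive: true });
  }

  /** Persist a batch, then evict the oldest files beyond maxBatches. */
  put(events: unknown[]): void {
    // Zero-padded wall-clock ms + per-instance counter sort lexically in write
    // order; the UUID keeps names unique across processes.
    const stamp = String(Date.now()).padStart(15, '0');
    const name = `${stamp}-${String(this.seq++).padStart(6, '0')}-${randomUUID()}.json`;
    const record: DlqRecord = { failures: 0, events };
    const tmp = join(this.dir, `.${name}.tmp`);
    writeFileSync(tmp, JSON.stringify(record));
    renameSync(tmp, join(this.dir, name)); // readers never see a half-written file

    const files = this.list();
    for (const f of files.slice(0, Math.max(0, files.length - this.maxBatches))) {
      unlinkSync(join(this.dir, f));
    }
  }

  /** Pending batch files, oldest first. */
  list(): string[] {
    return readdirSync(this.dir).filter((f) => f.endsWith('.json')).sort();
  }

  /** Redeliver oldest first; delete on success, reap after maxFailures failures. */
  async redeliver(send: (events: unknown[]) => Promise<boolean>): Promise<number> {
    let delivered = 0;
    for (const f of this.list()) {
      const path = join(this.dir, f);
      const rec = JSON.parse(readFileSync(path, 'utf8')) as DlqRecord;
      let ok = false;
      try {
        ok = await send(rec.events);
      } catch {
        ok = false;
      }
      if (ok) {
        unlinkSync(path);
        delivered++;
      } else if (++rec.failures >= this.maxFailures) {
        unlinkSync(path); // poison batch: stop it blocking newer ones
      } else {
        writeFileSync(path, JSON.stringify(rec));
      }
    }
    return delivered;
  }
}
```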

4xx responses still drop immediately — they indicate a customer-side bug (bad token, malformed event) that would fail redelivery forever.
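The delivery rules can be summarised as: 2xx delivered, 4xx dropped, 5xx or network error retried and then dead-lettered. Below is a sketch under those rules. classify, deliver and the default delay schedule are hypothetical (the actual retry schedule is not given above), and the sketch follows the text literally, so it also drops 408 and 429 responses.

```typescript
// Sketch of the delivery rules described above. classify/deliver and the default
// delay schedule are hypothetical; the SDK's real retry schedule isn't given here.
type Outcome = 'delivered' | 'drop' | 'retry';

function classify(result: { status: number } | { error: unknown }): Outcome {
  if ('error' in result) return 'retry'; // network error or timeout
  if (result.status >= 200 && result.status < 300) return 'delivered';
  if (result.status >= 400 && result.status < 500) return 'drop'; // would fail forever
  return 'retry'; // 5xx (and, in this sketch, anything unexpected)
}

async function deliver(
  send: () => Promise<{ status: number }>,
  deadLetter: () => void,
  delaysMs: readonly number[] = [500, 1_000, 2_000],
): Promise<'delivered' | 'dropped' | 'dead-lettered'> {
  for (let attempt = 0; ; attempt++) {
    let outcome: Outcome;
    try {
      outcome = classify(await send());
    } catch (error) {
      outcome = classify({ error });
    }
    if (outcome === 'delivered') return 'delivered';
    if (outcome === 'drop') return 'dropped';
    if (attempt >= delaysMs.length) {
      deadLetter(); // schedule exhausted: persist rather than drop
      return 'dead-lettered';
    }
    await new Promise((resolve) => setTimeout(resolve, delaysMs[attempt]));
  }
}
```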

Comparison with the Python SDK

| Aspect | dobby-collector (Python) | @dobbyai/collector (Node) |
|---|---|---|
| Public API | track / span / start_run / end_run | track / span / startRun / endRun |
| Framework adapters | LangChain, CrewAI, AutoGen, OpenAI Assistants, Google GenAI (Gemini) | LangChain.js, Mastra, Vercel AI SDK, Google GenAI (Gemini) |
| Async context propagation | threading.Lock + _current_run_id | AsyncLocalStorage (Node async_hooks) |
| W3C traceparent | v0.4.0+ | v0.1.0+ (parity from day 1) |
| DLQ on retry exhaustion | v0.2.0+ (SQLite) | v0.4.0+ (JSON files, zero deps) |
| @track decorator | stdlib decorator | HOF wrapper (TS decorators experimental) |
| Wire protocol | identical | identical |
| Server-side normalizer | shared | shared |
| Governance controls | flips tracing_enabled to configured automatically | flips tracing_enabled to configured automatically |
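The AsyncLocalStorage and HOF-wrapper rows in the table can be illustrated together. The store carries the active run id across awaits, and a higher-order function plays the role Python's @track decorator plays. This is a sketch with hypothetical names (withRun, tracked, currentRunId); the wrapper only logs, where a real collector would emit span events.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';
import { randomUUID } from 'node:crypto';

// Sketch only: names are hypothetical. The store carries the active run id
// across awaits without passing it as an argument.
const runContext = new AsyncLocalStorage<{ runId: string }>();

function currentRunId(): string | undefined {
  return runContext.getStore()?.runId;
}

// Runs `fn` inside a fresh run context, a stand-in for a run's lifetime.
function withRun<R>(fn: () => Promise<R>): Promise<R> {
  return runContext.run({ runId: randomUUID() }, fn);
}

// Higher-order function in the role of Python's @track decorator: it wraps any
// async function and logs the run it executed in.
function tracked<A extends unknown[], R>(
  name: string,
  fn: (...args: A) => Promise<R>,
  log: (line: string) => void = console.log,
): (...args: A) => Promise<R> {
  return async (...args: A): Promise<R> => {
    log(`${name} start run=${currentRunId() ?? 'none'}`);
    try {
      return await fn(...args);
    } finally {
      log(`${name} end run=${currentRunId() ?? 'none'}`);
    }
  };
}
```

Two overlapping calls each see their own run id, which is what lets span() attach to the right run without an explicit run argument.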

License

MIT © 2026 Dobby AI, Inc.
