mcp-stream-parser v0.8.1
MCP Stream Parser
A stream parser for Claude's message API that provides real-time access to:
- Text content with natural reading speed simulation
- Immediate thought process updates
- Real-time action/tool invocations
- Typing indicators
- Error handling
Installation
npm install @anthropic-ai/mcp-stream-parser
Quick Start
import { AnthropicStreamParser } from '@anthropic-ai/mcp-stream-parser';
const parser = new AnthropicStreamParser({
apiKey: 'your-api-key',
// Enable throttling for natural reading speed
throttle: {
enabled: true,
readingSpeed: 250, // words per minute
minPause: 100, // ms between emissions
maxBuffer: 200 // characters
}
});
// Regular text content (throttled for natural reading)
parser.on('text', event => {
console.log('Text:', event.content);
// event.splitInfo available if content was split
});
// Immediate thought updates
parser.on('thought', event => {
console.log('Thought:', event.content);
// event.hasAction indicates if an action follows
});
// Immediate action/tool invocations
parser.on('action', event => {
console.log('Action:', event.name);
console.log('Parameters:', event.params);
// event.thoughtContent available if preceded by thought
});
// Real-time typing indicators
parser.on('typing', event => {
if (event.isTyping) {
console.log('Claude is typing...');
} else {
console.log('Claude stopped typing');
}
});
// Detailed error handling
parser.on('error', event => {
if (event.fatal) {
console.error('Fatal error:', event.error);
} else {
console.warn('Non-fatal error:', event.error);
}
// event.partialContent available for recovery
});
// Send a message
await parser.sendMessage({
model: 'claude-3-opus-20240229',
content: 'Hello Claude!',
maxTokens: 1024, // optional
systemPrompt: { // optional
content: 'Custom system prompt',
prependInstructions: true
}
});
Event Types
Text Event
Regular text content from Claude. These events are throttled based on the throttling configuration to provide a natural reading experience:
interface TextEvent {
type: 'text';
content: string;
offset: number;
blockIndex?: number;
splitInfo?: {
position: number;
reason: string;
};
}
Thought Event
Claude's thought process. These events are emitted immediately without throttling to ensure responsive feedback:
interface ThoughtEvent {
type: 'thought';
content: string;
offset: number;
blockIndex?: number;
hasAction: boolean; // Indicates if this thought leads to an action
}
Action Event
Tool/action invocations from Claude. These events are emitted immediately without throttling:
interface ActionEvent {
type: 'action';
name: string;
params: Record<string, unknown>;
offset: number;
blockIndex?: number;
thoughtContent?: string; // Available if preceded by a thought
}
Typing Event
Real-time typing status updates:
interface TypingEvent {
type: 'typing';
isTyping: boolean;
offset: number;
blockIndex?: number;
}
Error Event
Detailed error information:
interface ErrorEvent {
type: 'error';
error: Error;
fatal: boolean;
offset: number;
blockIndex?: number;
partialContent?: string; // Available for content recovery
}
Configuration
The parser supports detailed configuration for all aspects of its operation:
const parser = new AnthropicStreamParser({
apiKey: 'your-api-key',
// Parser operation mode
mode: 'message', // or 'stream'
// System prompt handling
systemPrompt: {
content: 'Your custom system prompt',
prependInstructions: true // or false
},
// Message handling
message: {
maxLength: 2000,
splitPoints: ['\n\n', '. ', ' '],
overflow: 'split' // or 'error'
},
// Buffer management
buffer: {
maxSize: 64 * 1024, // 64KB
overflow: 'error', // or 'truncate'
flushOnNewline: true
},
// Natural reading speed simulation
throttle: {
enabled: true,
readingSpeed: 250, // words per minute
minPause: 100, // minimum pause between emissions in ms
maxBuffer: 200 // maximum characters before forced emission
},
// Debug options
debug: {
enabled: true,
state: true, // log state transitions
events: true, // log event emissions
buffer: true, // log buffer operations
json: true, // log JSON parsing
stream: true, // log raw stream chunks
logger: console.debug
}
});
Error Handling
The parser provides comprehensive error handling with the ability to recover partial content:
parser.on('error', event => {
const { error, fatal, partialContent } = event;
if (fatal) {
console.error('Fatal error:', error);
// Handle unrecoverable error
} else {
console.warn('Non-fatal error:', error);
// Continue processing
}
if (partialContent) {
console.log('Partial content:', partialContent);
// Handle incomplete content
}
});
Memory Management
The MCP Stream Parser is optimized for efficient memory usage, particularly important when processing large volumes of streaming content.
Memory Optimization
The parser implements several memory optimization strategies:
Efficient String Management: The
TextBuffer
class minimizes string duplication and manages content efficiently.Resource Cleanup: All components implement proper cleanup methods that should be called when done:
// Always call cleanup when finished with a parser await parser.cleanup();
Reference Management: The parser explicitly breaks circular references to help garbage collection.
Buffer Size Limits: Configure appropriate buffer sizes for your use case:
const parser = new MCPStreamParser({ buffer: { maxSize: 4096, // Adjust based on your needs overflow: 'error' // Or 'truncate' for long-running streams } });
Adapters for Collab-Spec Integration
MCP Stream Parser includes adapter modules to facilitate integration with the collaborative specification (collab-spec) system. These adapters provide a bridge between the internal types used by MCP Stream Parser and the types expected by collab-spec systems.
Using Adapters
import { AnthropicStreamParser } from '@anthropic-ai/mcp-stream-parser';
import { CollabSpecAdapter, StateAdapter } from '@anthropic-ai/mcp-stream-parser/adapters';
// Create parser and adapters
const parser = new AnthropicStreamParser({ apiKey: 'your-api-key' });
const eventAdapter = new CollabSpecAdapter();
const stateAdapter = new StateAdapter();
// Convert events to collab-spec format
parser.on('text', event => {
const collabEvent = eventAdapter.toCollabSpecTextEvent(event);
// Use collabEvent with collab-spec systems
});
// Convert parser state to collab-spec format
function updateState() {
if (parser.getState) {
const collabState = stateAdapter.toCollabSpecState(parser.getState());
// Use collabState with collab-spec systems
}
}
Complete Integration Bridge
For convenience, a complete integration bridge is provided that handles all event and state conversions:
import { MCPCollabSpecBridge } from '@anthropic-ai/mcp-stream-parser/adapters/usage-example';
const bridge = new MCPCollabSpecBridge({
apiKey: 'your-api-key',
// other config options
});
// Use the bridge like a regular parser
bridge.sendMessage('Hello!');
// Clean up when done
bridge.dispose();
For more details on adapters, see the Adapters Documentation.
Contributing
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add some amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.