1.0.1 • Published 6 months ago
@efdev/event-stream v1.0.1
@efdev/event-stream
浏览器端事件流处理库,提供高性能、可靠且类型安全的SSE(Server-Sent Events)和自定义事件流实现。
✨ 特性
- 🚀 完整的 TypeScript 支持
- 🔄 智能的重试机制
- ❤️ 心跳检测保持连接
- 🎭 支持自定义事件类型
- 🎯 精确的状态管理
- 🛡️ 健壮的错误处理
- 📦 零依赖,仅依赖 @microsoft/fetch-event-source
- 🎨 优雅的 API 设计
- ⚡ 高性能优化设计
📦 安装
npm install @efdev/event-stream -S
# 或
yarn add @efdev/event-stream -S
# 或
pnpm add @efdev/event-stream -S
🔨 使用指南
基本使用
import { createEventStream } from '@efdev/event-stream';
const controller = createEventStream('api/stream', {
onMessage: (message) => console.log('收到消息:', message),
onError: (error) => console.error('发生错误:', error),
onStatusChange: (status) => console.log('连接状态变更:', status),
});
完整配置选项
interface EventStreamOptions<T = unknown> {
// 请求唯一标识
requestId?: string;
// 心跳检测间隔(毫秒)
heartbeatInterval?: number;
// 自定义事件处理回调
onEvent?: (event: CustomEvent) => void;
// 消息处理回调
onMessage?: (message: StreamMessage<T>) => void | Promise<void>;
// 错误处理回调
onError?: (error: EventStreamError) => void;
// 状态变更回调
onStatusChange?: (status: ConnectionStatus) => void;
// 连接建立回调
onOpen?: (response: Response) => Promise<void>;
// 连接关闭回调
onClose?: () => void;
// 请求完成回调
onComplete?: () => void;
// 重试策略配置
retry?: RetryOptions;
// 调试模式开关
debug?: boolean;
// 其他 FetchEventSourceInit 配置
headers?: Record<string, string>;
method?: string;
// 请求负载数据
data?: Record<string, unknown>;
// Token获取函数
getToken?: () => string | Promise<string>;
}
🎓 高级用法
请求ID (requestId) 使用
requestId
用于唯一标识一个事件流请求,便于后续管理和取消:
// 生成唯一请求ID
const requestId = `stream-${Date.now()}-${Math.random().toString(36).substr(2, 8)}`;
const controller = createEventStream('api/stream', {
requestId, // 设置请求ID
onMessage: (message) => console.log(message),
});
// 后续可以通过ID取消请求
cancelRequest(requestId);
最佳实践:
- 确保 requestId 全局唯一
- 在复杂应用中,可以使用业务相关的ID便于追踪
- 建议将 requestId 与业务数据关联存储
心跳检测 (heartbeatInterval) 配置
heartbeatInterval
用于设置心跳检测间隔(毫秒),保持连接活跃:
// 每30秒发送一次心跳
createEventStream('api/stream', {
heartbeatInterval: 30000,
onEvent: (event) => {
if (event.type === EventType.HEARTBEAT) {
console.log('收到心跳响应');
}
},
});
配置建议:
- 生产环境推荐值:20000-60000ms
- 测试环境可以设置更短间隔(如5000ms)便于调试
- 设置为0可禁用心跳检测
自定义重试策略
createEventStream('api/stream', {
retry: {
maxRetries: 5,
retryDelay: (retryCount) => Math.pow(2, retryCount) * 1000,
shouldRetry: (error) => error.code !== EventStreamErrorCode.ABORT_ERROR,
},
});
认证与授权
createEventStream('api/secure-stream', {
getToken: async () => {
const token = await fetchToken();
return `Bearer ${token}`;
},
});
性能优化
// 使用防抖处理高频消息
let debounceTimer: number;
createEventStream('api/high-frequency', {
onMessage: (message) => {
clearTimeout(debounceTimer);
debounceTimer = setTimeout(() => {
processMessage(message);
}, 100);
},
});
📚 API 参考
createEventStream(url: string, options?: EventStreamOptions): EventStreamController
创建并启动一个新的事件流请求。
参数
url
: 目标接口URLoptions
: 事件流配置选项
返回值
EventStreamController
实例,用于控制请求生命周期
EventStreamController 方法
abort()
: 中止请求dispose()
: 清理资源getStatus()
: 获取当前连接状态isAborted()
: 检查是否已中止sendHeartbeat()
: 发送心跳检测
请求管理方法
cancelRequest(requestId: string)
: 取消指定请求cancelAllRequests()
: 取消所有请求getActiveRequests()
: 获取所有活跃请求ID
🏗️ 框架集成
React 示例
import { useEffect } from 'react';
import { createEventStream } from '@efdev/event-stream';
function StreamComponent() {
useEffect(() => {
const controller = createEventStream('api/stream', {
onMessage: (message) => {
// 更新状态
},
onError: (error) => {
// 处理错误
},
});
return () => controller.dispose();
}, []);
return <div>Stream Component</div>;
}
Vue 示例
<script setup>
import { onMounted, onUnmounted } from 'vue';
import { createEventStream } from '@efdev/event-stream';
let controller;
onMounted(() => {
controller = createEventStream('api/stream', {
onMessage: (message) => {
// 更新响应式数据
},
});
});
onUnmounted(() => {
controller?.dispose();
});
</script>
⚠️ 注意事项
连接管理:
- 确保在组件卸载时调用
controller.dispose()
- 合理配置心跳间隔保持连接
- 确保在组件卸载时调用
性能优化:
- 对于高频消息流,考虑使用防抖/节流
- 避免在回调中进行耗时操作
错误处理:
- 实现完整的错误处理逻辑
- 区分可重试错误和不可重试错误
类型安全:
- 充分利用 TypeScript 类型推断
- 为消息数据定义明确的类型
🔍 常见问题
如何检测连接断开?
通过 onStatusChange
回调监听状态变化:
createEventStream('api/stream', {
onStatusChange: (status) => {
if (
status === ConnectionStatus.ERROR ||
status === ConnectionStatus.CLOSED
) {
console.log('连接已断开');
}
},
});
如何实现自动重连?
配置 retry
选项:
createEventStream('api/stream', {
retry: {
maxRetries: 3,
retryDelay: 1000,
shouldRetry: (error) => error.isRetriable,
},
});
如何发送自定义事件?
服务端发送格式:
event: custom-event
data: {"key":"value"}
客户端处理:
createEventStream('api/stream', {
onEvent: (event) => {
if (event.type === 'custom-event') {
console.log('收到自定义事件:', event.data);
}
},
});
如何调试问题?
启用调试模式:
createEventStream('api/stream', {
debug: true, // 将在控制台输出详细日志
});
📝 类型定义
核心类型
// 流式消息
interface StreamMessage<T = unknown> {
data: T;
status: MessageStatus;
timestamp: number;
}
// 自定义事件
interface CustomEvent {
type: EventType;
data: string;
timestamp: number;
}
// 重试策略
interface RetryOptions {
maxRetries?: number;
retryDelay?: number | ((retryCount: number) => number);
shouldRetry?: (error: EventStreamError) => boolean;
}
枚举类型
// 消息状态
enum MessageStatus {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
DONE = '[DONE]',
}
// 连接状态
enum ConnectionStatus {
CONNECTING = 'CONNECTING',
CONNECTED = 'CONNECTED',
CLOSED = 'CLOSED',
ERROR = 'ERROR',
HEARTBEAT = 'HEARTBEAT',
RECONNECTING = 'RECONNECTING',
}
// 事件类型
enum EventType {
MESSAGE = 'message',
ERROR = 'error',
HEARTBEAT = 'heartbeat',
OPEN = 'open',
CLOSE = 'close',
CUSTOM = 'custom',
}
🏆 最佳实践
请求ID管理:
// 使用UUID生成唯一ID import { v4 as uuidv4 } from 'uuid'; const requestId = uuidv4();
错误处理:
createEventStream('api/stream', { onError: (error) => { if (error.code === EventStreamErrorCode.NETWORK_ERROR) { // 处理网络错误 } else { // 处理其他错误 } }, });
性能监控:
let lastMessageTime = Date.now(); createEventStream('api/stream', { onMessage: (message) => { const now = Date.now(); console.log(`消息延迟: ${now - message.timestamp}ms`); lastMessageTime = now; }, });
资源清理:
// 在页面卸载时清理 window.addEventListener('beforeunload', () => { cancelAllRequests(); });
🗂️ 打包与导入
ESM 模块
在支持原生 ESM 的环境中,您可以直接这样引入:
import { createEventStream } from '@efdev/event-stream';
CJS 模块
如果您的项目使用 CommonJS 模块:
const { createEventStream } = require('@efdev/event-stream');
🔄 与 React Hooks 结合示例
您也可以基于 createEventStream
封装一个自定义 Hook:
import { useEffect, useRef, useState } from 'react';
import { createEventStream } from '@efdev/event-stream';
export function useEventStream<T>(url: string) {
const [data, setData] = useState<T | null>(null);
const controllerRef = useRef<any>(null);
useEffect(() => {
controllerRef.current = createEventStream<T>(url, {
onMessage: (msg) => setData(msg.data),
});
return () => controllerRef.current.dispose();
}, [url]);
return data;
}