/**
* SSE工具类
* 支持自动重连、手动关闭、消息监听、错误回调
*/
export class SSEClient {
private eventSource: EventSource | null = null;
private url: string;
private options: EventSourceInit;
private reconnectInterval: number; // 重连间隔(ms)
private maxReconnectAttempts: number; // 最大重连次数
private reconnectTimer: any = null;
private reconnectAttempts = 0;
// 回调函数
public onMessage: (data: any) => void;
public onError: (error: Event) => void;
public onOpen: () => void;
public onClose: () => void;
/**
* 构造函数
* @param url SSE 接口地址
* @param options SSE 配置(withCredentials 等)
* @param reconnectInterval 重连间隔 默认 3000ms
* @param maxReconnectAttempts 最大重连次数 默认 10 次(-1 为无限重连)
*/
constructor(url: string, options: EventSourceInit = { withCredentials: true }, reconnectInterval = 5000, maxReconnectAttempts = 5) {
this.url = url;
this.options = options;
this.reconnectInterval = reconnectInterval;
this.maxReconnectAttempts = maxReconnectAttempts;
// 默认空回调
this.onMessage = () => {};
this.onError = () => {};
this.onOpen = () => {};
this.onClose = () => {};
}
/**
* 创建 SSE 连接
*/
connect(): void {
// 已存在连接则不重复创建
if (this.eventSource) return;
// 初始化 SSE
this.eventSource = new EventSource(this.url, this.options);
// 连接成功
this.eventSource.onopen = (e) => {
console.log('✅ SSE 连接成功', e);
this.reconnectAttempts = 0; // 重置重连次数
this.onOpen();
};
// 接收消息
this.eventSource.onmessage = (e) => {
try {
// 后端返回 JSON 格式数据时自动解析(使用 json-bigint 处理大数)
const data = JSONBig.parse(e.data);
this.onMessage(data);
} catch (err) {
// 非 JSON 数据直接返回原文
this.onMessage(e.data);
}
};
// 连接错误/断开
this.eventSource.onerror = (e) => {
console.error('❌ SSE 连接异常', e);
this.onError(e);
// 关闭当前连接
this.close(false);
// 触发重连
this.handleReconnect();
};
}
/**
* 重连处理
*/
private handleReconnect(): void {
// 达到最大重连次数,停止重连
if (this.maxReconnectAttempts !== -1 && this.reconnectAttempts >= this.maxReconnectAttempts) {
console.warn('⚠️ SSE 已达到最大重连次数,停止重连');
this.onClose();
return;
}
// 清除已有定时器
if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
// 执行重连
this.reconnectTimer = setTimeout(() => {
this.reconnectAttempts++;
console.log(`🔄 SSE 第 ${this.reconnectAttempts} 次重连中...`);
this.connect();
}, this.reconnectInterval);
}
/**
* 手动关闭 SSE 连接
* @param isManual 是否手动关闭(true:不触发重连)
*/
close(isManual = true): void {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
// 清除重连定时器
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (isManual) {
console.log('🔌 SSE 已手动关闭');
this.onClose();
}
}
}
SSE的封装
🌺
摘要
SSE的封装