基于 Axios 的实现
本文档展示如何使用 axios HTTP 客户端实现自定义的 AgentService。
安装依赖
npm
$ npm install axios nanoid
yarn
$ yarn add axios nanoid
完整实现代码
import axios, {AxiosError} from 'axios';
import {nanoid} from 'nanoid';
import {BaseAgentService} from '@bdky/aaas-pilot-kit';
// 需要继承 BaseAgentService Kit 提供的基础 AgentService 类
export class CustomAgentService extends BaseAgentService {
static QUERY_URL = 'https://origin/api/sse';
abortController: AbortController | null = null;
// 销毁逻辑,可自定义
dispose = action(() => {
// 基类内置方法
this._dispose();
// 可按自己的逻辑实现
});
// 重写基类 query 方法实现 SSE 流式协议
override query = async (text: string) => {
this.abortController = new AbortController();
const params = {
query: text,
// 一轮对话的ID
queryId: this.queryId,
// 整个会话 Session
sessionId: this.conversationSessionId
};
try {
const response = await axios.post(CustomAgentService.QUERY_URL, params, {
timeout: 300 * 1000,
signal: this.abortController?.signal,
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
'x-trace-id': this.queryId
},
responseType: 'stream'
});
// 处理 SSE 流式响应
this.processStreamData(response.data);
}
catch (e) {
// TODO 业务错误处理
if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
}
};
/**
* 处理流式数据块
*/
private readonly processStreamData = (stream: any) => {
let buffer = '';
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');
// 保留最后一个可能不完整的行
buffer = lines.pop() || '';
lines.forEach(line => {
if (line.startsWith('data: ')) {
try {
const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') {
// 必须:每轮流式响应完成,调用 Kit 的 onCompleted 方法标记当前轮流式内容输出结束
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}
const parsedMessage = JSON.parse(data);
// 根据您自己的 SSE 接口协议,解析并处理对应字段,这里只是给个示例
const {
answer,
sessionId,
action,
intentResult
} = parsedMessage;
// 建议:设置 sessionId
if (sessionId) {
this.setSessionId(sessionId);
}
if (answer) {
// 必须:调用 Kit 的 onData 方法传递每轮未完成的数据
this.onData({
sessionId: this.conversationSessionId,
answer
});
}
}
catch (error) {
console.error('解析 SSE 数据失败:', error);
}
}
});
});
stream.on('end', () => {
this.onCompleted({
sessionId: this.conversationSessionId
});
});
stream.on('error', (error: Error) => {
console.error('流式响应错误:', error);
throw new Error(`流式响应处理失败:${error.message}`);
});
};
}
关键特性说明
1. 流式数据处理
使用 axios 的 responseType: 'stream' 处理 SSE 响应:
const response = await axios.post(CustomAgentService.QUERY_URL, params, {
responseType: 'stream',
headers: {
'Accept': 'text/event-stream'
}
});
// 处理 SSE 流式响应
this.processStreamData(response.data);
2. SSE 数据解析
手动解析 Server-Sent Events 格式:
stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');
lines.forEach(line => {
if (line.startsWith('data: ')) {
const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') {
// 调用 Kit 的 onCompleted 方法
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}
const parsedMessage = JSON.parse(data);
// 处理消息...
}
});
});
3. 请求中断控制
支持 AbortController 中断请求:
this.abortController = new AbortController();
await axios.post(CustomAgentService.QUERY_URL, params, {
signal: this.abortController?.signal,
// ...其他配置
});
4. 错误处理
处理 axios 特定的错误类型:
catch (e) {
if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
// ...
}
注册到 Kit
import {createAaaSPilotKit} from '@bdky/aaas-pilot-kit';
import {CustomAgentService} from './CustomAgentService';
const kit = createAaaSPilotKit<CustomAgentService>({
// 其他配置...
agentService: CustomAgentService
});