Skip to main content

基于 Axios 的实现

本文档展示如何使用 axios HTTP 客户端实现自定义的 AgentService。

安装依赖

npm

$ npm install axios nanoid

yarn

$ yarn add axios nanoid

完整实现代码

import axios, {AxiosError} from 'axios';
import {nanoid} from 'nanoid';
import {BaseAgentService} from '@bdky/aaas-pilot-kit';

// 需要继承 BaseAgentService Kit 提供的基础 AgentService 类
export class CustomAgentService extends BaseAgentService {
static QUERY_URL = 'https://origin/api/sse';

abortController: AbortController | null = null;

// 销毁逻辑,可自定义
dispose = action(() => {
// 基类内置方法
this._dispose();
// 可按自己的逻辑实现
});

// 重写基类 query 方法实现 SSE 流式协议
override query = async (text: string) => {
this.abortController = new AbortController();

const params = {
query: text,
// 一轮对话的ID
queryId: this.queryId,
// 整个会话 Session
sessionId: this.conversationSessionId
};

try {
const response = await axios.post(CustomAgentService.QUERY_URL, params, {
timeout: 300 * 1000,
signal: this.abortController?.signal,
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
'x-trace-id': this.queryId
},
responseType: 'stream'
});

// 处理 SSE 流式响应
this.processStreamData(response.data);
}
catch (e) {
// TODO 业务错误处理

if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
}
};

/**
* 处理流式数据块
*/
private readonly processStreamData = (stream: any) => {
let buffer = '';

stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');

// 保留最后一个可能不完整的行
buffer = lines.pop() || '';

lines.forEach(line => {
if (line.startsWith('data: ')) {
try {
const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') {
// 必须:每轮流式响应完成,调用 Kit 的 onCompleted 方法标记当前轮流式内容输出结束
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}

const parsedMessage = JSON.parse(data);
// 根据您自己的 SSE 接口协议,解析并处理对应字段,这里只是给个示例
const {
answer,
sessionId,
action,
intentResult
} = parsedMessage;

// 建议:设置 sessionId
if (sessionId) {
this.setSessionId(sessionId);
}

if (answer) {
// 必须:调用 Kit 的 onData 方法传递每轮未完成的数据
this.onData({
sessionId: this.conversationSessionId,
answer
});
}
}
catch (error) {
console.error('解析 SSE 数据失败:', error);
}
}
});
});

stream.on('end', () => {
this.onCompleted({
sessionId: this.conversationSessionId
});
});

stream.on('error', (error: Error) => {
console.error('流式响应错误:', error);
throw new Error(`流式响应处理失败:${error.message}`);
});
};
}

关键特性说明

1. 流式数据处理

使用 axios 的 responseType: 'stream' 处理 SSE 响应:

const response = await axios.post(CustomAgentService.QUERY_URL, params, {
responseType: 'stream',
headers: {
'Accept': 'text/event-stream'
}
});

// 处理 SSE 流式响应
this.processStreamData(response.data);

2. SSE 数据解析

手动解析 Server-Sent Events 格式:

stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');

lines.forEach(line => {
if (line.startsWith('data: ')) {
const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') {
// 调用 Kit 的 onCompleted 方法
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}

const parsedMessage = JSON.parse(data);
// 处理消息...
}
});
});

3. 请求中断控制

支持 AbortController 中断请求:

this.abortController = new AbortController();

await axios.post(CustomAgentService.QUERY_URL, params, {
signal: this.abortController?.signal,
// ...其他配置
});

4. 错误处理

处理 axios 特定的错误类型:

catch (e) {
if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
// ...
}

注册到 Kit

import {createAaaSPilotKit} from '@bdky/aaas-pilot-kit';
import {CustomAgentService} from './CustomAgentService';

const kit = createAaaSPilotKit<CustomAgentService>({
// 其他配置...
agentService: CustomAgentService
});