Skip to main content

基于 Ky 的实现 (推荐)

本文档展示如何使用内置的 Ky HTTP 客户端实现自定义的 AgentService。相比 Axios,Ky 对 SSE (Server-Sent Events) 流式响应有更好的原生支持,更适合 AI 对话场景。

内置 HTTP 模块

AaaS Pilot Kit 内置了完整的 HTTP 处理模块,提供了开箱即用的 HTTP 客户端和 SSE 支持:

  • ky - 基于 ky 的 HTTP 客户端,支持重试、超时、拦截器等高级特性
  • sseHook - 专门处理 Server-Sent Events (SSE) 的钩子函数,简化流式数据处理
优势

使用内置 HTTP 模块的好处:

  • 🎯 统一配置 - 预配置了适合对话场景的默认参数
  • 🔧 开箱即用 - 无需额外安装 HTTP 客户端依赖
  • 🚀 流式优化 - 原生支持 SSE 流式响应处理
  • 🛡️ 错误处理 - 内置完善的错误处理和重试机制

完整实现代码

// 从 Kit 内置 HTTP 模块导入 ky 客户端和 SSE Hook
import {ky, sseHook} from '@bdky/aaas-pilot-kit/http';
import {v4 as uuid} from 'uuid';
import {BaseAgentService} from '@bdky/aaas-pilot-kit';

// 需要继承 BaseAgentService Kit 提供的基础 AgentService 类
export class CustomAgentService extends BaseAgentService {
static QUERY_URL = 'https://origin/api/sse';

abortController: AbortController | null = null;

// 销毁逻辑,可自定义
dispose = action(() => {
// 基类内置方法
this._dispose();
// 可按自己的逻辑实现
});

// 重写基类 query 方法实现 SSE 流式协议
override query = async (text: string) => {
this.abortController = new AbortController();

const hook = sseHook.createHook({
onData: data => {
const parsedMessage = JSON.parse(data);
const {
answer,
sessionId,
action,
intentResult
} = parsedMessage;


// 建议:设置 sessionId
if (sessionId) {
this.setSessionId(sessionId);
}

if (answer) {
// 必须:调用 Kit 的 onData 方法传递每轮未完成的数据
this.onData({
sessionId: this.conversationSessionId,
answer
});
}
},
onCompleted: () => {
// 必须:每轮流式响应完成,调用 Kit 的 onCompleted 方法标记当前轮流式内容输出结束
this.onCompleted({
sessionId: this.conversationSessionId
});
}
});

const params = {
query: text,
// 一轮对话的ID
queryId: this.queryId,
// 整个会话 Session
sessionId: this.conversationSessionId
};

try {
await ky.post(CustomAgentService.QUERY_URL, {
retry: 0,
timeout: 300 * 1000,
signal: this.abortController?.signal,
headers: {
'x-trace-id': this.queryId
},
hooks: {
// 创建 hook 并绑定
afterResponse: [hook]
},
json: {
...params
}
});
}
catch (e) {
// TODO 业务错误处理

if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
}
};
}

关键特性说明

1. SSE 流式处理

使用内置的 sseHook 处理 Server-Sent Events 响应:

// 使用内置的 sseHook 创建 SSE 处理钩子
const hook = sseHook.createHook({
onData: data => {
// 处理每个数据块
const parsedMessage = JSON.parse(data);
// 调用 Kit 的 onData 方法传递数据
this.onData({
sessionId: this.conversationSessionId,
answer: parsedMessage.answer
});
},
onCompleted: () => {
// 流式响应完成
this.onCompleted({
sessionId: this.conversationSessionId
});
}
});

2. 请求中断控制

内置 ky 客户端支持请求中断和资源清理:

this.abortController = new AbortController();

// 使用内置 ky 客户端发送请求,支持 AbortController
await ky.post(postUrl, {
signal: this.abortController?.signal,
// ...其他配置
});

3. 错误处理

内置 ky 客户端提供完整的 HTTP 错误状态码处理:

catch (e) {
if (e instanceof Error) {
throw new Error(`请求对话API失败:${e.message}`);
}
// 内置错误处理机制会自动处理网络错误、超时等情况
}

注册到 Kit

import {createAaaSPilotKit} from '@bdky/aaas-pilot-kit';
import {CustomAgentService} from './CustomAgentService';

const kit = createAaaSPilotKit<CustomAgentService>({
// 其他配置...
agentService: CustomAgentService
});