Skip to main content

Axios-based Implementation

This document shows how to implement a custom AgentService using the axios HTTP client.

Install Dependencies

npm

$ npm install axios nanoid

yarn

$ yarn add axios nanoid

Complete Implementation Code

import axios, {AxiosError} from 'axios';
import {nanoid} from 'nanoid';
import {BaseAgentService} from '@bdky/aaas-pilot-kit';

// Need to inherit from BaseAgentService provided by Kit
export class CustomAgentService extends BaseAgentService {
static QUERY_URL = 'https://origin/api/sse';

abortController: AbortController | null = null;

// Dispose logic, can be customized
dispose = action(() => {
// Built-in base class method
this._dispose();
// Can implement according to your own logic
});

// Override base class query method to implement SSE streaming protocol
override query = async (text: string) => {
this.abortController = new AbortController();

const params = {
query: text,
// ID for this round of conversation
queryId: this.queryId,
// Session for entire conversation
sessionId: this.conversationSessionId
};

try {
const response = await axios.post(CustomAgentService.QUERY_URL, params, {
timeout: 300 * 1000,
signal: this.abortController?.signal,
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
'x-trace-id': this.queryId
},
responseType: 'stream'
});

// Process SSE streaming response
this.processStreamData(response.data);
}
catch (e) {
// TODO Business error handling

if (e instanceof Error) {
throw new Error(`Failed to request conversation API: ${e.message}`);
}
}
};

/**
* Process streaming data chunks
*/
private readonly processStreamData = (stream: any) => {
let buffer = '';

stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');

// Keep the last potentially incomplete line
buffer = lines.pop() || '';

lines.forEach(line => {
if (line.startsWith('data: ')) {
try {
const data = line.slice(6); // Remove 'data: ' prefix
if (data === '[DONE]') {
// Required: When streaming response completes, call Kit's onCompleted method to mark end of current round streaming content output
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}

const parsedMessage = JSON.parse(data);
// According to your own SSE interface protocol, parse and process corresponding fields, this is just an example
const {
answer,
sessionId,
action,
intentResult
} = parsedMessage;

// Recommended: Set sessionId
if (sessionId) {
this.setSessionId(sessionId);
}

if (answer) {
// Required: Call Kit's onData method to pass incomplete data from each round
this.onData({
sessionId: this.conversationSessionId,
answer
});
}
}
catch (error) {
console.error('Failed to parse SSE data:', error);
}
}
});
});

stream.on('end', () => {
this.onCompleted({
sessionId: this.conversationSessionId
});
});

stream.on('error', (error: Error) => {
console.error('Streaming response error:', error);
throw new Error(`Streaming response processing failed: ${error.message}`);
});
};
}

Key Feature Explanation

1. Streaming Data Processing

Use axios's responseType: 'stream' to handle SSE responses:

const response = await axios.post(CustomAgentService.QUERY_URL, params, {
responseType: 'stream',
headers: {
'Accept': 'text/event-stream'
}
});

// Process SSE streaming response
this.processStreamData(response.data);

2. SSE Data Parsing

Manually parse Server-Sent Events format:

stream.on('data', (chunk: Buffer) => {
buffer += chunk.toString();
const lines = buffer.split('\\n');

lines.forEach(line => {
if (line.startsWith('data: ')) {
const data = line.slice(6); // Remove 'data: ' prefix
if (data === '[DONE]') {
// Call Kit's onCompleted method
this.onCompleted({
sessionId: this.conversationSessionId
});
return;
}

const parsedMessage = JSON.parse(data);
// Process message...
}
});
});

3. Request Interrupt Control

Support AbortController to interrupt requests:

this.abortController = new AbortController();

await axios.post(CustomAgentService.QUERY_URL, params, {
signal: this.abortController?.signal,
// ... other configurations
});

4. Error Handling

Handle axios-specific error types:

catch (e) {
if (e instanceof Error) {
throw new Error(`Failed to request conversation API: ${e.message}`);
}
// ...
}

Register to Kit

import {createAaaSPilotKit} from '@bdky/aaas-pilot-kit';
import {CustomAgentService} from './CustomAgentService';

const kit = createAaaSPilotKit<CustomAgentService>({
// Other configurations...
agentService: CustomAgentService
});