在实时对话式 AI 场景下,你可以通过回调机制接收智能体状态,如聆听中、思考中、说话中、被打断等,可用于在客户端实时展示智能体状态或在服务端进行存储与分析。
接收方式
支持通过客户端或服务端两种方式接收智能体状态信息,它们的适用场景有所不同:
- 客户端接收(推荐):延迟更低,可靠性更高,适用于需要在终端实时展示智能体状态的场景。
- 服务端接收:适用于需要持久化存储、进行二次分析,或在客户端性能受限、不便处理实时二进制消息的场景。
通过客户端接收
实现方式
- 调用 StartVoiceChat 接口将
AgentConfig.EnableConversationStateCallback 设置为 true。 - 在客户端监听房间内广播二进制消息的回调 onRoomBinaryMessageReceived。
- 不同端回调名称可能有差异,具体请参见客户端 API 参考。
- 其中,C 语言端接口名称为
on_message_received。
- 解析消息。具体操作,请参见解析消息。
当智能体状态发生变化时,客户端将通过该回调收到一个二进制消息。你需要对该消息进行解析,才能获取具体的状态信息。
解析消息
收到的智能体状态消息的回调格式如下:
| 参数名 | 类型 | 描述 |
|---|
| uid | String | 消息发送者 ID。 |
| message | String | 二进制消息内容。长度不超过 64 KB。与服务端返回二进制消息格式相同,详细参看二进制消息格式。 |
你可以参考以下示例代码对回调信息中的 message 内容进行解析。
import VERTC from '@volcengine/rtc';
/**
* @brief TLV 数据格式转换成字符串
* @note TLV 数据格式
* | magic number | length(big-endian) | value |
* @param {ArrayBufferLike} tlvBuffer
* @returns
*/
function tlv2String(tlvBuffer: ArrayBufferLike) {
const typeBuffer = new Uint8Array(tlvBuffer, 0, 4);
const lengthBuffer = new Uint8Array(tlvBuffer, 4, 4);
const valueBuffer = new Uint8Array(tlvBuffer, 8);
let type = '';
for (let i = 0; i < typeBuffer.length; i++) {
type += String.fromCharCode(typeBuffer[i]);
}
const length =
(lengthBuffer[0] << 24) | (lengthBuffer[1] << 16) | (lengthBuffer[2] << 8) | lengthBuffer[3];
const value = new TextDecoder().decode(valueBuffer.subarray(0, length));
return { type, value };
};
/**
* @brief Room Message Handler
*/
function handleRoomBinaryMessageReceived(
event: {
userId: string;
message: ArrayBuffer;
},
) {
const { message } = event;
const { type, value } = tlv2String(message);
const data = JSON.parse(value);
const { EventTime, RoundID, Stage, TaskId, UserID, ErrorInfo } = data || {};
const { Code, Description } = Stage || {};
// 对 type、data 进行业务处理
console.log(type, EventTime, RoundID, TaskId, UserID, Code, Description);
if (ErrorInfo) {
const { ErrorCode, Reason } = ErrorInfo;
console.log('Error Occurred:', ErrorCode, Reason);
}
// ...
}
/**
* @brief 监听房间内二进制消息
*/
this.engine.on(VERTC.events.onRoomBinaryMessageReceived, handleRoomBinaryMessageReceived);
//智能体状态结构体
import com.google.gson.annotations.SerializedName;
class ErrorDetail {
@SerializedName("ErrorCode")
int errorCode;
@SerializedName("Reason")
String reason;
}
class Stage {
@SerializedName("Code")
/**
* 0: errorOccurred
* 1: listening
* 2: thinking
* 3: answering
* 4: interrupted
* 5: answerFinish
*/
int code;
@SerializedName("Description")
String description;
}
public class conversationStatusMessage {
@SerializedName("TaskId")
String taskId;
@SerializedName("UserID")
String userID;
@SerializedName("RoundID")
long roundID;
@SerializedName("EventTime")
long eventTime;
@SerializedName("Stage")
Stage stage;
@SerializedName("ErrorInfo")
ErrorDetail errorInfo;
public String toString() {
String string = "taskId:" + taskId + ", " +
"userID:" + userID + ", " +
"roundID:" + roundID + ", " +
"eventTime:" + eventTime + ", " +
"stage:" + stage.code + " - " + stage.description;
if (errorInfo != null) {
string += ", errorCode:" + errorInfo.errorCode + ", reason:" + errorInfo.reason;
}
System.out.println(string);
return string;
}
}
//解析消息
public void onRoomBinaryMessageReceived(String uid, ByteBuffer buffer) {
byte[] prefixBytes = new byte[4];
buffer.get(prefixBytes);
String prefix = new String(prefixBytes, StandardCharsets.UTF_8);
if (prefix.equals("conv")) {
int length = buffer.getInt();
byte[] jsonBytes = new byte[length];
buffer.get(jsonBytes);
String jsonContent = new String(jsonBytes, StandardCharsets.UTF_8);
conversationStatusMessage statusMessage = new Gson().fromJson(jsonContent, conversationStatusMessage.class);
System.out.println("Unpack_conv length:" + length + " " + statusMessage.toString());
} else {
System.out.println("Invalid message prefix:" + prefix);
}
}
struct ErrorDetail {
int code;
std::string reason;
};
struct conversationStatusMessage
{
struct Stage
{
int code;
std::string description;
};
std::string taskId;
std::string userID;
int64_t roundID;
int64_t eventTime;
Stage stage;
std::optional<ErrorDetail> errorInfo;
};
void onRoomBinaryMessageReceived(const char *uid, int size, const uint8_t *message)
{
static const int headerSize = 8;
if (size < headerSize)
{
return;
}
uint32_t header = messageHeader(message);
if (header == 0x636F6E76U) // magic number "conv"
{
std::string strMessage = unpack(message, size);
if (!strMessage.empty())
{
unpackConversationStage(strMessage);
}
}
else
{
LOG_E << "unknow message header: " << header;
return;
}
}
void unpackConversationStage(const std::string &message)
{
// 解析 JSON 字符串
nlohmann::json json_data = nlohmann::json::parse(message);
// 存储解析后的数据
conversationStatusMessage conversationStatusMessage;
conversationStatusMessage.taskId = json_data["TaskId"];
conversationStatusMessage.userID = json_data["UserID"];
conversationStatusMessage.roundID = json_data["RoundID"];
conversationStatusMessage.eventTime = json_data["EventTime"];
conversationStatusMessage::Stage stage;
stage.code = json_data["Stage"]["Code"];
stage.description = json_data["Stage"]["Description"];
conversationStatusMessage.stage = stage;
if (json_data.contains("ErrorInfo") && !json_data["ErrorInfo"].is_null()) {
ErrorDetail errorDetail;
errorDetail.code = json_data["ErrorInfo"]["ErrorCode"];
errorDetail.reason = json_data["ErrorInfo"]["Reason"];
conversationStatusMessage.errorInfo = errorDetail;
}
}
#import <Foundation/Foundation.h>
typedef struct {
int code;
NSString *reason;
} ErrorDetail;
typedef struct {
int code;
NSString *description;
} Stage;
@interface conversationStatusMessage : NSObject
@property (nonatomic, copy) NSString *taskId;
@property (nonatomic, copy) NSString *userID;
@property (nonatomic, assign) int64_t roundID;
@property (nonatomic, assign) int64_t eventTime;
@property (nonatomic, assign) Stage stage;
@property (nonatomic, assign) ErrorDetail errorInfo;
@property (nonatomic, assign) BOOL hasErrorInfo;
@end
@implementation conversationStatusMessage
@end
//回调
- (void)rtcRoom:( ByteRTCRoom *_Nonnull)rtcRoom onRoomBinaryMessageReceived:(NSString *_Nonnull)uid message:(NSData *_Nonnull)message {
NSString *strMessage = unpack(message);
if (strMessage) {
parseData(strMessage);
}
}
NSString *unpack(NSData *data) {
const int headerSize = 8;
NSUInteger size = data.length;
if (size < headerSize) {
return nil;
}
const uint8_t *message = data.bytes;
// Check magic number "conv"
uint32_t magic = (message[0] << 24) | (message[1] << 16) | (message[2] << 8) | message[3];
if (magic != 0x636F6E76) {
return nil;
}
// Get length
uint32_t length = (message[4] << 24) | (message[5] << 16) | (message[6] << 8) | message[7];
if (size - headerSize != length) {
return nil;
}
// Get conversationStatusMessage
NSString *conversationStatusMessage = nil;
if (length > 0) {
conversationStatusMessage = [[NSString alloc] initWithBytes:message + headerSize length:length encoding:NSUTF8StringEncoding];
} else {
conversationStatusMessage = @"";
}
return conversationStatusMessage;
}
void parseData(NSString *message) {
NSError *error = nil;
NSDictionary *json_data = [NSJSONSerialization JSONObjectWithData:[message dataUsingEncoding:NSUTF8StringEncoding] options:0 error:&error];
if (error || json_data == nil) {
NSLog(@"JSON parse error: %@", error);
return;
}
conversationStatusMessage *statusMsg = [[conversationStatusMessage alloc] init];
statusMsg.taskId = json_data[@"TaskId"];
statusMsg.userID = json_data[@"UserID"];
statusMsg.roundID = [json_data[@"RoundID"] longLongValue];
statusMsg.eventTime = [json_data[@"EventTime"] longLongValue];
Stage stage;
stage.code = [json_data[@"Stage"][@"Code"] intValue];
stage.description = json_data[@"Stage"][@"Description"];
statusMsg.stage = stage;
if (json_data[@"ErrorInfo"] && json_data[@"ErrorInfo"] != [NSNull null]) {
ErrorDetail errorDetail;
errorDetail.code = [json_data[@"ErrorInfo"][@"ErrorCode"] intValue];
errorDetail.reason = json_data[@"ErrorInfo"][@"Reason"];
statusMsg.errorInfo = errorDetail;
statusMsg.hasErrorInfo = YES;
} else {
statusMsg.hasErrorInfo = NO;
}
}
#define CONVERSATION_MAGIC_HEADER_SIZE 8
#define CONVERSATION_MAGIC_STATUS_STR "conv"
static bool _is_target_message(const uint8_t* message, const char* target) {
if (message == NULL || target == NULL) {
return false;
}
// Check if the first 4 bytes match the magic number for "conv"
if (*(const uint32_t*)message != *(const uint32_t*)target) {
return false;
}
return true;
}
static void on_conversation_status_message_received(byte_rtc_engine_t engine, const cJSON* root) {
cJSON* stage_obj = cJSON_GetObjectItem(root, "Stage");
if (stage_obj != NULL) {
cJSON* code_obj = cJSON_GetObjectItem(stage_obj, "Code");
if (code_obj != NULL) {
printf("conversation status message, code: %d\n", (int)cJSON_GetNumberValue(code_obj));
}
cJSON* description_obj = cJSON_GetObjectItem(stage_obj, "Description");
if (description_obj != NULL) {
printf("conversation status message, description: %s\n", cJSON_GetStringValue(description_obj));
}
}
cJSON* task_id_obj = cJSON_GetObjectItem(root, "TaskId");
if (task_id_obj != NULL) {
printf("conversation status message, task_id: %s\n", cJSON_GetStringValue(task_id_obj));
}
cJSON* user_id_obj = cJSON_GetObjectItem(root, "UserID");
if (user_id_obj != NULL) {
printf("conversation status message, user_id: %s\n", cJSON_GetStringValue(user_id_obj));
}
cJSON* round_id_obj = cJSON_GetObjectItem(root, "RoundID");
if (round_id_obj != NULL) {
printf("conversation status message, round_id: %d\n", (int)cJSON_GetNumberValue(round_id_obj));
}
cJSON* event_time_obj = cJSON_GetObjectItem(root, "EventTime");
if (event_time_obj != NULL) {
printf("conversation status message, event_time: %lld\n", (long long)cJSON_GetNumberValue(event_time_obj));
}
cJSON* error_info_obj = cJSON_GetObjectItem(root, "ErrorInfo");
if (error_info_obj != NULL && cJSON_IsObject(error_info_obj)) {
cJSON* error_code_obj = cJSON_GetObjectItem(error_info_obj, "ErrorCode");
if (error_code_obj != NULL) {
printf("error info, code: %d\n", (int)cJSON_GetNumberValue(error_code_obj));
}
cJSON* reason_obj = cJSON_GetObjectItem(error_info_obj, "Reason");
if (reason_obj != NULL) {
printf("error info, reason: %s\n", cJSON_GetStringValue(reason_obj));
}
}
}
// 回调函数
static void on_message_received(byte_rtc_engine_t engine, const char* room, const char* src, const uint8_t* message, int size, bool binary) {
cJSON* root = NULL;
if (size < CONVERSATION_MAGIC_HEADER_SIZE) {
return;
}
if (!_is_target_message(message, CONVERSATION_MAGIC_STATUS_STR)) {
// 非智能体状态消息
return;
}
root = cJSON_Parse((const char*)(message + CONVERSATION_MAGIC_HEADER_SIZE));
if (NULL == root) {
return;
}
on_conversation_status_message_received(engine, root);
cJSON_Delete(root);
}
通过服务端接收
实现方式
- 调用 StartVoiceChat 接口时,需配置如下参数:
// 调用 StartVoiceChat 接口时 AgentConfig 部分示例
"AgentConfig": {
"EnableConversationStateCallback": true,
// ... 其他 AgentConfig 参数
"ServerMessageURLForRTS": "YOUR_SERVER_CALLBACK_URL", // 必填,接收回调的 URL
"ServerMessageSignatureForRTS": "YOUR_SIGNATURE" // 必填,签名密钥,用于验证回调来源的安全性
}
- 解析消息。具体操作,请参见解析消息。
当智能体状态发生变化时,配置的回调 URL 会收到一个包含二进制数据的 HTTP POST 请求消息。你需要对消息体进行解析,提取状态信息。
解析消息
收到的回调格式如下:
| 参数名 | 类型 | 描述 |
|---|
| message | String | Base64 编码的二进制消息内容。长度不超过 48 KB。格式参看二进制消息格式。 |
| signature | String | 鉴权签名。可与 StartVoiceChat 接口中传入的 ServerMessageSignatureForRTS 字段值进行对比以进行鉴权验证。 |
你可以参考以下示例代码对回调信息中的 message 内容进行解析。
package main
import (
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
)
const (
conversationStageHeader = "conv"
exampleSignature = "example_signature"
)
type RtsMessage struct {
Message string `json:"message"`
Signature string `json:"signature"`
}
type Conv struct {
TaskID string `json:"TaskId"`
UserID string `json:"UserID"`
RoundID int64 `json:"RoundID"`
EventTime int64 `json:"EventTime"`
Stage StageInfo `json:"Stage"`
}
type StageInfo struct {
Code stageCode `json:"Code"`
Description string `json:"Description"`
}
type stageCode int
const (
_ stageCode = iota
listening
thinking
answering
interrupted
answerFinish
)
var (
stageListening = StageInfo{Code: listening, Description: "listening"}
stageThinking = StageInfo{Code: thinking, Description: "thinking"}
stageAnswering = StageInfo{Code: answering, Description: "answering"}
stageInterrupted = StageInfo{Code: interrupted, Description: "interrupted"}
stageAnswerFinish = StageInfo{Code: answerFinish, Description: "answerFinish"}
)
func HandleConversationStageMsg(c *gin.Context) {
msg := &RtsMessage{}
if err := c.BindJSON(&msg); err != nil {
fmt.Printf("BindJson failed,err:%v\n", err)
return
}
if msg.Signature != exampleSignature {
fmt.Printf("Signature not match\n")
return
}
conv, err := Unpack(msg.Message)
if err != nil {
fmt.Printf("Unpack failed,err:%v\n", err)
return
}
fmt.Println(conv)
//业务逻辑
c.String(200, "ok")
}
func Unpack(msg string) (*Conv, error) {
data, err := base64.StdEncoding.DecodeString(msg)
if err != nil {
return nil, fmt.Errorf("DecodeString failed,err:%v", err)
}
if len(data) < 8 {
return nil, fmt.Errorf("Data invalid")
}
dataHeader := string(data[:4])
if dataHeader != conversationStageHeader {
return nil, fmt.Errorf("Header not match")
}
dataSize := binary.BigEndian.Uint32(data[4:8])
if dataSize+8 != uint32(len(data)) {
return nil, fmt.Errorf("Size not match")
}
subData := data[8:]
conv := &Conv{}
err = json.Unmarshal(subData, conv)
if err != nil {
return nil, fmt.Errorf("Unmarshal failed,err:%v\n", err)
}
return conv, nil
}
func main() {
r := gin.Default()
r.POST("/example_domain/vertc/cstage", HandleConversationStageMsg)
r.Run()
}
附录
二进制消息格式
二进制消息格式如下:

| 参数名 | 类型 | 描述 |
|---|
| magic number | binary | 消息格式,固定为 conv。 |
| length | binary | 消息长度,单位为 bytes。存放方式为大端序。 |
| conversation_status_message | binary | 消息详细信息。格式参看 conversation_status_message 格式。 |
conversation_status_message:
| 参数名 | 类型 | 描述 |
|---|
| TaskId | String | 智能体任务 ID。 |
| UserID | String | 说话人 UserId。 |
| RoundID | Int64 | 对话轮次。从 0 开始计数。 |
| EventTime | Int64 | 该事件在 RTC 服务器上发生的 Unix 时间戳 (ms)。 |
| Stage | Stage | 任务状态详细描述。 |
| ErrorInfo | ErrorDetail | 任务错误详细信息。仅当 Stage.Code 为 0 时,会返回该字段。 |
Stage
| 参数名 | 类型 | 描述 |
|---|
| Code | Int | 任务状态码。- 0:任务发生错误。
- 1:智能体聆听中。
- 2:智能体思考中。
- 3:智能体说话中。
- 4:智能体被打断。
- 5:智能体说话完成。
|
| Description | String | 任务状态描述。 |
ErrorDetail
| 参数名 | 类型 | 描述 |
|---|
| Code | Int | 错误状态码。详细定义,请参见错误码。 |
| Reason | String | 错误详细原因。 |