在实时对话式 AI 场景下,你可能需要获取 AI 对话过程中智能体任务的实时状态变化消息以便在业务端及时进行后续处理或保证业务的稳定性。你可以通过接收相应的回调来获取这些状态消息。
你可以使用三种方式接收回调信息:
通过控制台配置接收状态变化消息
前往 RTC 控制台-功能配置,在回调设置中添加实时对话式 AI 相关配置。配置完成后,智能体任务状态发生变化或任务报错时,你配置的回调 URL 会收到回调消息。回调消息格式为 JSON 格式,具体参看 VoiceChat。
通过 StartVoiceChat 接口配置接收状态变化消息
将 StartVoiceChat.AgentConfig.EnableConversationStateCallback
设置为 true
,并填入ServerMessageURLForRTS
和 ServerMessageSignatureForRTS
,在智能体任务状态发生变化时,你配置的回调 URL 会收到回调消息。
收到的回调格式如下:
参数名 | 类型 | 描述 |
---|
message | String | Base 64 编码的二进制消息内容。长度不超过 48 KB。格式参看二进制消息格式。 |
signature | String | 鉴权签名。可与StartVoiceChat 接口中传入的ServerMessageSignatureForRTS 字段值进行对比以进行鉴权验证。 |
二进制消息格式如下:

参数名 | 类型 | 描述 |
---|
magic number | binary | 消息格式,固定为 conv 。 |
length | binary | 消息长度,单位为 bytes。存放方式为大端序。 |
conversion_status_message | binary | 消息详细信息。格式参看 conversion_status_message 格式。 |
conversion_status_message:
参数名 | 类型 | 描述 |
---|
TaskId | String | 智能体任务 ID。 |
UserID | String | 说话人 UserId。 |
RoundID | Int64 | 对话轮次。从 0 开始计数。 |
EventTime | Int64 | 该事件在 RTC 服务器上发生的 Unix 时间戳 (ms)。 |
Stage | Stage | 任务状态详细描述。 |
Stage
参数名 | 类型 | 描述 |
---|
Code | Int | 任务状态码。1 :智能体聆听中。2 :智能体思考中。3 :智能体说话中。4 :智能体被打断。5 :智能体说话完成。
|
Description | String | 任务状态描述。 |
你可以参考以下示例代码对回调信息中的 message
内容进行解析。
package main
import (
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
)
const (
conversationStageHeader = "conv"
exampleSignature = "example_signature"
)
type RtsMessage struct {
Message string `json:"message"`
Signature string `json:"signature"`
}
type Conv struct {
TaskID string `json:"TaskId"`
UserID string `json:"UserID"`
RoundID int64 `json:"RoundID"`
EventTime int64 `json:"EventTime"`
Stage StageInfo `json:"Stage"`
}
type StageInfo struct {
Code stageCode `json:"Code"`
Description string `json:"Description"`
}
type stageCode int
const (
_ stageCode = iota
listening
thinking
answering
interrupted
answerFinish
)
var (
stageListening = StageInfo{Code: listening, Description: "listening"}
stageThinking = StageInfo{Code: thinking, Description: "thinking"}
stageAnswering = StageInfo{Code: answering, Description: "answering"}
stageInterrupted = StageInfo{Code: interrupted, Description: "interrupted"}
stageAnswerFinish = StageInfo{Code: answerFinish, Description: "answerFinish"}
)
func HandleConversationStageMsg(c *gin.Context) {
msg := &RtsMessage{}
if err := c.BindJSON(&msg); err != nil {
fmt.Printf("BindJson failed,err:%v\n", err)
return
}
if msg.Signature != exampleSignature {
fmt.Printf("Signature not match\n")
return
}
conv, err := Unpack(msg.Message)
if err != nil {
fmt.Printf("Unpack failed,err:%v\n", err)
return
}
fmt.Println(conv)
//业务逻辑
c.String(200, "ok")
}
func Unpack(msg string) (*Conv, error) {
data, err := base64.StdEncoding.DecodeString(msg)
if err != nil {
return nil, fmt.Errorf("DecodeString failed,err:%v", err)
}
if len(data) < 8 {
return nil, fmt.Errorf("Data invalid")
}
dataHeader := string(data[:4])
if dataHeader != conversationStageHeader {
return nil, fmt.Errorf("Header not match")
}
dataSize := binary.BigEndian.Uint32(data[4:8])
if dataSize+8 != uint32(len(data)) {
return nil, fmt.Errorf("Size not match")
}
subData := data[8:]
conv := &Conv{}
err = json.Unmarshal(subData, conv)
if err != nil {
return nil, fmt.Errorf("Unmarshal failed,err:%v\n", err)
}
return conv, nil
}
func main() {
r := gin.Default()
r.POST("/example_domain/vertc/cstage", HandleConversationStageMsg)
r.Run()
}
通过客户端回调接收状态变化消息
将 StartVoiceChat.AgentConfig.EnableConversationStateCallback
设置为 true
,在智能体任务状态发生变化时,回调 onRoomBinaryMessageReceived 会收到回调消息。
返回的回调格式如下:
参数名 | 类型 | 描述 |
---|
uid | String | 消息发送者 ID。 |
message | String | Base 64 编码的二进制消息内容。长度不超过 64 KB。与服务端返回二进制消息格式相同,详细参看二进制消息格式。 |
你可以参考以下示例代码对回调信息中的 message
内容进行解析。
import VERTC from '@volcengine/rtc';
/**
* @brief TLV 数据格式转换成字符串
* @note TLV 数据格式
* | magic number | length(big-endian) | value |
* @param {ArrayBufferLike} tlvBuffer
* @returns
*/
function tlv2String(tlvBuffer: ArrayBufferLike) {
const typeBuffer = new Uint8Array(tlvBuffer, 0, 4);
const lengthBuffer = new Uint8Array(tlvBuffer, 4, 4);
const valueBuffer = new Uint8Array(tlvBuffer, 8);
let type = '';
for (let i = 0; i < typeBuffer.length; i++) {
type += String.fromCharCode(typeBuffer[i]);
}
const length =
(lengthBuffer[0] << 24) | (lengthBuffer[1] << 16) | (lengthBuffer[2] << 8) | lengthBuffer[3];
const value = new TextDecoder().decode(valueBuffer.subarray(0, length));
return { type, value };
};
/**
* @brief Room Message Handler
*/
function handleRoomBinaryMessageReceived(
event: {
userId: string;
message: ArrayBuffer;
},
) {
const { message } = event;
const { type, value } = tlv2String(message);
const data = JSON.parse(value);
const { EventTime, RoundID, Stage, TaskId, UserID } = data || {};
const { Code, Description } = Stage || {};
// 对 type、data 进行业务处理
console.log(type, EventTime, RoundID, TaskId, UserID, Code, Description);
// ...
}
/**
* @brief 监听房间内二进制消息
*/
this.engine.on(VERTC.events.onRoomBinaryMessageReceived, handleRoomBinaryMessageReceived);
//智能体状态结构体
import com.google.gson.annotations.SerializedName;
class Stage {
@SerializedName("Code")
/**
* 1: listening
* 2: thinking
* 3: answering
* 4: interrupted
* 5: answerFinish
*/
int code;
@SerializedName("Description")
String description;
public void setCode(int code) {
this.code = code;
}
public int getCode() {
return this.code;
}
public void setDescription(String description) {
this.description = description;
}
public String getDescription() {
return this.description;
}
}
public class ConversionStatusMessage {
@SerializedName("TaskId")
String taskId;
@SerializedName("UserID")
String userID;
@SerializedName("RoundID")
long roundID;
@SerializedName("EventTime")
long eventTime;
@SerializedName("Stage")
Stage stage;
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getTaskId() {
return this.taskId;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getUserID() {
return this.userID;
}
public void setRoundID(long roundID) {
this.roundID = roundID;
}
public long getRoundID() {
return this.roundID;
}
public void setEventTime(long eventTime) {
this.eventTime = eventTime;
}
public long getEventTime() {
return this.eventTime;
}
public void setStage(Stage stage) {
this.stage = stage;
}
public Stage getStage() {
return this.stage;
}
public String toString() {
String string = "taskId:" + taskId + ", " +
"userID:" + userID + ", " +
"roundID:" + roundID + ", " +
"eventTime:" + eventTime + ", " +
"stage:" + stage.code + " - " + stage.description;
System.out.println(string);
return string;
}
}
//解析消息
public void onRoomBinaryMessageReceived(String uid, ByteBuffer buffer) {
byte[] prefixBytes = new byte[4];
buffer.get(prefixBytes);
String prefix = new String(prefixBytes, StandardCharsets.UTF_8);
if (prefix.equals("conv")) {
int length = buffer.getInt();
byte[] jsonBytes = new byte[length];
buffer.get(jsonBytes);
String jsonContent = new String(jsonBytes, StandardCharsets.UTF_8);
ConversionStatusMessage statusMessage = new Gson().fromJson(jsonContent, ConversionStatusMessage.class);
System.out.println("Unpack_conv length:" + length + " " + statusMessage.toString());
} else {
System.out.println("Invalid message prefix:" + prefix);
}
}
struct ConversionStatusMessage
{
struct Stage
{
int code;
std::string description;
};
std::string taskId;
std::string userID;
int64_t roundID;
int64_t eventTime;
Stage stage;
};
void onRoomBinaryMessageReceived(const char *uid, int size, const uint8_t *message)
{
static const int headerSize = 8;
if (size < headerSize)
{
return;
}
uint32_t header = messageHeader(message);
if (header == 0x636F6E76U) // magic number "conv"
{
std::string strMessage = unpack(message, size);
if (!strMessage.empty())
{
unpackConversationStage(strMessage);
}
}
else
{
LOG_E << "unknow message header: " << header;
return;
}
}
void unpackConversationStage(const std::string &message)
{
// 解析 JSON 字符串
nlohmann::json json_data = nlohmann::json::parse(message);
// 存储解析后的数据
ConversionStatusMessage conversionStatusMessage;
conversionStatusMessage.taskId = json_data["TaskId"];
conversionStatusMessage.userID = json_data["UserID"];
conversionStatusMessage.roundID = json_data["RoundID"];
conversionStatusMessage.eventTime = json_data["EventTime"];
ConversionStatusMessage::Stage stage;
stage.code = json_data["Stage"]["Code"];
stage.description = json_data["Stage"]["Description"];
conversionStatusMessage.stage = stage;
}
#import <Foundation/Foundation.h>
typedef struct {
int code;
NSString *description;
} Stage;
@interface ConversionStatusMessage : NSObject
@property (nonatomic, copy) NSString *taskId;
@property (nonatomic, copy) NSString *userID;
@property (nonatomic, assign) int64_t roundID;
@property (nonatomic, assign) int64_t eventTime;
@property (nonatomic, assign) Stage stage;
@end
@implementation ConversionStatusMessage
@end
//回调
- (void)rtcRoom:( ByteRTCRoom *_Nonnull)rtcRoom onRoomBinaryMessageReceived:(NSString *_Nonnull)uid message:(NSData *_Nonnull)message {
NSString *strMessage = unpack(message);
if (strMessage) {
parseData(strMessage);
}
}
NSString *unpack(NSData *data) {
const int headerSize = 8;
NSUInteger size = data.length;
if (size < headerSize) {
return nil;
}
const uint8_t *message = data.bytes;
// Check magic number "conv"
uint32_t magic = (message[0] << 24) | (message[1] << 16) | (message[2] << 8) | message[3];
if (magic != 0x636F6E76) {
return nil;
}
// Get length
uint32_t length = (message[4] << 24) | (message[5] << 16) | (message[6] << 8) | message[7];
if (size - headerSize != length) {
return nil;
}
// Get conversionStatusMessage
NSString *conversionStatusMessage = nil;
if (length > 0) {
conversionStatusMessage = [[NSString alloc] initWithBytes:message + headerSize length:length encoding:NSUTF8StringEncoding];
} else {
conversionStatusMessage = @"";
}
return conversionStatusMessage;
}
void parseData(NSString *message) {
NSError *error = nil;
NSDictionary *json_data = [NSJSONSerialization JSONObjectWithData:[message dataUsingEncoding:NSUTF8StringEncoding] options:0 error:&error];
if (error || json_data == nil) {
NSLog(@"JSON parse error: %@", error);
return;
}
ConversionStatusMessage *conversionStatusMessage = [[ConversionStatusMessage alloc] init];
conversionStatusMessage.taskId = json_data[@"TaskId"];
conversionStatusMessage.userID = json_data[@"UserID"];
conversionStatusMessage.roundID = [json_data[@"RoundID"] longLongValue];
conversionStatusMessage.eventTime = [json_data[@"EventTime"] longLongValue];
Stage stage;
stage.code = [json_data[@"Stage"][@"Code"] intValue];
stage.description = json_data[@"Stage"][@"Description"];
conversionStatusMessage.stage = stage;
}