You need to enable JavaScript to run this app.
导航
最佳实践:数据消费接口调用获取内容
最近更新时间:2024.09.14 15:41:20 | 首次发布时间:2024.01.11 10:56:25

本文档介绍数据实时消费接口调用的最佳实践。

概述

客户在调用内容洞察平台的接口时存在一定的接入门槛,因此本文提供基于常用技术栈的数据实时消费接口调用最佳实践。

备注:
发文实时流式接口文档:数据消费接口


说明

本文档仅适用于已对接【数据消费接口】的历史客户,该接口后续将下线;新客户请使用 BMQ,参考:最佳实践:BMQ消息队列消费内容数据


最佳实践

Golang

核心:利用Golang轻量级协程的异步属性,扩大请求的并发量来提高单机的整体消费QPS。

在测试条件下,单接口的平均 QPS 可达 40+。

示例代码中包含一些额外的状态管理,以满足长时间的持续消费:

  1. access_token的过期刷新机制

  2. 连接错误后的自动重试

使用须知:
该 Client 正常情况下不会退出。如果不需要持续消费,需要自行控制停止请求的时机(例如通过 os.Exit(0) 或 log.Fatal() 退出进程)。

需要 Go 1.18 及以上版本。

// package main
// filepath: client.go
//  @update 2023-08-21 03:49:49
package main

import (
    "context"
    "regexp"
    "strconv"
    "time"
)

func main() {
    for colStart < colEnd {
        end := colStart + colStep - 1
        collectors := strconv.FormatInt(colStart, 10) + "-" + strconv.FormatInt(colEnd, 10)
        subCtx := &subMsgContext{
            Context:     context.Background(),
            collectors:  collectors,
            startOffset: startOffset,
        }
        wg.Add(1)
        go subCtx.subMsg()
        colStart = end + 1
    }
    time.Sleep(time.Second * 1)
    wg.Wait()
}

// sseDataPrefix matches an SSE "data:" field prefix plus any whitespace after it.
// It is compiled once at package scope because splitFields runs on every stream line.
var sseDataPrefix = regexp.MustCompile(`data:\s*`)

// splitFields splits an SSE line on every "data:" prefix (and the whitespace
// following it) and returns the remaining pieces in order. A line that starts
// with "data:" yields an empty first element, which the caller skips; a line
// without the prefix is returned whole as a single element.
func splitFields(line string) []string {
    return sseDataPrefix.Split(line, -1)
}
// package main
// filepath: client_var.go
//  @update 2023-08-21 03:50:40
package main

import (
    "context"
    "os"
    "sync"
    "time"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

// AccessTokenResp is the JSON body returned by the access-token endpoint
// (oauthTokenPath). GetAccessToken decodes into it and uses Data.AccessToken
// as the token and Data.Expire, multiplied by time.Second, as its lifetime.
//
//  @update 2023-08-07 02:02:41
type AccessTokenResp struct {
    // Data carries the token payload.
    Data struct {
        AccessToken string `json:"access_token"`
        Expire      int    `json:"expire"`
    } `json:"data"`
    // Message and Status are decoded from the response but not interpreted
    // by the visible code; their semantics are defined by the API (TODO confirm).
    Message string `json:"message"`
    Status  int    `json:"status"`
}

// StreamMessage is one SSE "data:" payload from the stream endpoint.
// subMsg passes Nonce as the IV and EncryptItem as the ciphertext to
// CBCDecrypter. As []byte fields they are presumably base64 strings in the
// JSON (the Python example base64-decodes "nonce" and "encrypt_item").
//
//  @update 2023-08-14 05:50:34
type StreamMessage struct {
    MsgID       string `json:"msg_id"`
    Nonce       []byte `json:"nonce,omitempty"`
    EncryptItem []byte `json:"encrypt_item,omitempty"`
}

// subMsgContext is the state of one stream consumer for one collector range.
// subMsg passes the embedded Context to the HTTP client's Do call.
type subMsgContext struct {
    // retryCnt is not read or written anywhere in the code shown here.
    retryCnt    int
    // collectors is sent as the "collector" query parameter, e.g. "0-9" as built by main.
    collectors  string
    // startOffset is sent as "start_offset" when non-zero; recoverSubMsg resets
    // it to the current Unix time (seconds) minus 180 before resubscribing.
    startOffset int64
    // token is this consumer's copy of the shared access token, set by RefreshToken.
    token       string
    context.Context
}

// AccessTokenStruct is the process-wide access-token cache shared by all
// consumer goroutines: the token value plus the time at which it expires.
// The embedded RWMutex coordinates refreshes (RefreshToken locks it around
// GetAccessToken). It is a pointer, so it must be initialized explicitly,
// as the AccessToken variable does.
//
//  @update 2023-08-07 02:20:05
type AccessTokenStruct struct {
    token    string
    expireAt time.Time
    *sync.RWMutex
}

// init configures the package logger: console-encoded output to stdout at
// debug level and above, caller annotations on every entry, and stack traces
// attached from Error level upward.
func init() {
    encoderCfg := zap.NewProductionEncoderConfig()
    core := zapcore.NewCore(
        zapcore.NewConsoleEncoder(encoderCfg),
        zapcore.AddSync(os.Stdout),
        zapcore.DebugLevel,
    )
    logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)).Sugar()
}

// AccessToken is the shared token cache used by every consumer goroutine.
// Its mutex is allocated up front because AccessTokenStruct embeds a pointer.
var AccessToken = &AccessTokenStruct{RWMutex: new(sync.RWMutex)}

// logger is the sugared zap logger assigned in init.
var logger *zap.SugaredLogger

// wg tracks the consumer goroutines started by main.
var wg = new(sync.WaitGroup)

// API host, plus the paths of the access-token and SSE stream endpoints
// (each path is appended to insightBaseURL).
var insightBaseURL = "https://insight.volcengineapi.com"
var oauthTokenPath = "/oauth/access_token"
var streamReqPath = "/openapi/item/sse/stream"

// Credentials: replace every placeholder before running.
var (
    // volcBizName is the Volcano Engine account ID.
    volcBizName = "[YOUR_ACCOUNT_ID_HERE]"
    // volcSecret is obtained from the Volcano Engine console, Content Insight overview page.
    volcSecret = "[YOUR_SECRET_HERE]"
    // volcAesKey is the AES key from the same overview page. It is passed to
    // aes.NewCipher, so it must be 16, 24 or 32 bytes long.
    volcAesKey = "[YOUR_AES_KEY_HERE]"
)

// Collector range, chunk size and starting offset, read once from the
// environment at startup and parsed with MustAtoi (see it for how missing or
// invalid values are treated).
var colStart = MustAtoi(os.Getenv("COLLECTOR_START"))
var colStep = MustAtoi(os.Getenv("COLLECTOR_STEP"))
var colEnd = MustAtoi(os.Getenv("COLLECTOR_END"))
var startOffset = MustAtoi(os.Getenv("START_OFFSET"))
// package main
// filepath: client_ctx.go
//  @update 2023-08-21 03:50:55
package main

import (
    "bufio"
    "context"
    "crypto/aes"
    "crypto/cipher"
    "crypto/tls"
    "strconv"
    "time"

    json "github.com/bytedance/sonic"
    "github.com/cloudwego/hertz/pkg/app/client"
    "github.com/cloudwego/hertz/pkg/protocol"
    "github.com/cloudwego/hertz/pkg/protocol/consts"
    "github.com/pkg/errors"
)

// RefreshToken copies the shared access token into ctx.token. It first
// fetches a new token when the shared one has expired or force is true.
// It panics (via logger.Panicf) if fetching fails.
//
// The check and the fetch both happen under AccessToken's write lock. This
// means concurrent non-forced callers perform at most one fetch per expiry,
// and nobody reads the token while it is being replaced. ctx.token is
// re-synced on every call, so a consumer also picks up a token that another
// goroutine refreshed, instead of reusing its own stale copy.
//
//  @receiver ctx *subMsgContext
//  @param force bool  refresh even if the cached token has not expired yet
//  @update 2023-08-07 02:02:38
func (ctx *subMsgContext) RefreshToken(force bool) {
    AccessToken.Lock()
    defer AccessToken.Unlock()

    if force || AccessToken.expireAt.Before(time.Now()) {
        if err := GetAccessToken(); err != nil {
            logger.Panicf("GetAccessToken error: %v", err)
        }
    }
    ctx.token = AccessToken.token
}

// recoverSubMsg is deferred by subMsg. Whether subMsg panicked or returned
// normally, it starts a fresh subMsg goroutine for the same collectors, with
// startOffset rewound to 180 seconds before now.
//
// After a panic it logs the recovered value and waits 1s. After a normal
// return (the server finished sending) it waits 10s before resubscribing.
//
//  @receiver ctx *subMsgContext
//  @update 2023-08-07 02:02:36
func (ctx *subMsgContext) recoverSubMsg() {
    // Resubscribe unconditionally, even if the server has already sent everything.
    ctx.startOffset = time.Now().Unix() - 180

    wait := time.Second * 10
    if r := recover(); r == nil {
        // The current data has been fully delivered; wait longer for new data.
        logger.Errorf("SubMsg Done with collector: %+v, will retry in 10s...", ctx.collectors)
    } else {
        logger.Errorf("SubMsg Error with collector: %s, with error: %+v, will retry in 1s...", ctx.collectors, r)
        wait = time.Second * 1
    }
    time.Sleep(wait)
    go ctx.subMsg()
}

// subMsg opens one SSE subscription for ctx.collectors and decrypts every
// event it receives until the server closes the stream.
//
// Connection-level failures panic: token fetch, client creation, request,
// non-200 status, an "unauthorized" event, and scanner errors. The deferred
// recoverSubMsg turns both a panic and a normal return into a delayed
// resubscription. A single malformed event (undecodable JSON, failed
// decryption) is logged and skipped instead of tearing down the stream.
//
//  @receiver ctx *subMsgContext
//  @update 2023-08-07 02:02:33
func (ctx *subMsgContext) subMsg() {
    defer ctx.recoverSubMsg()
    ctx.RefreshToken(false)

    req := &protocol.Request{}
    rsp := &protocol.Response{}
    req.SetMethod(consts.MethodGet)
    req.SetRequestURI(insightBaseURL + streamReqPath)
    queryStr := "collector=" + ctx.collectors
    if ctx.startOffset != 0 {
        queryStr += "&start_offset=" + strconv.FormatInt(ctx.startOffset, 10)
    }
    req.SetQueryString(queryStr)
    req.SetHeaders(map[string]string{
        "X-Real-Ip":              "180.184.86.50",
        "X-Insight-Biz-Name":     volcBizName,
        "X-Insight-Access-Token": ctx.token,
    })

    // Certificate verification stays enabled: the request carries the access token.
    c, err := client.NewClient(
        client.WithResponseBodyStream(true),
        client.WithTLSConfig(&tls.Config{}),
    )
    if err != nil {
        logger.Panic(err)
    }
    if err = c.Do(ctx, req, rsp); err != nil {
        logger.Panic(err)
    }
    // Release the streamed connection on every exit path, panics included.
    defer func() { _ = rsp.CloseBodyStream() }()

    if code := rsp.StatusCode(); code != consts.StatusOK {
        logger.Panicf("stream request for collector %s returned HTTP %d", ctx.collectors, code)
    }

    // Read the SSE body line by line; events may be large, so allow lines up to 1 MB.
    const maxTokenSize = 1024 * 1024
    scanner := bufio.NewScanner(rsp.BodyStream())
    scanner.Buffer(make([]byte, maxTokenSize), maxTokenSize)
    scanner.Split(bufio.ScanLines)

    key := []byte(volcAesKey)
    for scanner.Scan() {
        line := scanner.Text()
        // Skip blank event separators and SSE comment lines (": ...").
        if len(line) == 0 || line[0] == ':' {
            continue
        }
        for _, field := range splitFields(line) {
            if field != "" {
                handleStreamField(field, key)
            }
        }
    }
    if err := scanner.Err(); err != nil {
        logger.Panic(err)
    }
}

// handleStreamField decodes one SSE data payload and decrypts its item with key.
// An "unauthorized" payload panics so the caller's recovery logic resubscribes.
// Payloads that fail to decode or decrypt are logged and skipped; they
// previously caused a nil-pointer panic and a full reconnect.
func handleStreamField(field string, key []byte) {
    if field == "unauthorized" {
        logger.Panic(field)
    }
    var msg StreamMessage
    if err := json.Unmarshal([]byte(field), &msg); err != nil {
        logger.Warnf("json unmarshal %+v", err)
        return
    }
    decryptMsg, err := CBCDecrypter(msg.EncryptItem, key, msg.Nonce)
    if err != nil {
        logger.Warnf("decrypt message err %+v, msg %v", err, decryptMsg)
        return
    }
    // TODO: decryptMsg holds the decrypted post; capture it here.
}

// GetAccessToken fetches a new access token and stores it, together with
// its expiry time, in the shared AccessToken.
//
// The caller must hold AccessToken's write lock (RefreshToken does), because
// this function writes AccessToken.token and AccessToken.expireAt directly.
// On any failure the cached token is left untouched and an error is returned,
// so RefreshToken can react, instead of an empty token being stored silently.
//
//  @return err error  client, transport, HTTP-status, decoding or empty-token failure
//  @update 2023-08-07 03:23:43
func GetAccessToken() (err error) {
    // Certificate verification stays enabled: the request carries the biz secret.
    c, err := client.NewClient(client.WithTLSConfig(&tls.Config{}))
    if err != nil {
        return errors.Wrap(err, "create http client")
    }
    req := &protocol.Request{}
    req.SetMethod(consts.MethodGet)
    req.SetHeaders(
        map[string]string{
            "X-Real-Ip":            "180.184.86.50",
            "X-Insight-Biz-Name":   volcBizName,
            "X-Insight-Biz-Secret": volcSecret,
        },
    )
    req.SetRequestURI(insightBaseURL + oauthTokenPath)
    rsp := &protocol.Response{}
    if err := c.Do(context.Background(), req, rsp); err != nil {
        return errors.Wrap(err, "request access token")
    }
    if code := rsp.StatusCode(); code != consts.StatusOK {
        return errors.Errorf("request access token: HTTP %d: %s", code, rsp.BodyBytes())
    }
    tokenResp := &AccessTokenResp{}
    if err := json.Unmarshal(rsp.BodyBytes(), tokenResp); err != nil {
        return errors.Wrap(err, "decode access token response")
    }
    if tokenResp.Data.AccessToken == "" {
        return errors.Errorf("empty access token (status %d, message %q)", tokenResp.Status, tokenResp.Message)
    }
    AccessToken.token = tokenResp.Data.AccessToken
    AccessToken.expireAt = time.Now().Add(time.Duration(tokenResp.Data.Expire) * time.Second)
    return nil
}

// CBCDecrypter decrypts AES-CBC ciphertext and strips its PKCS#7 padding.
//
// Unlike the raw crypto/cipher primitives it never panics on malformed
// input. All of the following are reported as errors, with a nil result:
//   - a key that is not 16, 24 or 32 bytes;
//   - an IV that is not exactly one block;
//   - a ciphertext that is empty or not a whole number of blocks;
//   - padding that is not valid PKCS#7.
//
//  @param encrypter []byte ciphertext to decrypt
//  @param key []byte AES key
//  @param iv []byte initialization vector, aes.BlockSize bytes
//  @return result []byte plaintext with the padding removed
//  @return err error
//  @update 2023-08-14 07:35:00
func CBCDecrypter(encrypter []byte, key []byte, iv []byte) (result []byte, err error) {
    block, err := aes.NewCipher(key)
    if err != nil {
        return nil, errors.New("aes decipher err: " + err.Error())
    }
    // cipher.NewCBCDecrypter and CryptBlocks panic on these conditions, so check first.
    if len(iv) != aes.BlockSize {
        return nil, errors.New("aes decipher err: iv length must equal the AES block size")
    }
    if len(encrypter) == 0 || len(encrypter)%aes.BlockSize != 0 {
        return nil, errors.New("aes decipher err: ciphertext is not a positive multiple of the block size")
    }
    plain := make([]byte, len(encrypter))
    cipher.NewCBCDecrypter(block, iv).CryptBlocks(plain, encrypter)
    return pkcs7Unpad(plain, aes.BlockSize)
}

// pkcs7Unpad removes PKCS#7 padding from b, verifying that the padding length
// is in 1..blockSize and that every padding byte equals that length.
func pkcs7Unpad(b []byte, blockSize int) ([]byte, error) {
    if len(b) == 0 {
        return nil, errors.New("aes decipher err: empty plaintext")
    }
    n := int(b[len(b)-1])
    if n == 0 || n > blockSize || n > len(b) {
        return nil, errors.New("aes decipher err: invalid PKCS#7 padding")
    }
    for _, p := range b[len(b)-n:] {
        if int(p) != n {
            return nil, errors.New("aes decipher err: invalid PKCS#7 padding")
        }
    }
    return b[:len(b)-n], nil
}

// UnPKCS7Padding removes PKCS#7 padding: the last byte gives how many
// trailing bytes to drop. It neither verifies the padding bytes nor knows
// the block size.
//
// Input that cannot carry padding is returned unchanged instead of panicking
// on an out-of-range slice. This covers empty input and a padding length
// larger than the input. A padding length of 0 also leaves the input
// unchanged, as before.
//
//  @param text []byte data whose padding should be removed
//  @return []byte text without its padding (a sub-slice of text)
//  @update 2023-08-14 07:35:33
func UnPKCS7Padding(text []byte) []byte {
    if len(text) == 0 {
        return text
    }
    // The last byte encodes the padding length.
    unPadding := int(text[len(text)-1])
    if unPadding > len(text) {
        return text
    }
    return text[:len(text)-unPadding]
}

// MustAtoi parses s as a base-10 int64. It returns 0 whenever parsing fails,
// whether s is empty, malformed, or outside the int64 range. Despite the
// name it never panics.
//
// strconv.ParseInt returns the clamped ±MaxInt64 together with ErrRange for
// out-of-range input; that value is discarded here so the documented
// "invalid means 0" contract holds.
//
//  @param s string
//  @return int64
//  @update 2023-08-14 05:30:14
func MustAtoi(s string) (i int64) {
    i, err := strconv.ParseInt(s, 10, 64)
    if err != nil {
        return 0
    }
    return i
}

Python

此 Python 脚本仅建立了简单的连接,没有重连机制,需要自行添加。

import base64
import json
import logging
import time
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
# Log lines are formatted as "<timestamp> - <message>" at INFO level and above.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# API host; each request adds its own scheme and path.
base_url = "insight.volcengineapi.com"

# Credentials: replace the placeholders before running.
key = b"$HOUTAI_KEY"  # AES key used by cbc_decrypter
biz_name = "$BIZ_NAME"  # sent as X-Insight-Biz-Name
biz_secret = "$SECRET"  # sent as X-Insight-Biz-Secret

class Consume_buckets:
    """Consume one collector ("bucket") range from the SSE stream endpoint.

    ``start_bucket`` and ``end_bucket`` are sent as ``collector={start}-{end}``.
    ``start_offset``, when not None, is sent as ``start_offset``. There is no
    reconnect logic: ``consume`` returns when the stream ends or the request
    is rejected, and network errors propagate to the caller.
    """

    def __init__(self, start_bucket=None, end_bucket=None, start_offset=None):
        self.start_bucket = start_bucket
        self.end_bucket = end_bucket
        self.start_offset = start_offset
        self.pid = 0  # not read anywhere in this script

    def get_access_token(self):
        """Request an access token with the configured biz name and secret.

        Returns the ``data.access_token`` string from the JSON response, or
        None when the response carries no token.
        """
        # Use HTTPS, like the stream request: this request carries the biz secret.
        url = f"https://{base_url}/oauth/access_token"
        headers = {
            'X-Insight-Biz-Name': biz_name,
            'X-Insight-Biz-Secret': biz_secret
        }
        response = requests.get(url, headers=headers, timeout=10)
        re_json = response.json()
        print(re_json)
        data = re_json.get('data') or {}
        return data.get('access_token')

    def consume(self):
        """Open the SSE stream for this range and process every event.

        Each decrypted event with ``status == 1`` has its ``post_id`` appended
        to ``<hour-start-unix-timestamp>.txt``. Payloads that are not valid JSON
        are appended to ``Exception_data.txt``. Events with a missing nonce, or
        that fail to decode or decrypt, are logged and skipped. An
        ``unauthorized`` event ends consumption.
        """
        access_token = self.get_access_token()
        if not access_token:
            print("access_token 为空")
            return

        url = f"https://{base_url}/openapi/item/sse/stream?collector={self.start_bucket}-{self.end_bucket}"
        if self.start_offset is not None:
            url += f"&start_offset={self.start_offset}"
        header = {
            "X-Real-Ip": "180.184.86.50",
            "X-Insight-Biz-Name": biz_name,
            "X-Insight-Access-Token": access_token,
        }
        # Connect timeout only: the stream may legitimately stay idle for a long time.
        response = requests.get(url, headers=header, stream=True, timeout=(10, None))
        print(response)
        if response.status_code != 200:
            logging.error(f"stream request failed: HTTP {response.status_code} {response.text}")
            return

        for line in response.iter_lines():
            s = line.decode()
            index = s.find("data:")
            if index == -1:
                # Blank separators and SSE comment/event/id lines carry no payload.
                continue
            data = s[index + len("data:"):]
            if data.strip() == "unauthorized":
                logging.error("access token rejected: unauthorized")
                return
            try:
                message_json = json.loads(data)
            except ValueError as e:
                with open("Exception_data.txt", "a+") as f:
                    f.write(f"{e}----{data}\n")
                continue
            if not isinstance(message_json, dict):
                print("---", message_json)
                continue
            iv = message_json.get("nonce")  # base64-encoded AES-CBC IV
            encrypt_item = message_json.get("encrypt_item", "")  # base64-encoded ciphertext
            if not iv:
                logging.warning("nonce id null")
                continue
            try:
                # b64decode, AES.new (bad IV), unpad and json.loads all raise ValueError subclasses.
                plaintext = self.cbc_decrypter(base64.b64decode(encrypt_item), key, base64.b64decode(iv))
                recv_message = json.loads(plaintext)
            except ValueError as e:
                logging.warning(f"decrypt message err {e}")
                continue
            logging.info(f"recv: {recv_message}")
            # Only status == 1 is counted, so only those post_ids are written.
            if isinstance(recv_message, dict) and recv_message.get("status") == 1:
                post_id = recv_message.get("post_id")
                if post_id is None:
                    logging.warning(f"status 1 message without post_id: {recv_message}")
                    continue
                hour_start = int(time.time()) // 3600 * 3600
                with open(f"{hour_start}.txt", "a+") as f:
                    f.write(f"{post_id}\n")

    def cbc_decrypter(self, encrypter, key, iv):
        """AES-CBC decrypt ``encrypter`` with ``key``/``iv`` and strip PKCS#7 padding."""
        block = AES.new(key, AES.MODE_CBC, iv)
        result = unpad(block.decrypt(encrypter), AES.block_size)
        return result

if __name__ == "__main__":
    # Consume collectors 0-10, without a start_offset.
    Consume_buckets(start_bucket=0, end_bucket=10).consume()