本文档介绍数据实时消费接口调用的最佳实践。
客户侧在调用内容洞察平台的接口时,会存在一定的调用门槛,因此,特提供常规技术栈的数据实时消费接口调用的最佳实践。
备注:
发文实时流式接口文档:数据消费接口
说明
本文档仅适用于已对接【数据消费接口】的历史客户;该接口后续会下线,新客户请使用 BMQ,请参考:最佳实践:BMQ消息队列消费内容数据
核心:利用Golang轻量级协程的异步属性,扩大请求的并发量来提高单机的整体消费QPS。
在测试条件下,平均单接口的QPS可达40+
示例代码中包含一些额外的状态管理,以满足长时间的持续消费:
access_token 的过期刷新机制
连接错误后的自动重试
使用须知:
该Client正常情况下不会退出,如果不需要持续不断地消费,需要自行控制停止请求的时机(可以通过 os.Exit(0) 或 log.Fatal() 来退出进程)
Go版本>=1.18可用
// package main // filepath: client.go // @update 2023-08-21 03:49:49 package main import ( "context" "regexp" "strconv" "time" ) func main() { for colStart < colEnd { end := colStart + colStep - 1 collectors := strconv.FormatInt(colStart, 10) + "-" + strconv.FormatInt(colEnd, 10) subCtx := &subMsgContext{ Context: context.Background(), collectors: collectors, startOffset: startOffset, } wg.Add(1) go subCtx.subMsg() colStart = end + 1 } time.Sleep(time.Second * 1) wg.Wait() } // Split a line into fields separated by the ':' character func splitFields(line string) []string { return regexp.MustCompile(`data:\s*`).Split(line, -1) }
// package main // filepath: client_var.go // @update 2023-08-21 03:50:40 package main import ( "context" "os" "sync" "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) // AccessTokenResp Storing Access Token Response // // @update 2023-08-07 02:02:41 type AccessTokenResp struct { Data struct { AccessToken string `json:"access_token"` Expire int `json:"expire"` } `json:"data"` Message string `json:"message"` Status int `json:"status"` } // StreamMessage Storing Stream Message Data // // @update 2023-08-14 05:50:34 type StreamMessage struct { MsgID string `json:"msg_id"` Nonce []byte `json:"nonce,omitempty"` EncryptItem []byte `json:"encrypt_item,omitempty"` } type subMsgContext struct { retryCnt int collectors string startOffset int64 token string context.Context } // AccessTokenStruct represents the access token and with mutex lock // // @update 2023-08-07 02:20:05 type AccessTokenStruct struct { token string expireAt time.Time *sync.RWMutex } func init() { config := zap.NewProductionEncoderConfig() consoleWriter := zapcore.AddSync(os.Stdout) core := zapcore.NewCore(zapcore.NewConsoleEncoder(config), consoleWriter, zapcore.DebugLevel) zapLogger := zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)) logger = zapLogger.Sugar() } // AccessToken represents the access token // // @update 2023-08-07 02:20:03 var ( AccessToken = &AccessTokenStruct{RWMutex: &sync.RWMutex{}} logger *zap.SugaredLogger ) var wg = &sync.WaitGroup{} // Url path var ( insightBaseURL = "https://insight.volcengineapi.com" oauthTokenPath = "/oauth/access_token" streamReqPath = "/openapi/item/sse/stream" ) // User configuration var ( volcBizName = "[YOUR_ACCOUNT_ID_HERE]" // !火山账号ID volcSecret = "[YOUR_SECRET_HERE]" // !从火山控制台-内容洞察概览页获取 volcAesKey = "[YOUR_AES_KEY_HERE]" // !AES Key从火山控制台-内容洞察概览页获取 ) // User Env var ( colStart = MustAtoi(os.Getenv("COLLECTOR_START")) colStep = MustAtoi(os.Getenv("COLLECTOR_STEP")) colEnd = MustAtoi(os.Getenv("COLLECTOR_END")) startOffset = 
MustAtoi(os.Getenv("START_OFFSET")) )
// package main // filepath: client_ctx.go // @update 2023-08-21 03:50:55 package main import ( "bufio" "context" "crypto/aes" "crypto/cipher" "strconv" "time" "github.com/cloudwego/hertz/pkg/app/client" json "github.com/bytedance/sonic" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" "github.com/pkg/errors" ) // RefreshToken RefreshToken if it's expired, then get a new token, will panic if error is returned // // @receiver ctx *subMsgContext // @param baseURL string // @update 2023-08-07 02:02:38 func (ctx *subMsgContext) RefreshToken(force bool) { if force || AccessToken.expireAt.Local().Before(time.Now().Local()) { if AccessToken.TryLock() { defer AccessToken.Unlock() err := GetAccessToken() ctx.token = AccessToken.token if err != nil { logger.Panicf("GetAccessToken error: %v", err) } } else { for !AccessToken.TryLock() { } AccessToken.Unlock() time.Sleep(time.Second * 3) // waiting for refreshing token ctx.token = AccessToken.token } } } // recoverSubMsg Recover from subMsg Goroutines, with retry logic // // @receiver ctx *subMsgContext // @param baseURL string // @update 2023-08-07 02:02:36 func (ctx *subMsgContext) recoverSubMsg() { // 无论如何都要重试,即使服务端已经发送完了数据 ctx.startOffset = time.Now().Unix() - 180 if err := recover(); err != nil { logger.Errorf("SubMsg Error with collector: %s, with error: %+v, will retry in 1s...", ctx.collectors, err) time.Sleep(time.Second * 1) go ctx.subMsg() } else { //! 
当前的数据已经发完,等待更长的时间获取新的数据 logger.Errorf("SubMsg Done with collector: %+v, will retry in 10s...", ctx.collectors) time.Sleep(time.Second * 10) go ctx.subMsg() } } // subMsg receives subscription message from server send event // // @receiver ctx *subMsgContext // @param baseURL string // @update 2023-08-07 02:02:33 func (ctx *subMsgContext) subMsg() { defer ctx.recoverSubMsg() ctx.RefreshToken(false) var ( req = &protocol.Request{} rsp = &protocol.Response{} err error ) // building http request req.SetMethod(consts.MethodGet) req.SetRequestURI(insightBaseURL + streamReqPath) queryStr := "collector=" + ctx.collectors if ctx.startOffset != 0 { queryStr += "&start_offset=" + strconv.FormatInt(ctx.startOffset, 10) } req.SetQueryString(queryStr) req.SetHeaders( map[string]string{ "X-Real-Ip": "180.184.86.50", "X-Insight-Biz-Name": volcBizName, "X-Insight-Access-Token": ctx.token, }, ) c, err := client.NewClient( client.WithResponseBodyStream(true), client.WithTLSConfig(&tls.Config{ InsecureSkipVerify: true, }), ) if err != nil { logger.Panic(err) } err = c.Do(ctx, req, rsp) if err != nil { logger.Panic(err) } // Create a new scanner to read the SSE data as a stream const maxTokenSize = 1024 * 1024 // 1 MB scanner := bufio.NewScanner(rsp.BodyStream()) scanner.Buffer(make([]byte, maxTokenSize), maxTokenSize) // Set the scanner to split on newlines scanner.Split(bufio.ScanLines) key := []byte(volcAesKey) // Loop to read SSE events from the scanner for scanner.Scan() { line := scanner.Text() // Check if the line is a comment or a data line if len(line) == 0 || line[0] == ':' { continue } // Split the line into event and data fields for _, field := range splitFields(line) { if field != "" { var bizOutItem *StreamMessage if field == "unauthorized" { logger.Panic(field) } err = json.Unmarshal([]byte(field), &bizOutItem) if err != nil { logger.Warn("json unmarshal %+v", err) } iv := bizOutItem.Nonce decryptMsg, err := CBCDecrypter(bizOutItem.EncryptItem, key, iv) if err != nil { 
logger.Warn("decrypt message err %+v, msg %v", err, decryptMsg) } // logger.Infof("recv: %s %s", ctx.collectors, decryptMsg[:10]) // TODO: Here's where the post decrypted, capture the post here. } } } // Check if there was an error reading from the scanner if err := scanner.Err(); err != nil { logger.Panic(err) } } // GetAccessToken Generates access token method // // @param baseURL string // @param bizName string // @return err error // @update 2023-08-07 03:23:43 func GetAccessToken() (err error) { c, _ := client.NewClient(client.WithTLSConfig(&tls.Config{ InsecureSkipVerify: true, })) req := &protocol.Request{} req.SetMethod(consts.MethodGet) req.SetHeaders( map[string]string{ "X-Real-Ip": "180.184.86.50", "X-Insight-Biz-Name": volcBizName, "X-Insight-Biz-Secret": volcSecret, }, ) req.SetRequestURI(insightBaseURL + oauthTokenPath) rsp := &protocol.Response{} if err := c.Do(context.Background(), req, rsp); err != nil { logger.Error(err.Error()) } tokenResp := &AccessTokenResp{} json.Unmarshal(rsp.BodyBytes(), &tokenResp) AccessToken.token = tokenResp.Data.AccessToken AccessToken.expireAt = time.Now().Local().Add(time.Duration(tokenResp.Data.Expire) * time.Second) return } // CBCDecrypter CBC 解密 [copied from utils] // // @param encrypter []byte 待解密的密文 // @param key []byte 秘钥 // @param iv []byte // @return result []byte // @return err error // @update 2023-08-14 07:35:00 func CBCDecrypter(encrypter []byte, key []byte, iv []byte) (result []byte, err error) { block, err := aes.NewCipher(key) if err != nil { return result, errors.Wrap(err, "aes decipher err") } blockMode := cipher.NewCBCDecrypter(block, iv) result = make([]byte, len(encrypter)) blockMode.CryptBlocks(result, encrypter) // 去除填充 result = UnPKCS7Padding(result) return result, err } // UnPKCS7Padding 去除 PKCS7Padding 填充的数据 [copied from utils] // // @param text []byte 待去除填充数据的原文 // @return []byte // @update 2023-08-14 07:35:33 func UnPKCS7Padding(text []byte) []byte { // 取出填充的数据 以此来获得填充数据长度 unPadding := 
int(text[len(text)-1]) return text[:(len(text) - unPadding)] } // MustAtoi just return a number, if not valid, return 0 // // @param s string // @return int // @update 2023-08-14 05:30:14 func MustAtoi(s string) (i int64) { i, _ = strconv.ParseInt(s, 10, 64) return }
此 Python 脚本只实现了简单的连接建立,没有重连机制,需要自行加上
import base64
import json
import logging
import time

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)

base_url = "insight.volcengineapi.com"
key = b"$HOUTAI_KEY"
biz_name = "$BIZ_NAME"  # biz_name
biz_secret = "$SECRET"


class Consume_buckets:
    """Minimal SSE consumer for one collector range (no reconnect logic)."""

    def __init__(self, start_bucket=None, end_bucket=None, start_offset=None):
        self.start_bucket = start_bucket
        self.end_bucket = end_bucket
        self.start_offset = start_offset
        self.pid = 0

    def get_access_token(self):
        """Request an access token.

        Returns the ``access_token`` field of the response's ``data`` object,
        or None when ``data`` is missing or empty.
        """
        # HTTPS, like the stream request: the secret travels in a header.
        url = f"https://{base_url}/oauth/access_token"
        headers = {
            'X-Insight-Biz-Name': biz_name,
            'X-Insight-Biz-Secret': biz_secret
        }
        # A timeout keeps a stalled token request from hanging forever.
        response = requests.request("GET", url, headers=headers, data={}, timeout=30)
        re_json = json.loads(response.text)
        print(re_json)
        data = re_json.get('data', None)
        if data:
            return data.get('access_token', None)
        return None

    def consume(self):
        """Open the SSE stream and process messages until the stream ends.

        Each message is decrypted and logged; for messages whose ``status``
        is 1 the ``post_id`` is appended to a file named after the current
        hour's Unix timestamp. Lines that cannot be parsed are recorded in
        ``Exception_data.txt``; messages that cannot be decrypted are logged
        and skipped.
        """
        access_token = self.get_access_token()
        if access_token is None or access_token == "":
            print("access_token 为空")
            # Without a token the stream request cannot succeed.
            return

        url = f"https://{base_url}/openapi/item/sse/stream?collector={self.start_bucket}-{self.end_bucket}"
        if self.start_offset is not None:
            url += f"&start_offset={self.start_offset}"

        header = {
            "X-Real-Ip": "180.184.86.50",
            "X-Insight-Biz-Name": biz_name,
            "X-Insight-Access-Token": access_token
        }

        # The context manager releases the connection when the stream ends
        # or an exception propagates.
        with requests.request("GET", url, headers=header, data={}, stream=True) as response:
            print(response)
            for line in response.iter_lines():
                s = line.decode()
                if not s:
                    continue
                index = s.find("data:")
                if index < 0:
                    # Not a data line (find() returned -1); slicing from it
                    # would yield garbage.
                    continue
                data = s[index + len("data:"):]

                try:
                    message_json = json.loads(data)
                except Exception as e:
                    with open("Exception_data.txt", "a+") as f:
                        f.write(f"{e}----{data}\n")
                    continue

                if not isinstance(message_json, dict):
                    print("---", message_json)
                    continue

                iv = message_json.get("nonce", None)  # nonce, used as the IV
                encrypt_item = message_json.get("encrypt_item", "")  # encrypted payload
                if not iv:
                    logging.warning("nonce id null")
                    # Nothing can be decrypted without the nonce.
                    continue

                try:
                    iv = base64.b64decode(iv)
                    encrypt_item = base64.b64decode(encrypt_item)
                    recv_message = json.loads(self.cbc_decrypter(encrypt_item, key, iv))
                    post_id = recv_message["post_id"]
                    status = recv_message["status"]
                except (ValueError, KeyError, TypeError) as e:
                    # One malformed message must not stop the consumer.
                    logging.warning(f"skip undecodable message: {e}")
                    continue

                logging.info(f"recv: {recv_message}")
                if status == 1:
                    # Counting only covers status == 1, so only those
                    # post_ids are written.
                    current_time = time.time()
                    current_hour_timestamp = current_time - (current_time % 3600)
                    with open(f"{int(current_hour_timestamp)}.txt", "a+") as f:
                        f.write(f"{post_id}\n")

    def cbc_decrypter(self, encrypter, key, iv):
        """Decrypt AES-CBC data and strip its padding."""
        block = AES.new(key, AES.MODE_CBC, iv)
        result = unpad(block.decrypt(encrypter), AES.block_size)
        return result


if __name__ == "__main__":
    a = Consume_buckets(0, 10)
    a.consume()