用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求数据量不超过10000条,qps建议不超过100,每秒上传的数据条数不超过50000条(请求qps*每次请求中数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。当数据上传接口调用失败的话,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。
write_data(self, data_list: list, topic: str, *opts: Option) -> WriteResponse
参数 | 类型 | 说明 |
---|---|---|
data_list | list[dict[str,Any]] | 上传的具体数据,不同行业同步字段请按照数据规范填写 |
topic | str | 数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior” |
opts | Option[] | 请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、data_date、request_id。其中data_date只需要在离线数据上传时使用。具体使用方式见用例 |
参数 | 类型 | 说明 | 获取方法 |
---|---|---|---|
code | int | 状态码 | response.status.code |
message | string | 请求结果信息 | response.status.message |
# 数据上传example import uuid from datetime import datetime from byteair import ClientBuilder, Client from byteair.protocol.volcengine_byteair_pb2 import * from core import Region, Option, NetException, BizException, metrics # 示例省略client初始化过程 def write(): # 此处为测试数据,实际调用时需注意字段类型和格式 data_list = [ { "id": "item_id1", "title": "test_title1", "status": 0, "brand": "volcengine", "pub_time": 1583641807, "current_price": 1.1, }, { "id": "item_id2", "title": "test_title2", "status": 1, "brand": "volcengine", "pub_time": 1583641503, "current_price": 2.2, } ] # topic为枚举值,请参考API文档 topic = "item" # 传输天级数据 opts = ( # 预同步("pre_sync"),历史数据同步("history_sync"),增量天级同步("incremental_sync_daily"), # 增量实时同步("incremental_sync_streaming") Option.with_stage("pre_sync"), # 必传,要求每次请求的Request-Id不重复,若未传,sdk会默认为每个请求添加 Option.with_request_id(str(uuid.uuid1())), # 必传,数据产生日期,实际传输时需修改为实际日期 Option.with_data_date(datetime(year=2022, month=1, day=1)), ) try: response = client.write_data(data_list, topic, *opts) except BizException as e: print("[write] occur err, msg: %s" % e) return if not response.status.success: print("[write] failure") return print("[write] success") return
代码示例
import uuid from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession from datetime import datetime from byteair import ClientBuilder, Client from byteair.protocol.volcengine_byteair_pb2 import * from core import Region, Option, NetException, BizException, metrics def get_client(): # 必传,租户id. TENANT_ID = "xxx" # 必传,项目id. PROJECT_ID = "xxx" # 必传,密钥AK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取. AK = "xxx" # 必传,密钥SK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取. SK = "xxx" client: Client = ClientBuilder() \ .tenant_id(TENANT_ID) \ .project_id(PROJECT_ID) \ .ak(AK) \ .sk(SK) \ .region(Region.AIR_CN) \ .build() # metrics上报初始化.建议开启,方便火山侧排查问题. metrics.init(()) return client input_path = 'hdfs://xxx' if __name__ == '__main__': conf = SparkConf().setMaster('local').setAppName('spark_example_python') spark = SparkSession.builder.config(conf=conf).getOrCreate() spark.sparkContext.setLogLevel("INFO") df = spark.read.json(input_path) df.show() def batch_write_data(items): client = get_client() data_list = [] for item in items: import json item_json = json.loads(item) data_list.append(item_json) print("data_list:", data_list) # topic为枚举值,请参考API文档 topic = "item" # 传输天级数据 opts = ( # 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"), # 增量实时同步("incremental_sync_streaming") Option.with_stage("incremental_sync_daily"), # 必传,数据产生日期,实际传输时需修改为实际日期 Option.with_data_date(datetime(year=2022, month=10, day=7)), Option.with_timeout(timedelta(milliseconds=3000)), Option.with_request_id(str(uuid.uuid1())), ) try: write_response = client.write_data(data_list, topic, *opts) except BizException as e: print("[write] occur err, msg: %s" % e) return if not write_response.status.success: print("[write] failure:", write_response.status.message) return print("[write] success") return df.toJSON().foreachPartition(batch_write_data)
python调用执行上面的任务,查看输出。