This interface uploads data to the Volcengine servers. It is used by data pre-sync, historical data sync, incremental daily data sync, and incremental real-time data sync.
Each request may carry at most 10,000 records, the recommended QPS is no more than 100, and the total number of records uploaded per second must not exceed 50,000 (request QPS × records per request).
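To stay inside these limits, a minimal client-side pacing sketch is shown below. `UploadPacer` and `uploadBatch` are illustrative names, not part of the SDK; the caps mirror the figures above.

```java
import java.util.List;
import java.util.Map;

public class UploadPacer {
    private static final int MAX_BATCH_SIZE = 10_000;      // per-request record cap
    private static final int MAX_QPS = 100;                // recommended request-rate cap
    private static final int MAX_RECORDS_PER_SEC = 50_000; // QPS * batch-size cap

    // Splits dataList into batches and paces requests so that both the QPS cap
    // and the records-per-second cap are respected.
    public static void uploadAll(List<Map<String, Object>> dataList) throws InterruptedException {
        int batchSize = Math.min(MAX_BATCH_SIZE, Math.max(1, dataList.size()));
        // The effective QPS is bounded by both caps: with 10,000-record batches,
        // at most 5 requests per second keep the record rate under 50,000/s.
        int qps = Math.max(1, Math.min(MAX_QPS, MAX_RECORDS_PER_SEC / batchSize));
        long minIntervalMs = 1000L / qps;
        for (int from = 0; from < dataList.size(); from += batchSize) {
            int to = Math.min(from + batchSize, dataList.size());
            long start = System.currentTimeMillis();
            uploadBatch(dataList.subList(from, to));
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed < minIntervalMs) {
                Thread.sleep(minIntervalMs - elapsed); // keep the request rate under the cap
            }
        }
    }

    private static void uploadBatch(List<Map<String, Object>> batch) {
        // Replace with byteairClient.writeData(batch, topic, opts); see the example below.
    }
}
```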
If you have both incremental daily data and incremental real-time data, the incremental daily data must be onboarded first, followed by the incremental real-time data.
If you only have incremental real-time data, incremental daily data can no longer be uploaded once the real-time uploads have started.
Set the timeout of the upload interface as generously as possible, for example 5s. If an upload call fails, the data should be re-uploaded.
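A minimal retry sketch follows. It assumes `Option.withTimeout` accepts a `java.time.Duration` (timeout is listed among the opts in the parameter table below); the attempt count and backoff are illustrative choices, not SDK requirements.

```java
import volcengine.byteair.ByteairClient;
import volcengine.byteair.protocol.VolcengineByteair;
import volcengine.core.BizException;
import volcengine.core.NetException;
import volcengine.core.Option;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class RetryingWriter {
    // Re-uploads the same batch until it succeeds or maxAttempts is exhausted.
    public static boolean writeWithRetry(ByteairClient client,
                                         List<Map<String, Object>> dataList,
                                         String topic,
                                         int maxAttempts) throws InterruptedException {
        Option[] opts = new Option[]{
                // Generous timeout, per the advice above; assumed signature.
                Option.withTimeout(Duration.ofSeconds(5)),
        };
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                VolcengineByteair.WriteResponse resp = client.writeData(dataList, topic, opts);
                if (resp.getStatus().getSuccess()) {
                    return true;
                }
            } catch (NetException | BizException e) {
                System.out.printf("[write] attempt %d failed: %s%n", attempt, e.getMessage());
            }
            Thread.sleep(1000L * attempt); // simple linear backoff before re-uploading
        }
        return false;
    }
}
```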
When reporting incremental real-time data, it is recommended to aggregate a batch of records and report them together (for example, accumulate 1,000 records before reporting) to reduce the load from frequent client-server round trips.
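For instance, the buffering sketch below accumulates records and flushes once 1,000 are queued. `RealtimeBuffer` is a hypothetical helper, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RealtimeBuffer {
    private static final int FLUSH_THRESHOLD = 1_000; // accumulate 1,000 records before reporting

    private final List<Map<String, Object>> buffer = new ArrayList<>();

    // Queue a single record; report the whole batch once the threshold is reached.
    public synchronized void add(Map<String, Object> record) {
        buffer.add(record);
        if (buffer.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    // Also call flush() on shutdown or on a timer so trailing records are not lost.
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Replace with byteairClient.writeData(buffer, topic, opts) using
        // stage "incremental_sync_streaming"; see the example below.
        buffer.clear();
    }
}
```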
```java
WriteResponse writeData(List<Map<String, Object>> dataList, String topic, Option... opts)
```
Parameter | Type | Description |
---|---|---|
dataList | List<Map<String, Object>> | The data to upload; fill in the fields for your industry according to the data specification |
topic | String | The topic of the upload, e.g. "user" for user data, "item" for item data, "behavior" for behavior data |
opts | Option[] | Optional request parameters; different scenarios require different opts, covering timeout, stage, DataDate, and RequestId. DataDate is only needed when uploading offline data. See the examples below for usage |
The custom WriteResponse class is used as the response type; its fields are listed in the table below. After receiving the WriteResponse return value, call its getStatus() method to determine whether the upload succeeded, as in the sketch after the table.
Parameter | Type | Meaning | Getter |
---|---|---|---|
Status | VolcengineCommon.Status | Status of the request; its getSuccess() and getMessage() report the outcome | getStatus |
Errors | DataError | The records that failed to upload | getErrors |
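A short response-handling sketch based on the tables above; `ResponseCheck` is an illustrative wrapper, and the getters are those listed in the table.

```java
import volcengine.byteair.protocol.VolcengineByteair;

public class ResponseCheck {
    // Returns true when the upload succeeded; otherwise logs the failed records.
    public static boolean succeeded(VolcengineByteair.WriteResponse writeResponse) {
        if (writeResponse.getStatus().getSuccess()) {
            return true;
        }
        // getErrors returns the records that failed, per the table above.
        System.out.println("[write] failure, errors: " + writeResponse.getErrors());
        return false;
    }
}
```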
```java
import volcengine.byteair.ByteairClient;
import volcengine.byteair.ByteairClientBuilder;
import volcengine.byteair.protocol.VolcengineByteair;
import volcengine.core.BizException;
import volcengine.core.NetException;
import volcengine.core.Option;
import volcengine.core.Region;
import volcengine.core.metrics.MetricsCollector;

import java.time.LocalDate;
import java.util.*;

public class Example {
    public static ByteairClient byteairClient;

    // Omitted in this example: public static void init()
    // Some syntax may fail to compile on older JDKs; replace with equivalent syntax as needed.

    public static void write() {
        // First record
        Map<String, Object> item1 = new HashMap<>();
        item1.put("id", "item_id1");
        item1.put("title", "test_title1");
        item1.put("status", 0);
        item1.put("brand", "volcengine");
        item1.put("pub_time", 1583641807);
        item1.put("current_price", 1.1);

        // Second record
        Map<String, Object> item2 = new HashMap<>();
        item2.put("id", "item_id2");
        item2.put("title", "test_title2");
        item2.put("status", 1);
        item2.put("brand", "volcengine");
        item2.put("pub_time", 1583641503);
        item2.put("current_price", 2.2);

        List<Map<String, Object>> datas = new ArrayList<>();
        datas.add(item1);
        datas.add(item2);

        // topic is an enum value; see the API documentation
        String topic = "item";
        Option[] opts = new Option[]{
                // Stages: pre-sync ("pre_sync"), historical sync ("history_sync"),
                // incremental daily sync ("incremental_sync_daily"),
                // incremental real-time sync ("incremental_sync_streaming")
                Option.withStage("pre_sync"),
                // Required: the date the data was produced; replace with the actual date.
                // May be omitted for incremental real-time sync.
                Option.withDataDate(LocalDate.of(2022, 1, 1)),
                Option.withRequestId(UUID.randomUUID().toString()),
        };

        VolcengineByteair.WriteResponse writeResponse;
        try {
            writeResponse = byteairClient.writeData(datas, topic, opts);
        } catch (NetException | BizException e) {
            System.out.printf("[write] occur error, msg:%s\n", e.getMessage());
            return;
        }
        if (!writeResponse.getStatus().getSuccess()) {
            System.out.println("[write] failure");
            return;
        }
        System.out.println("[write] success");
    }
}
```
Write the Spark main class, which calls the client and sends the data. The example below reads data from HDFS and sends it via foreachPartition.
```java
package com.demo;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import volcengine.byteair.ByteairClient;
import volcengine.byteair.protocol.VolcengineByteair;
import volcengine.common.protocol.VolcengineCommon;
import volcengine.core.BizException;
import volcengine.core.NetException;
import volcengine.core.Option;

import java.time.LocalDate;
import java.util.*;

public class Main {
    public static void main(String[] args) {
        String input_path = args[0]; // HDFS data source to send
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().appName("example_demo").config(conf).getOrCreate();
        Dataset<Row> df = spark.read().json(input_path);
        System.out.println("input data schema:");
        df.printSchema();
        System.out.println("input data count:" + df.count());

        // write
        df.toJSON().foreachPartition((ForeachPartitionFunction<String>) t -> {
            Example.init();
            ByteairClient byteairClient = Example.byteairClient;
            List<Map<String, Object>> datas = new ArrayList<>();
            JsonParser parser = new JsonParser();
            // topic is an enum value; see the API documentation
            String topic = "item";
            Option[] opts = new Option[]{
                    // Stages: pre-sync ("pre_sync"), historical sync ("history_sync"),
                    // incremental daily sync ("incremental_sync_daily"),
                    // incremental real-time sync ("incremental_sync_streaming")
                    Option.withStage("incremental_sync_daily"),
                    // Required: the date the data was produced; replace with the actual date.
                    // May be omitted for incremental real-time sync.
                    Option.withDataDate(LocalDate.of(2022, 1, 1)),
                    Option.withRequestId(UUID.randomUUID().toString()),
            };
            while (t.hasNext()) {
                String rowStr = t.next();
                JsonObject rowJson = parser.parse(rowStr).getAsJsonObject();
                Map<String, Object> item = new HashMap<>();
                item.put("id", rowJson.get("id").getAsString());
                item.put("goods_id", rowJson.get("goods_id").getAsString());
                item.put("title", rowJson.get("title").getAsString());
                item.put("status", rowJson.get("status").getAsLong());
                item.put("brand", rowJson.get("brand").getAsString());
                item.put("pub_time", rowJson.get("pub_time").getAsLong());
                item.put("current_price", rowJson.get("current_price").getAsDouble());
                datas.add(item);
            }
            // send the batch
            VolcengineByteair.WriteResponse writeResponse;
            try {
                writeResponse = byteairClient.writeData(datas, topic, opts);
            } catch (NetException | BizException e) {
                System.out.printf("[WriteData] occur error, msg:%s\n", e.getMessage());
                return;
            }
            if (!writeResponse.getStatus().getSuccess()) {
                System.out.println("[WriteData] failure," + writeResponse.getStatus().getMessage());
                return;
            }
            System.out.println("[WriteData] success");
        });
    }
}
```
Build the project into a jar. A reference pom.xml for a Maven project is given below.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.demo</groupId>
    <artifactId>spark_demo</artifactId>
    <version>1.0</version>

    <properties>
        <scala.version>2.11</scala.version>
        <!-- replace 2.x with your actual Spark version -->
        <spark.version>2.x</spark.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <repositories>
        <repository>
            <id>xxx</id>
            <name>xxx</name>
            <url>xxx</url>
        </repository>
        <repository>
            <id>github-volcengine-repo</id>
            <name>The Maven Repository on Github</name>
            <url>https://volcengine.github.io/volcengine-sdk-java-rec/maven-repo/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>com.volcengine</groupId>
            <artifactId>volcengine-sdk-java-rec</artifactId>
            <version>1.1.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.15.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.15.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sketch_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
        </plugins>
    </build>
</project>
```
Test the jar in your environment. Reference command:
```bash
#!/bin/bash
set -e
set -x

xx_spark/bin/spark-submit \
  --driver-memory 10G \
  --executor-memory 8G \
  --master yarn \
  --deploy-mode client \
  --executor-cores 1 \
  --queue xxx \
  --name spark_api_demo_xxx \
  --class com.demo.Main \
  output/spark_demo-1.0.jar \
  /xxx/test/api_test.data
```
Check the run result: confirm that the job succeeded and that the logs contain the corresponding success messages.