You need to enable JavaScript to run this app.
导航
同步接口(write)
最近更新时间:2023.10.19 17:15:44 | 首次发布时间:2022.04.13 17:53:20

用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求的数据量不超过10000条;qps建议不超过100;每秒上传的数据条数不超过50000条(即:请求qps × 每次请求中的数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。数据上传接口调用失败时,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。

调用方法

WriteResponse writeData(List<Map<String, Object>> dataList, String topic, Option... opts)

方法参数

参数

类型

说明

dataList

List<Map<String, Object>>

上传的具体数据,不同行业同步字段请按照数据规范填写

topic

String

数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior”

opts

Option[]

请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、DataDate、RequestId。其中DataDate只需要在离线数据上传时使用。具体使用方式见用例

方法返回

使用自定义的WriteResponse类作为响应类型,具体参数如下表所示。在获取到WriteResponse类型的返回值后,可调用getStatus().getSuccess()判断此次数据上传是否成功(参见下方示例)。

参数

类型

字段含义

获取方法

Status

int

状态码

getStatus

Errors

DataError

出错的数据

getErrors

示例

import volcengine.byteair.ByteairClient;
import volcengine.byteair.ByteairClientBuilder;
import volcengine.byteair.protocol.VolcengineByteair;
import volcengine.core.BizException;
import volcengine.core.NetException;
import volcengine.core.Option;
import volcengine.core.Region;
import volcengine.core.metrics.MetricsCollector;

import java.time.LocalDate;
import java.util.*;


/**
 * Minimal example of uploading item records through {@code ByteairClient.writeData}.
 *
 * <p>The client is expected to be assigned to {@link #byteairClient} before {@link #write()} is
 * called; the initialisation code is omitted from this example.
 */
public class Example {


    /** Client used by {@link #write()}; assigned by the (omitted) {@code init()} method. */
    public static ByteairClient byteairClient;
    
    // The example omits public static void init()
    // Some syntax may fail to compile on older JDKs; replace it with an equivalent form as needed.

    
    /**
     * Builds two sample item records and uploads them in a single {@code writeData} request.
     *
     * <p>The outcome is reported on standard output: an error line when the call throws, a failure
     * line (including the status message returned by the server) when the response status is not
     * successful, and a success line otherwise.
     */
    public static void write() {
        // First record
        Map<String, Object> item1 = new HashMap<>();
        item1.put("id", "item_id1");
        item1.put("title", "test_title1");
        item1.put("status", 0);
        item1.put("brand", "volcengine");
        item1.put("pub_time", 1583641807);
        item1.put("current_price", 1.1);
        // Second record
        Map<String, Object> item2 = new HashMap<>();
        item2.put("id", "item_id2");
        item2.put("title", "test_title2");
        item2.put("status", 1);
        item2.put("brand", "volcengine");
        item2.put("pub_time", 1583641503);
        item2.put("current_price", 2.2);


        List<Map<String, Object>> datas = new ArrayList<>();
        datas.add(item1);
        datas.add(item2);


        // topic is an enumerated value; see the API documentation
        String topic = "item";
        Option[] opts = new Option[]{
                // Stage: pre-sync ("pre_sync"), history sync ("history_sync"),
                // incremental daily sync ("incremental_sync_daily"),
                // incremental streaming sync ("incremental_sync_streaming")
                Option.withStage("pre_sync"),
                // Required: the date the data was produced; replace with the real date when
                // uploading. May be omitted for incremental streaming sync.
                Option.withDataDate(LocalDate.of(2022, 1, 1)),
                Option.withRequestId(UUID.randomUUID().toString()),
        };


        VolcengineByteair.WriteResponse writeResponse;
        try {
            writeResponse = byteairClient.writeData(datas, topic, opts);
        } catch (NetException | BizException e) {
            System.out.printf("[write] occur error, msg:%s\n", e.getMessage());
            return;
        }
        if (!writeResponse.getStatus().getSuccess()) {
            // Include the status message so a failed upload can be diagnosed from the log,
            // matching what the Spark example prints on failure.
            System.out.println("[write] failure," + writeResponse.getStatus().getMessage());
            return;
        }
        System.out.println("[write] success");
    }
}

Spark on Java

编写Spark main class。调用client,发送数据。下面示例是从HDFS读数据,然后通过foreachPartition将数据发送。

package com.demo;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import volcengine.byteair.ByteairClient;
import volcengine.byteair.protocol.VolcengineByteair;
import volcengine.common.protocol.VolcengineCommon;
import volcengine.core.BizException;
import volcengine.core.NetException;
import volcengine.core.Option;

import java.time.LocalDate;
import java.util.*;

/**
 * Spark entry point that reads JSON records from the path given as the first program argument
 * and uploads them through {@code ByteairClient.writeData}, partition by partition.
 */
public class Main {

    /**
     * Maximum number of records sent in one {@code writeData} request. The write interface
     * accepts at most 10000 records per request; a smaller batch keeps every request under that
     * limit no matter how many rows a partition holds.
     */
    private static final int MAX_BATCH_SIZE = 1000;

    // topic is an enumerated value; see the API documentation
    private static final String TOPIC = "item";

    /**
     * Reads the input as JSON, prints its schema and row count, then uploads every partition in
     * batches of at most {@link #MAX_BATCH_SIZE} records.
     *
     * @param args {@code args[0]} is the HDFS path of the data to send
     * @throws IllegalArgumentException if the input path argument is missing
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException("missing argument: <input_path> (HDFS data source to send)");
        }
        String inputPath = args[0]; // HDFS data source to send
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().appName("example_demo").config(conf).getOrCreate();
        Dataset<Row> df = spark.read().json(inputPath);

        System.out.println("input data schema:");
        df.printSchema();
        System.out.println("input data count:" + df.count());

        // write
        df.toJSON().foreachPartition((ForeachPartitionFunction<String>) t -> {
            Example.init();
            ByteairClient byteairClient = Example.byteairClient;
            JsonParser parser = new JsonParser();

            List<Map<String, Object>> batch = new ArrayList<>();
            while (t.hasNext()) {
                batch.add(toItem(parser.parse(t.next()).getAsJsonObject()));
                if (batch.size() >= MAX_BATCH_SIZE) {
                    if (!sendBatch(byteairClient, batch)) {
                        // As in the single-request version, stop this partition after a failure.
                        return;
                    }
                    // Start a fresh list rather than clearing, in case the client still
                    // references the list that was just sent.
                    batch = new ArrayList<>();
                }
            }
            // Send the remainder; an empty partition sends nothing.
            if (!batch.isEmpty()) {
                sendBatch(byteairClient, batch);
            }
        });

    }

    /** Converts one parsed JSON row into the field map expected by {@code writeData}. */
    private static Map<String, Object> toItem(JsonObject rowJson) {
        Map<String, Object> item = new HashMap<>();
        item.put("id", rowJson.get("id").getAsString());
        item.put("goods_id", rowJson.get("goods_id").getAsString());
        item.put("title", rowJson.get("title").getAsString());
        item.put("status", rowJson.get("status").getAsLong());
        item.put("brand", rowJson.get("brand").getAsString());
        item.put("pub_time", rowJson.get("pub_time").getAsLong());
        item.put("current_price", rowJson.get("current_price").getAsDouble());
        return item;
    }

    /**
     * Uploads one batch and logs the outcome to standard output.
     *
     * @return {@code true} if the response status reports success, {@code false} if the call
     *     threw or the status reports failure
     */
    private static boolean sendBatch(ByteairClient byteairClient, List<Map<String, Object>> datas) {
        Option[] opts = new Option[]{
                // Stage: pre-sync ("pre_sync"), history sync ("history_sync"),
                // incremental daily sync ("incremental_sync_daily"),
                // incremental streaming sync ("incremental_sync_streaming")
                Option.withStage("incremental_sync_daily"),
                // Required: the date the data was produced; replace with the real date when
                // uploading. May be omitted for incremental streaming sync.
                Option.withDataDate(LocalDate.of(2022, 1, 1)),
                // A new id per request, so each batch can be told apart
                // (presumably what RequestId is for; confirm against the SDK docs).
                Option.withRequestId(UUID.randomUUID().toString()),
        };

        // Batched send
        VolcengineByteair.WriteResponse writeResponse;
        try {
            writeResponse = byteairClient.writeData(datas, TOPIC, opts);
        } catch (NetException | BizException e) {
            System.out.printf("[WriteData] occur error, msg:%s\n", e.getMessage());
            return false;
        }
        if (!writeResponse.getStatus().getSuccess()) {
            System.out.println("[WriteData] failure," + writeResponse.getStatus().getMessage());
            return false;
        }
        System.out.println("[WriteData] success");
        return true;
    }
}

将项目编译为jar包,下面给出一个maven项目的pom文件作为参考

<?xml version="1.0" encoding="UTF-8"?>
<!-- Reference POM for building the Spark upload demo into a single shaded jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>spark_demo</artifactId>
    <version>1.0</version>
    <properties>
        <scala.version>2.11</scala.version>
        <!-- Placeholder: replace 2.x with the Spark version of the target cluster. -->
        <spark.version>2.x</spark.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    
    <repositories>
        <!-- Placeholder repository: fill in id, name and url, or remove it if not needed. -->
        <repository>
            <id>xxx</id>
            <name>xxx</name>
            <url>xxx</url>
        </repository>
        <repository>
            <id>github-volcengine-repo</id>
            <name>The Maven Repository on Github</name>
            <url>https://volcengine.github.io/volcengine-sdk-java-rec/maven-repo/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <!-- Spark and logging artifacts below are "provided": they are needed to compile but are
             not bundled into the shaded jar. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.15.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.15.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sketch_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Volcengine recommendation SDK. Declared exactly once: Maven requires each
             groupId:artifactId to be unique within <dependencies>. -->
        <dependency>
            <groupId>com.volcengine</groupId>
            <artifactId>volcengine-sdk-java-rec</artifactId>
            <version>1.1.1</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds the shaded jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <!-- Applies to every artifact: drop jar signature files and
                                         any bundled log4j.properties. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
        </plugins>
    </build>
</project>

在环境上测试jar包,参考命令:

#!/bin/bash
# Submits the shaded demo jar to YARN in client mode.
# The last argument is the input data path handed to com.demo.Main.
# -e: stop on the first failing command; -x: echo each command before running it.
set -ex

xx_spark/bin/spark-submit \
    --name spark_api_demo_xxx \
    --master yarn \
    --deploy-mode client \
    --queue xxx \
    --driver-memory 10G \
    --executor-memory 8G \
    --executor-cores 1 \
    --class com.demo.Main \
    output/spark_demo-1.0.jar \
    /xxx/test/api_test.data

查看运行结果,看任务是否成功,以及log里面有没有相应的success信息。