You need to enable JavaScript to run this app.
导航
自定义终端输出
最近更新时间:2024.07.11 17:32:56首次发布时间:2024.07.08 19:27:27

1.概述

提供自定义终端的能力,允许用户根据特定需求,自定义数据输出的方式和格式。

2.使用前提
  • 产品版本:必须使用V1.23版本。
  • 语言要求:支持Java或Scala语言。
  • 部署方式:需要进行私有化部署。
  • 注意事项:为了启用自定义终端功能,用户必须联系客户经理在部署时开启相应的功能开关。

3.产品使用流程

step1: maven配置依赖

在项目的pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.volcengine</groupId>
    <artifactId>cdp-openapisdk-java</artifactId>
    <version>1.23.2.udf</version>
</dependency>

step2:实现接口并打包

自定义输出分为两种类型,一种是分批数据流输出,一种是文件输出,前者会将数据一批一批的写出,后者就是将文件写到hdfs上,然后用户可以直接操作hdfs文件。公共接口如下:

public enum OutputType {
    BATCH, HDFS_FILE 
}

public interface UDFOutput {

    public OutputType getOutputType();

    /**
     * 数据开始输出时执行
     *
     * @param options 从前端配置的配置
     */
    public abstract void init(DataType[] schema, Map<String, String> options) throws Exception;


    /**
     * 整个任务结束时执行,可以用户自己定义一些回调操作
     *
     * @param metaData   如果用户需要给输出的数据集添加一些附属信息,后续会追加在此类中
     * @param taskStatus 当前stage的执行状态
     */
    public abstract void finish(DataType[] schema, MetaData metaData, TaskStatus taskStatus, Map<String, String> options) throws Exception;
}

批次输出

批次输出接口实现

public abstract class BatchOutput implements UDFOutput {

    public OutputType getOutputType() {
        return OutputType.BATCH;
    }

    /**
     * 每个分区数据开始输出时执行。因为
     *
     * @param options 从前端配置低的配置
     */
    public abstract void partitionInit(DataType[] schema, Map<String, String> options) throws Exception;

    /**
     * 每个batch输出的时候执行
     *
     * @param row row.size <= batch
     * @return 用户自己定义当前batch实际返回了多少数据
     */
    public abstract int run(DataType[] schema, Object[][] row) throws Exception;

}

接口说明

图片
任务的执行分为四个阶段:

  1. 全局初始化

    全局初始化只会执行一次,这里会将数据的schema传递给用户,同时会传递用户自定义的配置。这里可以用来变更下游的表的schema等操作。

  2. 分区初始化

    不同分区的执行是并行的,每个分区开始时执行。因为全局初始化执行的节点和分区数据执行是在不同的节点(非常重要)。如果用户在全局初始化的时候初始化了一个jdbc链接,在分区任务执行的时候是无法取到的,所以如果使用的是jdbc链接,需要在这初始化连接池。

  3. 分区数据传输

    每个分区的数据传输会进行多次,每次按batchSize取一批数据,直到数据完全发送完成。用户可以在这里对数据做过滤,转换,输出。并把数据的count返回。

  4. 全局结束

    所有分区数据传输完成后执行.

  5. 文件输出

文件输出

接口实现

public abstract class HdfsFileOutput implements UDFOutput {
    public OutputType getOutputType() {
        return OutputType.HDFS_FILE;
    }
}

接口说明

该模式只有两个执行阶段

文件输出的时候只需要关心 finish 方法即可,在这个方法里,可以从MetaData类中获取文件路径,由于一个任务可以输出多个文件,所以得到的是路径列表。文件放在hdfs上,如果用户有需要,可以将文件移动到目标存储。

public class MetaData {
    public String[] getFilePaths() {
        return filePaths;
    }

    public void setFilePaths(String[] filePaths) {
        this.filePaths = filePaths;
    }

    public String[] filePaths;
}

step3:上传Jar包

将实现的自定义输出类打包成Jar包(大小需小于500M),并通过自定义输出通道上传。

step4:配置任务

选择数据资产,并选择step3中创建的自定义通道,输入自定义配置(会传递给用户自定义的jar包),创建任务。

4.任务回调

任务执行结束后,系统会将执行情况通过Kafka发送消息。用户可以通过Webhook获取任务执行详情。

{
  "id": 30,
  "name": "添加附带信息",
  "project_id": 1,
  "subject_id": 1,
  "org_id": 1,
  "type": "oss_storage", // oss_storage 对应产品的云存储,external_storage外置存储,inner_storage内置存储
  "version_id": 19404,
  "status": "SUCCEEDED",  // 执行状态,有SUCCEEDE和FAILED,可能会新增
  "path": null,
  "c_date": "2024-05-28 00:00:00", // 业务日期
  "create_time": "2024-05-29 11:39:20",
  "end_time": "2024-05-29 11:40:37",
  "extra": {
    "schema_info": [
      {
        "asset_id": null,
        "asset_name": "基准",
        "asset_type": "BASEID",
        "asset_type_name": "基准ID",
        "col_name": "baseid_1",
        "col_type": "bigint",
        "original_type": "bigint",
        "disabled": false,
        "dependable": null,
        "depend_edit_enable": null,
        "period": null
      },
      {
        "asset_id": null,
        "asset_name": "sql导出任务3",
        "asset_type": "SEGMENT",
        "asset_type_name": "分群",
        "col_name": "segment_1000199",
        "col_type": "string",
        "original_type": "string",
        "disabled": false,
        "dependable": null,
        "depend_edit_enable": null,
        "period": null
      },
      {
        "asset_id": null,
        "asset_name": "RFM1",
        "asset_type": "TAG",
        "asset_type_name": "标签",
        "col_name": "tag_103",
        "col_type": "string",
        "original_type": "string",
        "disabled": false,
        "dependable": null,
        "depend_edit_enable": null,
        "period": null
      },
      {
        "asset_id": null,
        "asset_name": "Finder属性表_ssid",
        "asset_type": "ENTITY_PROPERTY",
        "asset_type_name": "主体属性",
        "col_name": "entity_property_36_ssid",
        "col_type": "string",
        "original_type": "string",
        "disabled": false,
        "dependable": null,
        "depend_edit_enable": null,
        "period": null
      },
      {
        "asset_id": null,
        "asset_name": "c_date",
        "asset_type": "CDATE",
        "asset_type_name": "业务日期",
        "col_name": "c_date",
        "col_type": "date",
        "original_type": "date",
        "disabled": false,
        "dependable": null,
        "depend_edit_enable": null,
        "period": null
      }
    ]
  },
  "task_version": 2,
  "platform": "tencentOss", //aliOss,tencentOss,volcEngineOss,externalHive,las,bytehouseCe,innerHive,hdfs,udfOutput,maxCompute,oceanBaseMysql,oceanBaseOracle,oracle         
  "_event_name": "cdp.asset.data.task",
  "_event_timestamp": 1716954041510,
  "_event_id": "6404daee-7660-4e5f-b9b3-a4e2b5386900",
  "_event_source": "vpc-cdp-open-platform-66c8df8f7d-x64vr"
}

5.OpenApi文档

具体API调研说明参见开放平台-开放能力-OpenAPI文档说明。