提供自定义终端的能力,允许用户根据特定需求,自定义数据输出的方式和格式。
在项目的pom.xml
文件中添加以下依赖:
<dependency> <groupId>com.volcengine</groupId> <artifactId>cdp-openapisdk-java</artifactId> <version>1.23.2.udf</version> </dependency>
自定义输出分为两种类型,一种是分批数据流输出,一种是文件输出,前者会将数据一批一批的写出,后者就是将文件写到hdfs上,然后用户可以直接操作hdfs文件。公共接口如下:
public enum OutputType { BATCH, HDFS_FILE } public interface UDFOutput { public OutputType getOutputType(); /** * 数据开始输出时执行 * * @param options 从前端配置的配置 */ public abstract void init(DataType[] schema, Map<String, String> options) throws Exception; /** * 整个任务结束时执行,可以用户自己定义一些回调操作 * * @param metaData 如果用户需要给输出的数据集添加一些附属信息,后续会追加在此类中 * @param taskStatus 当前stage的执行状态 */ public abstract void finish(DataType[] schema, MetaData metaData, TaskStatus taskStatus, Map<String, String> options) throws Exception; }
public abstract class BatchOutput implements UDFOutput { public OutputType getOutputType() { return OutputType.BATCH; } /** * 每个分区数据开始输出时执行。因为 * * @param options 从前端配置低的配置 */ public abstract void partitionInit(DataType[] schema, Map<String, String> options) throws Exception; /** * 每个batch输出的时候执行 * * @param row row.size <= batch * @return 用户自己定义当前batch实际返回了多少数据 */ public abstract int run(DataType[] schema, Object[][] row) throws Exception; }
任务的执行分为四个阶段:
全局初始化
全局初始化只会执行一次,这里会将数据的schema传递给用户,同时会传递用户自定义的配置。这里可以用来变更下游的表的schema等操作。
分区初始化
不同分区的执行是并行的,每个分区开始时执行。因为全局初始化执行的节点和分区数据执行是在不同的节点(非常重要)。如果用户在全局初始化的时候初始化了一个jdbc链接,在分区任务执行的时候是无法取到的,所以如果使用的是jdbc链接,需要在这初始化连接池。
分区数据传输
每个分区的数据传输会进行多次,每次按batchSize取一批数据,直到数据完全发送完成。用户可以在这里对数据做过滤,转换,输出。并把数据的count返回。
全局结束
所有分区数据传输完成后执行.
文件输出
public abstract class HdfsFileOutput implements UDFOutput { public OutputType getOutputType() { return OutputType.HDFS_FILE; } }
该模式只有两个执行阶段
文件输出的时候只需要关心 finish 方法即可,在这个方法里,可以从MetaData类中获取文件路径,由于一个任务可以输出多个文件,所以得到的是路径列表。文件放在hdfs上,如果用户有需要,可以将文件移动到目标存储。
public class MetaData { public String[] getFilePaths() { return filePaths; } public void setFilePaths(String[] filePaths) { this.filePaths = filePaths; } public String[] filePaths; }
将实现的自定义输出类打包成Jar包(大小需小于500M),并通过自定义输出通道上传。
选择数据资产,并选择step3中创建的自定义通道,输入自定义配置(会传递给用户自定义的jar包),创建任务。
任务执行结束后,系统会将执行情况通过Kafka发送消息。用户可以通过Webhook获取任务执行详情。
{ "id": 30, "name": "添加附带信息", "project_id": 1, "subject_id": 1, "org_id": 1, "type": "oss_storage", // oss_storage 对应产品的云存储,external_storage外置存储,inner_storage内置存储 "version_id": 19404, "status": "SUCCEEDED", // 执行状态,有SUCCEEDE和FAILED,可能会新增 "path": null, "c_date": "2024-05-28 00:00:00", // 业务日期 "create_time": "2024-05-29 11:39:20", "end_time": "2024-05-29 11:40:37", "extra": { "schema_info": [ { "asset_id": null, "asset_name": "基准", "asset_type": "BASEID", "asset_type_name": "基准ID", "col_name": "baseid_1", "col_type": "bigint", "original_type": "bigint", "disabled": false, "dependable": null, "depend_edit_enable": null, "period": null }, { "asset_id": null, "asset_name": "sql导出任务3", "asset_type": "SEGMENT", "asset_type_name": "分群", "col_name": "segment_1000199", "col_type": "string", "original_type": "string", "disabled": false, "dependable": null, "depend_edit_enable": null, "period": null }, { "asset_id": null, "asset_name": "RFM1", "asset_type": "TAG", "asset_type_name": "标签", "col_name": "tag_103", "col_type": "string", "original_type": "string", "disabled": false, "dependable": null, "depend_edit_enable": null, "period": null }, { "asset_id": null, "asset_name": "Finder属性表_ssid", "asset_type": "ENTITY_PROPERTY", "asset_type_name": "主体属性", "col_name": "entity_property_36_ssid", "col_type": "string", "original_type": "string", "disabled": false, "dependable": null, "depend_edit_enable": null, "period": null }, { "asset_id": null, "asset_name": "c_date", "asset_type": "CDATE", "asset_type_name": "业务日期", "col_name": "c_date", "col_type": "date", "original_type": "date", "disabled": false, "dependable": null, "depend_edit_enable": null, "period": null } ] }, "task_version": 2, "platform": "tencentOss", //aliOss,tencentOss,volcEngineOss,externalHive,las,bytehouseCe,innerHive,hdfs,udfOutput,maxCompute,oceanBaseMysql,oceanBaseOracle,oracle "_event_name": "cdp.asset.data.task", "_event_timestamp": 1716954041510, "_event_id": "6404daee-7660-4e5f-b9b3-a4e2b5386900", "_event_source": "vpc-cdp-open-platform-66c8df8f7d-x64vr" }
具体API调研说明参见开放平台-开放能力-OpenAPI文档说明。