You need to enable JavaScript to run this app.
DataLeap私有化V2.1.2

DataLeap私有化V2.1.2

复制全文
3.2 流开发
3.2.1 SQL任务
复制全文
3.2.1 SQL任务

通过标准化的 flink sql 语言,在线开发、测试、运维流式任务,不依赖jar包,方便更新和维护。

适用场景

  1. 对流式数据通过sql的方式做聚合加工,产出实时指标。
  2. 对结构比较清晰的数据做清洗过滤的流式ETL需求。
  3. 构建流式数据仓库。

数据源的使用

通过DDL语句实现对数据源的使用,目前提供以下几类语法:

  1. create table:用于定义表结构,包含有定义源表(source)、结果表(sink)
  2. create view:用于定义中间结果视图
  3. create function:用于自定义udf的声明
  4. insert into XXX select:用于表示真正需要执行的sql语句,并将执行结果insert到对应的结果表中

新建任务

功能入口:

  1. 在任务开发首页,点击“新建节点”;
  2. 在项目下,左侧目录结构中,右键新建任务;
  3. 任务编辑页面,点击“新建”。
    新建任务类型选择“流式sql任务“
    alt

SQL 编辑

通过DDL语法声明使用数据源,Flink SQL 加工数据,DataLeap 版本的DDL语法与开源版有少量不同,具体看下方的说明与DDL参数表格。
一个完整的任务示例:

1.	-- 1.创建 kafka source table
2.	create table click_data(
3.	    user_id bigint,
4.	    article_id bigint,
5.	    click_time timestamp,
6.	    watermark for click_time as withoffset(click_time, 3000)
7.	) with (
8.	    'connector.type' = 'kafka',
9.	    'connector.version' = '0.10',
10.	    'connector.topic' = 'test_topic',
11.	    'connector.cluster' = 'test_cluster',
12.	    'connector.group.id' = 'flink_1.9_sql_test',
13.	    'update-mode' = 'append',
14.	    'format.type' = 'json',
15.	);
16.	
17.	-- 2.创建 Mysql sink table
18.	create table article_pv(
19.	    article_id bigint,
20.	    PV bigint,
21.	    s_time timestamp
22.	) with (
23.	    'connector.type' = 'jdbc',
24.	    'connector.table' = 'test_table',
25.	    'connector.dbname' = 'testdb',
26.	    'connector.url' = 'mysql.testdb_write',
27.	    'connector.username' = 'sink.test',
28.	    'connector.password' = 'abcabc',
29.	    'connector.write.flush.max-rows' = '5'
30.	 );
31.	-- 3. 计算逻辑
32.	insert into article_pv 
33.	select article_id, count(*) as PV, TUMBLE_START(click_time, INTERVAL '1' HOUR) as s_time 
34.	from click_data 
35.	group by article_id, TUMBLE(click_time, INTERVAL '1' HOUR)

PB 格式定义
使用PB格式的数据源,需要在当前任务上传PB类的定义文件,或手工输入PB类,json格式无需设置。
一次只支持一个PB类的定义,例:

1.	syntax = "proto2";
2.	package abase_test;
3.	message AbaseTest {
4.	required int64 first_id = 1;
5.	required int64 latest_id = 2;
6.	}

说明

  1. 如果kafka topic是binlog数据,在flink sql source中指定下'format.type' = 'pb_binlog'即可,数据类型选择'json'方式
  2. 如果Kafka topic是自定义pb数据
  3. 在flink sql source中指定下'format.pb-class' = '{package的路径}.{java_outer_classname的类名}${入口message名称}'.
  4. 譬如,
    a)   `'format.pb-class' = 'parser.proto.ProtoParser$Instance'`
    数据源格式选择"pb","Pb类定义"中开头设置 package 和 option java_outer_classname,"入口message名称" 指定入口message名称
    举例:
    "Pb类定义"开头:
    package parser.proto;
    option java_outer_classname = "ProtoParser";
    "入口message名称":Instance

任务调试

流式任务支持在线调试,通过2种方式获取测试数据:
1.   在当前任务消费的 topic 中抽取最近100条数据
2.   手工上传json格式的文件
测试过程中,SQL 在独立环境运行,所有的输出直接写到本地浏览器,不会对线上生产的数据存储系统造成影响。
alt

参数设置

基础信息:任务名称、描述、责任人、队列等基础信息按需填写。

资源设置

获取推荐配置:根据任务的历史24小时运行情况,给出推荐配置
Flink 运行参数:Flink 相关的动态参数和执行参数,具体设置详见 Flink 官方文档
用户自定义参数:kv格式,按需设置。

提交上线

任务测试通过后,先“保存”任务生成一个草稿版本。然后点击“提交上线”,即按最新的版本启动一个 Flink 实例,后续的运行情况可在“任务运维”中查看。
运维列表的“重启”功能以线上配置为准,不应用最新版本草稿。

历史版本

每次上线会生成一个版本,在“历史版本”功能中可以查看、对比和回滚,回滚的效果是恢复至草稿,需要重新点击上线。
alt

DDL参数设置

DataLeap的流式SQL在DDL部分与开源版本有少量不同,具体参考以下内容。

Kafka DDL 参数

namemeaningrequireddefaultconsumer/producer
connector.typeconnector 的类型,必须是 kafkaYES-consumer & producer
connector.versionkafka 版本,当前支持 '0.10'YES-consumer & producer
connector.clusterkafka 集群名YES-consumer & producer
connector.topickafka topicYES-consumer & producer
connector.group.idconsumer group idYES-consumer
update-mode更新模式, 'append'/'upsert'YES-consumer 就填 'append' 就行; producer 如果是查询的结果是可以更新的就用 upsert, 如果是查询的结果是不可更新的就用 append.
connector.owner作业 ownerNO-consumer & producer
connector.startup-modeearliest-offset/latest-offset/group-offsets/specific-timestamp,详细解释如下NOgroup-offsetsconsumer
connector.specific-timestamp当指定connector.startup-mode为specific-timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费NO0consumer
connector.reset-to-earliest-for-new-partition该参数表示在任务启动时,如果用的是group-offsets配置,对于那些还没有offset的partition如何处理,详情可以参考:kafka partition扩容NOtrueconsumer
connector.kafka.properties.{param}{param} 是任意 kafka 参数, 如 connector.kafka.properties.ignore.dc.check = trueNO-consumer & producer
connector.log-failures-only写入 kafka 失败时是否只打一条日志,task 不退出NOfalseproducer
connector.rate-limiting-num读取 kafka 限速,是整个 source table 所有并发的流速之和, 配合 connector.rate-limiting-unit 使用。NO-1,默认不限速consumer
connector.rate-limiting-unit读取 kafka 限速的单位,配合 connector.rate-limiting-num 使用,可选单位:'BYTE', 'RECORD' 分别对应 byte 和 消息条数NO'BYTE'consumer

Mysql DDL 参数

namemeaningrequired取值范围default
connector.typeconnector typeYESjdbc-
connector.dbname数据库名YES--
connector.table数据库表名YES--
connector.init-sql新建 db 连接时预执行的语句。 如需要写入表情符号时,设置 'connector.init-sql' = 'SET NAMES utf8mb4'NO--
connector.write.flush.max-rows积攒多少条数据一批次写入NO0~5000
connector.write.flush.interval积攒多久时间的数据一批次写入, 单位 ms, 默认是 0 的话,就是没有定期输出,而不是每来一条就输出。NO0~0
connector.write.max-retries写入数据库失败的情况下,最多尝试多少次NO0~3

Kafka json Format

使用 json format 需在 create table 语句中通过'format.type'='json' 指定。
注意事项:
json类型根据用户在create table中声明的column name和column type做解析。SQL任务目前仅做部分自动适配,例如声明为int类型,但实际json里面是int的string表示,SQL也能自动识别并转换回int。但是如果例如声明为int但实际为long或者声明为int但实际是带有字母的string等,SQL无法直接转换,会报错。
json类型支持嵌套json。例如嵌套json "{"a": "a", "b": {"c": 1, "d": "s"}}", 声明嵌套结构时,需要声明为b Row<c int, d varchar> 即可表明字段c、d是嵌套于字段b中。json也支持array形式。
1.   如果是简单的array,例如"a": [1,2,3,4],则可声明为a Array,即表明a是一个int类型的array,注意,select 的时候,下标是从 1 开始的。例如"a": [6,7,8,9], select a[1] 会返回 6。
2.   如果是嵌套json的array,例如"a": [{"b": 3}, {"b": 6}],则可声明为a Array<Row>, 即表明a是一个object类型array,a的结构中,有一个类型为int的b列。获取的方式如 select a[1].b 会返回3.
参数

namemeaningrequireddefaultnote
format.typeformat 的类型,必须是 'json'YES--
format.derive-schemajson schema 指定方式之一,即自动按table 的schema 推断。强烈建议用这个Notrue样例: 'format.derive-schema'='true'
format.schemajson schema 指定方式之一,指定 type info, 不建议用这个,建议用 format.derive-schemaNo-样例: 'format.schema'='ROW<test1 VARCHAR, test2 TIMESTAMP>'
format.json-schemajson schema 指定方式之一 不建议用这个,建议用 format.derive-schemaNo-样例: 'format.schema'='{'title': 'Person', 'properties': {'firstName': {'type': 'string'}}}
format.fail-on-missing-field缺少字段的时候是否直接失败Nofalse-
format.default-on-missing-field缺少字段的时候是否自动添加默认值Nofalse-
format.skip-dirty跳过脏数据Nofalse-
format.skip-interval-ms脏数据打印间隔(默认是10s)No10000-
format.json-parser-feature.{feature}jackson 的 JsonParser.Feature 相关配置No-样例:'format.json-parser-feature.ALLOW_UNQUOTED_CONTROL_CHARS'='true'
format.enforce-utf-encoding是否强制编码为utf8编码(默认非BMP类型的unicode会以转义的方式来处理NOfalse-
format.filter-null-values是否把null字段不写进json中NOfalse-
format.bytes-as-json-node是否把json中byte字段当成any类型.NOfalse

Kafka pb format

使用 pb format 需在 create table 语句中通过'format.type'='pb' 指定。参数说明如下:

namemeaningrequireddefaultnote
format.typeformat 的类型,必须是 'pb'YES--
format.pb-class指定 pb classYES-样例: 'format.pb-class' = 'parser.proto.ProtoParser$Instance'
format.pb-skip-bytes解析 pb bytes 的时候忽略前几个bytes,这个是 AML 的特殊需求,普通用户请忽略No0样例:'format.pb-skip-bytes' = '8'
format.pb-sink-with-size-headertrue/false. 往外 sink pb bytes 的时候,是否在前面加上 8 字节的 pb bytes size。这个是 AML 的特殊需求,普通用户请忽略Nofalse样例:'format.pb-sink-with-size-header' = 'true'
format.ignore-parse-errors是否忽略解析错误的数据NOfalse-

pb 中类型和 sql 中类型的映射关系:

types in pbsql typenote
repeatedARRAY如: repeated int32 -> ARRAY
MAPMAP如:map<string, int32> -> MAP<varchar, int>
enumvarchar-
oneof-会将 oneof 字段直接解到上一层,详见文末附录
其他复杂类型Row-
doubledouble-
floatfloat-
int32int-
uint32int-
uint64bigint-
sint32int-
sint64bigint-
fixed32int-
fixed64bigint-
sfixed32int-
sfixed64bigint-
boolboolean-
stringvarchar-
bytesbinary-
最近更新时间:2022.09.05 11:25:30
这个页面对您有帮助吗?
有用
有用
无用
无用