说明
UBA数据集成私有化V4.2.0模板支持,用户属性数据集成私有化V4.4.0(含)以上版本支持
企业在通过数据集成将其他厂的UBA数据上报至火山DataTester数据服务时,可能存在一些函数转换的需求,可以通过自定义配置文件和不同转换插件的方式进行数据导入。
自定义配置文件使用HOCON文件格式,需要包含env、source、transform、sink四个部分的配置
env: env中默认为空,可以通过execution.parallelism=1 配置任务并行度,如需配置异常重启策略,也可以参考可视化数据集成(4.4 集成任务异常失败重启)
source: source中为数据源,仅支持从单个数据源读取数据,一般为kafka,具体的数据源配置见Source插件说明
transform: transform为数据转换逻辑,可以配置多个转换逻辑,数据从上到下依次进行转换处理,具体转换插件配置见Transform插件说明
sink: sink为下游数据源,为DataTester侧kafka配置,可使用模版中Sink配置
env { job.name = kafka_2_kafka } source { KafkaTableStream { # kafka地址 consumer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}" # 消费者组名称 consumer.group.id = "test" # topic名称 topics = kafka_source # 临时表名称 result_table_name = fake # json格式 format.type = json # 数据示例 schema = "{\"user_id\": \"ST_1\", \"event_name\": \"abtest_exposure\", \"app_name\": \"seatunel_test\", \"app_id\": 10000000, \"custom\": \"{\\\"A\\\":\\\"a\\\"}\", \"local_time_ms\": 1654831159493, \"ab_version\": 4, \"params\": \"{\\\"A\\\":\\\"a\\\"}\", \"server_time\": 1654831159, \"log_type\": \"mario_event\"}" } } transform { # 如果源数据中没有ssid,则需要使用补充ssid插件 SSID { source_id = user_id source_app_id = app_id ssid_psm = "vpc.device.idsvc" } # 使用json格式化插件,将数据格式化为下游kafka的schema JsonTemplateFormat { # 输出字段名称 json_result_name = event # json模板 event = { "user":{ "user_unique_id": user_id, "ssid": ssid }, "header":{ "app_id": app_id, "app_name": app_name, "custom": custom "ab_version":ab_version }, "params": params, "event_name": event_name "server_time": server_time "log_type": log_type, "local_time_ms": local_time_ms } } } sink { # 将数据输出至sink kafka中 Kafka { # 如无特殊需求,无需修改 row_field_name = "event" # DataTester集群的kafka集群地址 producer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}" # 如无特殊需求,无需修改 topics = "behavior_event" } }
env { execution.parallelism = 1 } source { # KafkaSource样例 KafkaTableStream { # kafka server地址 consumer.bootstrap.servers = "192.168.0.197:9192" # 消费者group id consumer.group.id = "ab" # 需要消费的topic topics = sunday # 消费数据后,生成的flink table name,默认填写fake即可 result_table_name = fake # topic中数据的scheam,支持json,csv # 需要把所有可能存在的数据均罗列出来 format.type = json schema = "{\"user\":\"test_123\",\"ssid\":\"b0354d33-413a-4a98-9e27-c8676f0579ae\",\"params\":{\"ab_profile_20_custom\":20,\"ab_profile_21_custom\":21,\"ab_profile_22_custom\":\"22\"},\"app_id\":10000000,\"time_ms\":1676258783123}" format.field-delimiter = "," format.allow-comments = "true" } } transform { # 补充ssid插件,如果只有user_unique_id,则必须使用此插件 # 如果已自行维护user_unique_id与ssid,则可不使用此插件 SSID { source_id = user source_app_id = app_id # 私有化id服务psm,无特殊情况无需修改 ssid_psm = "vpc.device.idsvc" } # json格式化插件,此样例中json格式,无特殊情况无需修改 JsonTemplateFormat { # 输出字段名称 json_result_name = event event = { # __profile_set,设置用户属性 # __profile_set_once,设置用户属性,如果该属性不存在 # __profile_unset,删除用户属性 # __profile_append,添加一个属性到数组(仅支持数组类型) # __profile_remove,从数组删除一个属性(仅支持数组类型) # __profile_increment,累加属性(仅支持数字类型) # 此处无需写死,可以通过在source kafka进行配置,来做不同的操作,支持的操作如上所列 "event_name": "__profile_set", "user": { # 用户id "user_unique_id": user, # 用户ssid "ssid": ssid }, "header": { # 应用id "app_id": app_id, # 无特殊情况无需修改 "custom": "{}", }, # 需要设置的用户属性具体内容 # key应该为已经在tester中注册过的用户属性,比如age # value应该为需要从输入端取值的key的json路径 "params": { # 解释:从输入端kafka的params.ab_profile_20_custom这个key中取值,并且赋值给tester中的用户属性age "age": "params.ab_profile_20_custom" }, # 无特殊情况无需修改 "extras": "{}", # 设置的时间戳,单位为毫秒,数据流会丢弃7天前的时间戳,建议设置为当前时间 "local_time_ms": time_ms, } } } sink { Kafka { # 如无特殊需求,无需修改 row_field_name = "event" # 如无特殊需求,无需修改 topics = "user_profile" # DataTester集群的kafka集群地址 producer.bootstrap.servers = "192.168.0.197:9192" } }
新模板仅在大于等于4.10.0版本支持,该版本下游发送到sdk_origin_event,无需数据集成任务补充SSID
env { job.name = kafka_2_kafka execution.restart.attempts = "10" execution.restart.delayBetweenAttempts = "300000" execution.restart.strategy = "fixed-delay" } source { KafkaTableStream { # kafka地址 consumer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}" # 消费者组名称 consumer.group.id = "test" # topic名称 topics = "datatester_seatunnel_source" # 临时表名称 无需修改 result_table_name = fake # json格式 无需修改 format.type = json format.allow-comments = "true" format.ignore-parse-errors = "true" format.field-delimiter = "," # 数据示例 schema = ""{\"user_id\":\"test\",\"event_name\":\"purchase\",\"time\":1729910736000,\"app_id\":10000000}"" } } transform { # 使用json格式化插件,将数据格式化为下游kafka的schema JsonTemplateFormat { # 输出字段名称 无需修改 json_result_name = event # json模板 event = { // 必填,无需修改 默认server "applog_type": "server", // 必填, 无需修改 默认app "app_type": "app", "event_v3": [ { // 必填 事件名称 "event": "event_name", // 必填 13位毫秒级时间戳,事件发生时间 "local_time_ms": time, //非必填,如果需要映射事件属性,可以在下面添加对应的映射关系 "params": { "amout":"amout" } } ], "header": { // 必填 应用id "app_id": app_id, // 非必填,事件公共属性,如果需要映射事件公共属性,可以在下面添加对应的映射关系 "custom": { }, // 必填,用户id "user_unique_id": "user_id", // 非必填, 多口径场景下用户id类型,如果需要指定id类型,填写对应id类型,如iphone_id "user_unique_id_type":"iphone_id" } } } } sink { # 将数据输出至sink kafka中 Kafka { # 无需修改 和JsonTemplateFormat插件中json_result_name的event值对应 row_field_name = "event" # DataTester集群的kafka集群地址 producer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}" # 无需修改 默认sdk_origin_event topics = "sdk_origin_event" } }
参数名 | 类型 | 说明 |
---|---|---|
topics | string | 需要消费的topic |
result_table_name | string | 临时表名称,默认fake |
consumer.group.id | string | 消费者的group id |
consumer.bootstrap.servers | string | kafka集群地址,通过 |
schema | string | 数据json格式示例 |
format.type | string | 数据格式,json |
format.field-delimiter | string | 数据分割符号, 逗号 |
format.allow-comments | string | 默认填"true" |
format.ignore-parse-errors | string | 默认填"true" |
示例
KafkaTableStream { consumer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}" consumer.group.id = "test" schema = "{\"user_id\": \"ST_1\", \"event_name\": \"abtest_exposure\", \"app_name\": \"test\", \"app_id\": 10000000, \"custom\": \"{\\\"A\\\":\\\"a\\\"}\", \"local_time_ms\": 1654831159493, \"ab_version\": 4, \"params\": \"{\\\"A\\\":\\\"a\\\"}\", \"server_time\": 1654831159, \"log_type\": \"mario_event\"}" topics = kafka_source result_table_name = fake format.type = json format.allow-comments = "true" format.ignore-parse-errors = "true" format.field-delimiter = "," }
版本大于491可用
Mysql和Hive均通过jdbc的方式连接
参数名 | 类型 | 说明 |
---|---|---|
driver | string | 驱动名称 |
url | string | jdbc连接地址 |
username | string | 数据库用户名 如果无,可填 "" |
password | string | 数据库密码 如果无,可填 "" |
query | string | 查询语句, hive查询的sql需要标明所有需要的列名,提供给下游transform插件使用 |
示例
// hive示例 JdbcSource { driver = org.apache.hive.jdbc.HiveDriver url = "jdbc:hive2://192.168.0.45:9998/default" //目前私有化环境中hive未设置密码 username = "datarangers" password = "" query = "select user_id, event_name, app_id, time from abtest_event" } // mysql示例 JdbcSource { driver = com.mysql.cj.jdbc.Driver url = "jdbc:mysql://localhost/test" username = root password = xxxx query = "select * from abtest_test" }
版本大于4.9.2可用
参数名 | 类型 | 说明 |
---|---|---|
path | string | 文件路径 hdfs文件路径需要以hdfs://开头 |
format.type | string | 文件格式,hdfs文件目前仅支持parquet格式 |
schema | string | 文件schema,当填写 "" 时会自动读取文件schema,也可以手动填写,手动填写必须为Avro schema字符串
|
示例
FileSource { path="hdfs://192.168.0.32:5060/user/test/a" format.type=parquet schema="" }
使用json模板处理数据,value填写上游kafka的schema或者其他创建处理后新增的的字段名
如果字段名不存在或者不是字符串,则直接用模板的value值填充
支持通过a.b.c的方式获取嵌套数据中的字段, params和custom字段会特殊转换为json转义string
本插件需要为最后一个执行的transform插件,生成的event的json字符串直接发送到下游kafka
参数名 | 类型 | 说明 |
---|---|---|
json_result_name | string | |
event | json | name值和json_result_name的值相等,无需修改,都为event |
示例
JsonTemplateFormat { json_result_name = event //输出字段名称,指定event event = { "log_type": "mario_event", "user": { "user_unique_id": "user_id", "ssid": "ssid" } "header": { "app_id": 10000000, // 公共属性 "custom": "custom" }, // 事件名称 必填 "event_name": "event_name", // 事件发生事件,必填 "local_time_ms": local_time_ms // 事件属性,必填 "params": { "number": "number" } } }
根据指定的user_unique_id字段名称,补充ssid
参数名 | 类型 | 说明 |
---|---|---|
source_id | string | 数据中user_unique_id字段名称 |
source_app_id | int | 数据中appid字段名称 |
ssid_psm | string | 填充 "vpc.device.idsvc" 即可 |
uid_type | string | 如果需要多口径上报,需要填写多口径id类型,如“phone_id”,否则不需要填写 |
示例
SSID { source_id = user_id source_app_id = app_id ssid_psm = "vpc.device.idsvc" }
改变field中的数据类型,目前只支持数字和字符串之间。支持string,int,long,float,double
如转换失败对应字段值会被赋值null
参数名 | 类型 | 说明 |
---|---|---|
source_field_names | string[] | 需要改变的字段名称数组 |
result_field_types | string[] | 需要改为的类型数组 |
示例
FieldTypeTransform{ source_field_names = ["A","B","C"] result_field_types = ["int","float", "string"] }
将String类型的Field进行简单映射
参数名 | 类型 | 说明 |
---|---|---|
change_field_name | string | 需要改变的字段名称 |
map | string | 映射关系 |
示例
FieldMap{ change_field_name = event map = "$AppStart : App_launch, $AppEnd : App_end" } 将event字段中值为$AppStart的数据替换为App_launch ,将$AppEnd 替换为App_end
根据指定字段的顺序,按序获取各个field的值,如果获取到值,则赋值给目标field;如果取不到值或者值为null,则去判断下一个field
name | type | 说明 |
---|---|---|
merge_field_names | string | 需要merge的field names,用','分隔,优先级依次降低 |
result_field_name | string | merge后的值的field name |
result_field_type | string | 支持int, long, float, double, string |
defualt_value | string | 如果所有field均未取到值,或者均为null,最后赋值的默认值;值类型需要和result_field_type一致 |
示例
FieldMerge { merge_field_names = "age1,age2,age3" result_field_name = "age" result_field_type = "int" default_value = 100 } 从上游的数据中,按照age1、age2、age3的顺序,尝试获取value,获取到之后,则赋值给age;如果全都获取不到,则使用默认值100 age1 = 111,age2 = 222,age3 = 333 result = 111 (age1 = null,)age2 = 222,age3 = 333 result = 222 age1 = "abc",age2 = 222,age3 = 333 result = 222,虽然age1存在值abc,但是abc无法转换成int值,所以会被扔掉
根据正则表达式,匹配指定字段满足要求的字段value值,然后按照定义的处理动作,依次对该字段进行修改
name | type | 说明 |
---|---|---|
rules | list | 定义处理规则,其中rule包含的字段如下表格 |
fieldName | string | 字段名,对该字段的value进行匹配和修改 |
---|---|---|
regex | string | 正则表达式,需转义,例如匹配以$开头的字符串,转义后表达式为 |
operation | list |
|
operationValue | list | 与操作符对应的操作值 |
示例
FieldValueRegex{ rules=[{ fieldName="event_name" regex="^\\$" operation=["remove", "replace"] operationValue=["$", "APPLaunch,app_launch"] },{ fieldName="user_id" regex="^user" operation=["insert_head", "insert_tail"] operationValue=["integration_", "_001"] }] }
说明
原始字段为
{ "user_id": "user_510ec7", "event_name": "$APPLaunch", "app_id": 10000000 }
经上述示例中的规则处理后的字段为
{ "user_id": "integration_user_510ec7_001", "event_name": "app_launch", "app_id": 10000000 }
仅支持配置文件使用 ,大于491版本
根据filter_expression的表达式对数据进行过滤,filter_expression为过滤规则字符串
filter_keys是filter_expression中需要替换为实际数据中的key
filter_keys中需要将filter_expression所有需要判断的key填写完整,嵌套key以(.)连接
参数名 | 类型 | 说明 |
---|---|---|
filter_expression | string | 过滤表达式字符串,支持等于(==)、不等于(!=)、与(&&)、或( |
filter_keys | List | 过滤表达式中,需要提取的字段列表 |
示例
ExpressionFilter { filter_expression = "(event == 'exposure' && params.count == 5)||(event == 'buy' && params.amount == 'ab')" filter_keys = [event, params.count, params.amount] } 将满足条件 event为exposure并且params.count 值为5的数据 或者 event为buy 并且params.amount值为'ab'的数据过滤到下游 不满足条件的数据直接丢弃
根据指定字段的值对数据进行过滤
filter_field_name为字段名称
filter_values是指定过滤到下游的数值,多个值以逗号分割的字符串
参数名 | 类型 | 说明 |
---|---|---|
filter_field_name | string | 字段名称 |
filter_values | string | 指定过滤到下游的数值,多个值以逗号分割的字符串 |
示例
FieldFilter{ filter_field_name = event filter_values = "purchase,buy" } 将event字段中值为purchase或者buy的数据发送到下游,其他值的数据则过滤无法进入下游