You need to enable JavaScript to run this app.
导航
自定义配置集成
最近更新时间:2024.09.23 16:45:52首次发布时间:2023.06.09 16:02:39

说明

UBA数据集成私有化V4.2.0模板支持,用户属性数据集成私有化V4.4.0(含)以上版本支持

一、概述

企业在通过数据集成将其他厂的UBA数据上报至火山DataTester数据服务时,可能存在一些函数转换的需求,可以通过自定义配置文件和不同转换插件的方式进行数据导入。

二、功能介绍

自定义文件格式

自定义配置文件使用HOCON文件格式,需要包含env、source、transform、sink四个部分的配置
env: env中默认为空,可以通过execution.parallelism=1 配置任务并行度,如需配置异常重启策略,也可以参考可视化数据集成(4.4 集成任务异常失败重启)
source: source中为数据源,仅支持从单个数据源读取数据,一般为kafka,具体的数据源配置见Source插件说明
transform: transform为数据转换逻辑,可以配置多个转换逻辑,数据从上到下依次进行转换处理,具体转换插件配置见Transform插件说明
sink: sink为下游数据源,为DataTester侧kafka配置,可使用模版中Sink配置

2.1 配置模版

2.1.1UBA数据Demo模板

env {
  job.name = kafka_2_kafka
}

source {
    KafkaTableStream {
        # kafka地址
        consumer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}"
        # 消费者组名称
        consumer.group.id = "test"
        # topic名称
        topics = kafka_source
        # 临时表名称
        result_table_name = fake
        # json格式
        format.type = json
        # 数据示例
        schema = "{\"user_id\": \"ST_1\", \"event_name\": \"abtest_exposure\", \"app_name\": \"seatunel_test\", \"app_id\": 10000000, \"custom\": \"{\\\"A\\\":\\\"a\\\"}\", \"local_time_ms\": 1654831159493, \"ab_version\": 4, \"params\": \"{\\\"A\\\":\\\"a\\\"}\", \"server_time\": 1654831159, \"log_type\": \"mario_event\"}"
    }
}

transform {
    # 如果源数据中没有ssid,则需要使用补充ssid插件
    SSID {
        source_id = user_id
        source_app_id = app_id
        ssid_psm = "vpc.device.idsvc"
    }

    # 使用json格式化插件,将数据格式化为下游kafka的schema
    JsonTemplateFormat {
        # 输出字段名称
        json_result_name = event
        # json模板         
        event = {
            "user":{
                "user_unique_id": user_id,
                "ssid": ssid
            },
            "header":{
                "app_id": app_id,
                "app_name": app_name,
                "custom": custom
                "ab_version":ab_version
            },
            "params": params,
            "event_name": event_name
            "server_time": server_time
            "log_type": log_type,
            "local_time_ms": local_time_ms
        }
    }
}

sink {
    # 将数据输出至sink kafka中
    Kafka {
        # 如无特殊需求,无需修改
        row_field_name = "event"
        # DataTester集群的kafka集群地址
        producer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}"
        # 如无特殊需求,无需修改
        topics = "behavior_event" 
    }
}

2.1.2用户属性Demo模板

env {
  execution.parallelism = 1
}
source {
    # KafkaSource样例
    KafkaTableStream {
        # kafka server地址
        consumer.bootstrap.servers = "192.168.0.197:9192"
        # 消费者group id
        consumer.group.id = "ab"
        # 需要消费的topic
        topics = sunday
        # 消费数据后,生成的flink table name,默认填写fake即可
        result_table_name = fake
        # topic中数据的scheam,支持json,csv
        # 需要把所有可能存在的数据均罗列出来
        format.type = json
        schema = "{\"user\":\"test_123\",\"ssid\":\"b0354d33-413a-4a98-9e27-c8676f0579ae\",\"params\":{\"ab_profile_20_custom\":20,\"ab_profile_21_custom\":21,\"ab_profile_22_custom\":\"22\"},\"app_id\":10000000,\"time_ms\":1676258783123}"
        format.field-delimiter = ","
        format.allow-comments = "true"
    }
}
transform {
    # 补充ssid插件,如果只有user_unique_id,则必须使用此插件
    # 如果已自行维护user_unique_id与ssid,则可不使用此插件
    SSID {
        source_id = user
        source_app_id = app_id
        # 私有化id服务psm,无特殊情况无需修改
        ssid_psm = "vpc.device.idsvc"
    }
    # json格式化插件,此样例中json格式,无特殊情况无需修改
    JsonTemplateFormat {
      # 输出字段名称
        json_result_name = event
        event = {
            # __profile_set,设置用户属性
            # __profile_set_once,设置用户属性,如果该属性不存在
            # __profile_unset,删除用户属性
            # __profile_append,添加一个属性到数组(仅支持数组类型)
            # __profile_remove,从数组删除一个属性(仅支持数组类型)
            # __profile_increment,累加属性(仅支持数字类型)
            # 此处无需写死,可以通过在source kafka进行配置,来做不同的操作,支持的操作如上所列
            "event_name": "__profile_set", 
            "user": {
                # 用户id
                "user_unique_id": user,
                # 用户ssid
                "ssid": ssid
            },
            "header": {
                # 应用id
                "app_id": app_id,
                # 无特殊情况无需修改
                "custom": "{}",
            },
            # 需要设置的用户属性具体内容
            # key应该为已经在tester中注册过的用户属性,比如age
            # value应该为需要从输入端取值的key的json路径
            "params": {
                # 解释:从输入端kafka的params.ab_profile_20_custom这个key中取值,并且赋值给tester中的用户属性age
                "age": "params.ab_profile_20_custom"
            },
            # 无特殊情况无需修改
            "extras": "{}",
            # 设置的时间戳,单位为毫秒,数据流会丢弃7天前的时间戳,建议设置为当前时间
            "local_time_ms": time_ms,
        }
    }
}
sink {
    Kafka {
        # 如无特殊需求,无需修改
        row_field_name = "event"
        # 如无特殊需求,无需修改
        topics = "user_profile"
        # DataTester集群的kafka集群地址
        producer.bootstrap.servers = "192.168.0.197:9192"
    }
}

2.2 Source插件说明

2.2.1 Kafka

参数名

类型

说明

topics

string

需要消费的topic

result_table_name

string

临时表名称,默认fake

consumer.group.id

string

消费者的group id

consumer.bootstrap.servers

string

kafka集群地址,通过,分隔

schema

string

数据json格式示例

format.type

string

数据格式,json

format.field-delimiter

string

数据分割符号, 逗号

format.allow-comments

string

默认填"true"

format.ignore-parse-errors

string

默认填"true"

示例

KafkaTableStream {
        consumer.bootstrap.servers = "{ip1}:{port},{ip2}:{port}"
        consumer.group.id = "test"
        schema = "{\"user_id\": \"ST_1\", \"event_name\": \"abtest_exposure\", \"app_name\": \"test\", \"app_id\": 10000000, \"custom\": \"{\\\"A\\\":\\\"a\\\"}\", \"local_time_ms\": 1654831159493, \"ab_version\": 4, \"params\": \"{\\\"A\\\":\\\"a\\\"}\", \"server_time\": 1654831159, \"log_type\": \"mario_event\"}"
        topics = kafka_source
        result_table_name = fake
        format.type = json
        format.allow-comments = "true"
        format.ignore-parse-errors = "true"
        format.field-delimiter = ","
}

2.2.2 Mysql和Hive

版本大于491可用

Mysql和Hive均通过jdbc的方式连接

参数名

类型

说明

driver

string

驱动名称
mysql为com.mysql.cj.jdbc.Driver
hive为org.apache.hive.jdbc.HiveDriver

url

string

jdbc连接地址
mysql参考"jdbc:mysql://localhost/test"
hive参考jdbc:hive2://localhos/default

username

string

数据库用户名 如果无,可填 ""

password

string

数据库密码 如果无,可填 ""

query

string

查询语句, hive查询的sql需要标明所有需要的列名,提供给下游transform插件使用

示例

// hive示例
JdbcSource {
      driver = org.apache.hive.jdbc.HiveDriver
      url = "jdbc:hive2://192.168.0.45:9998/default"
      //目前私有化环境中hive未设置密码
      username = "datarangers"
      password = ""
      query = "select user_id, event_name, app_id, time from abtest_event"
 }
 // mysql示例
 JdbcSource {
        driver = com.mysql.cj.jdbc.Driver
        url = "jdbc:mysql://localhost/test"
        username = root
        password = xxxx
        query = "select * from abtest_test"
}

2.2.3 HDFS

版本大于4.9.2可用

参数名

类型

说明

path

string

文件路径 hdfs文件路径需要以hdfs://开头

format.type

string

文件格式,hdfs文件目前仅支持parquet格式

schema

string

文件schema,当填写 "" 时会自动读取文件schema,也可以手动填写,手动填写必须为Avro schema字符串
例如

{\"type\":\"record\",\"name\":\"hive_schema\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"user_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}"

示例

FileSource {
    path="hdfs://192.168.0.32:5060/user/test/a"
    format.type=parquet
    schema=""
}

2.3 transform插件说明

2.3.1 JsonTemplateFormat

使用json模板处理数据,value填写上游kafka的schema或者其他创建处理后新增的的字段名

如果字段名不存在或者不是字符串,则直接用模板的value值填充

支持通过a.b.c的方式获取嵌套数据中的字段, params和custom字段会特殊转换为json转义string
本插件需要为最后一个执行的transform插件,生成的event的json字符串直接发送到下游kafka

参数名

类型

说明

json_result_name

string

event

json

name值和json_result_name的值相等,无需修改,都为event

示例

JsonTemplateFormat {
    json_result_name = event     //输出字段名称,指定event
    event = {
        "log_type": "mario_event",
        "user": {
            "user_unique_id": "user_id",
            "ssid": "ssid"
        }
        "header": {
            "app_id": 10000000,
            // 公共属性
            "custom": "custom"
        },
        // 事件名称 必填
        "event_name": "event_name",
        // 事件发生事件,必填
        "local_time_ms": local_time_ms
        // 事件属性,必填
        "params": {
            "number": "number"
        }
    }
}

2.3.2 SSID

根据指定的user_unique_id字段名称,补充ssid

参数名

类型

说明

source_id

string

数据中user_unique_id字段名称

source_app_id

int

数据中appid字段名称

ssid_psm

string

填充 "vpc.device.idsvc" 即可

uid_type

string

如果需要多口径上报,需要填写多口径id类型,如“phone_id”,否则不需要填写

示例

SSID {
        source_id = user_id
        source_app_id = app_id
        ssid_psm = "vpc.device.idsvc"
    }

2.3.3 FieldTypeTransform

改变field中的数据类型,目前只支持数字和字符串之间。支持string,int,long,float,double

如转换失败对应字段值会被赋值null

参数名

类型

说明

source_field_names

string[]

需要改变的字段名称数组

result_field_types

string[]

需要改为的类型数组

示例

FieldTypeTransform{
    source_field_names = ["A","B","C"]
    result_field_types = ["int","float", "string"]
}

2.3.4 FieldMap

将String类型的Field进行简单映射

参数名

类型

说明

change_field_name

string

需要改变的字段名称

map

string

映射关系

示例

FieldMap{
    change_field_name = event
    map = "$AppStart : App_launch, $AppEnd : App_end"
}
将event字段中值为$AppStart的数据替换为App_launch ,将$AppEnd 替换为App_end

2.3.5 FieldMerge

根据指定字段的顺序,按序获取各个field的值,如果获取到值,则赋值给目标field;如果取不到值或者值为null,则去判断下一个field

name

type

说明

merge_field_names

string

需要merge的field names,用','分隔,优先级依次降低

result_field_name

string

merge后的值的field name

result_field_type

string

支持int, long, float, double, string

defualt_value

string

如果所有field均未取到值,或者均为null,最后赋值的默认值;值类型需要和result_field_type一致

示例

FieldMerge {
    merge_field_names = "age1,age2,age3"
    result_field_name = "age"
    result_field_type = "int"
    default_value = 100
}
从上游的数据中,按照age1、age2、age3的顺序,尝试获取value,获取到之后,则赋值给age;如果全都获取不到,则使用默认值100
age1 = 111,age2 = 222,age3 = 333
result = 111
(age1 = null,)age2 = 222,age3 = 333
result = 222
age1 = "abc",age2 = 222,age3 = 333
result = 222,虽然age1存在值abc,但是abc无法转换成int值,所以会被扔掉

2.3.6 FieldValueRegex

根据正则表达式,匹配指定字段满足要求的字段value值,然后按照定义的处理动作,依次对该字段进行修改

name

type

说明

rules

list

定义处理规则,其中rule包含的字段如下表格

fieldName

string

字段名,对该字段的value进行匹配和修改

regex

string

正则表达式,需转义,例如匹配以$开头的字符串,转义后表达式为"^\\$"

operation

list

  • 操作符,支持remove、replace、insert_head、insert_tail
    • remove:删除特定字符
    • replace:替换字符,用,隔开;如APPLaunch,app_launch即将APPLaunch替换为app_launch
    • insert_head:在头部插入指定字符串
    • insert_tail:在末尾插入指定字符串

operationValue

list

与操作符对应的操作值

示例

FieldValueRegex{
rules=[{
       fieldName="event_name"
       regex="^\\$"
       operation=["remove", "replace"]
       operationValue=["$", "APPLaunch,app_launch"]
    },{
       fieldName="user_id"
       regex="^user"
       operation=["insert_head", "insert_tail"]
       operationValue=["integration_", "_001"]
    }]
}

说明
原始字段为

{
    "user_id": "user_510ec7",
    "event_name": "$APPLaunch",
    "app_id": 10000000
}

经上述示例中的规则处理后的字段为

{
    "user_id": "integration_user_510ec7_001",
    "event_name": "app_launch",
    "app_id": 10000000
}

2.3.7 ExpressionFilter

仅支持配置文件使用 ,大于491版本

根据filter_expression的表达式对数据进行过滤,filter_expression为过滤规则字符串
filter_keys是filter_expression中需要替换为实际数据中的key
filter_keys中需要将filter_expression所有需要判断的key填写完整,嵌套key以(.)连接

参数名

类型

说明

filter_expression

string

过滤表达式字符串,支持等于(==)、不等于(!=)、与(&&)、或(

filter_keys

List

过滤表达式中,需要提取的字段列表

示例

ExpressionFilter {
    filter_expression = "(event == 'exposure' && params.count == 5)||(event == 'buy' && params.amount == 'ab')"
    filter_keys = [event, params.count, params.amount]
}
将满足条件 event为exposure并且params.count 值为5的数据
或者 event为buy 并且params.amount值为'ab'的数据过滤到下游
不满足条件的数据直接丢弃

2.3.8 FieldFilter

根据指定字段的值对数据进行过滤
filter_field_name为字段名称
filter_values是指定过滤到下游的数值,多个值以逗号分割的字符串

参数名

类型

说明

filter_field_name

string

字段名称

filter_values

string

指定过滤到下游的数值,多个值以逗号分割的字符串

示例

FieldFilter{
    filter_field_name = event
    filter_values = "purchase,buy"
}
将event字段中值为purchase或者buy的数据发送到下游,其他值的数据则过滤无法进入下游