To reduce the ETL workload of real-time data ingestion and historical data import, Volcengine Growth Analytics (DataFinder) lets customers write a simple mapping file and submit it as a background task that ingests their data into the Growth Analytics database. The sections below describe the scope of support and how to use this feature.
- Events or user data in a customer-defined format can be imported into DataFinder.
- The current version only supports data residing in the customer's Kafka or HDFS: data in Kafka is consumed continuously in real time, while data in HDFS is imported as a one-off batch.
- The data must be in JSON format; offline data in HDFS may be compressed.
- The DataFinder cluster must be able to reach the customer's Kafka and HDFS over the internal network, e.g. Kafka via its broker servers (192.168..:9092) and HDFS via the fs.defaultFS path (hdfs://192.168..:9000/path).
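Since every record must be a JSON object, it can help to sanity-check a sample of the data before wiring up a job. A minimal sketch, assuming newline-delimited JSON (one record per line); the helper name is illustrative, not part of DataFinder:

```python
import json

def validate_json_lines(lines):
    """Check that every non-empty line parses as a JSON object.

    Returns (ok, errors) where errors is a list of (line_number, reason).
    """
    errors = []
    for i, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                errors.append((i, "not a JSON object"))
        except json.JSONDecodeError as exc:
            errors.append((i, str(exc)))
    return (not errors), errors

# Example: the second line is not valid JSON
ok, errors = validate_json_lines([
    '{"event_n": "app_start", "user_identity": "user_123"}',
    'not json',
])
```

Running a check like this over a sample of the Kafka topic or HDFS files catches format problems before the ingestion task fails on them.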
The real-time and offline paths use the same mapping configuration; only the way data is read and written differs. The sections below first present the real-time and offline templates, then describe the mapping configuration.
Note: only the parts of the template highlighted in yellow may be modified; the parts in blue are comments and should be removed in actual use.
```
env {
  execution.parallelism = 3
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  execution.checkpoint.timeout = 6000
  execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/"
}
source {
  Kafka {
    bootstrap.servers = "192.168.6.24:9192,192.168.6.25:9192"  # change to the customer Kafka bootstrap.servers; two entries are enough
    topic = "test_topic_input"                                 # change to the customer topic to ingest
    consumer.group = "finder_data_group"                       # the customer must specify a consumer group; note that different jobs consuming the same topic must not share a group
    kafka.config = {
      max.poll.records = "2000"
      max.poll.interval.ms = "1000"
      auto.offset.reset = "earliest"
    }
    start_mode = "group_offsets"
    format = text
    result_table_name = "input_source"
  }
}
transform {
  EventMapper {
    process_parallelism = 6
    source_table_name = "input_source"
    result_table_name = "output_distination"
    # the data mapping is described in detail below
    *****
  }
}
sink {
  kafka {
    topic = "sdk_origin_event"
    bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192"  # change to the DataFinder cluster bootstrap.servers
    format = text
    kafka.request.timeout.ms = 60000
    source_table_name = "output_distination"
    kafka.config = {
      acks = "all"
      request.timeout.ms = "60000"
      buffer.memory = "33554432"
    }
  }
}
```
Note: only the parts of the template highlighted in yellow may be modified; the parts in blue are comments and should be removed in actual use.
```
env {
  execution.parallelism = 6
  job.mode = "BATCH"
}
source {
  HdfsFile {
    fs.defaultFS = "hdfs://192.168.1.1:9000"  # change to the customer's actual fs.defaultFS
    path = "/user/rangers/data/event"         # change to the customer's actual file path; it may point to a specific file or to a directory
    file_format_type = "text"
    result_table_name = "input_source"
  }
}
transform {
  EventMapper {
    process_parallelism = 6
    source_table_name = "input_source"
    result_table_name = "output_distination"
    # the data mapping is described in detail below
    *****
  }
}
sink {
  HdfsFile {
    path = "/user/rangers/data/integration/"
    file_format_type = "text"
    result_table_name = "output_distination"
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
    fs.defaultFS = "hdfs://vpc-minibase"
  }
}
```
The mapping configuration is written under the template's transform section:
```
transform {
  EventMapper {
    process_parallelism = 6
    source_table_name = "input_source"
    result_table_name = "output_distination"
    # the data mapping goes here
  }
}
```
To quickly show how to write a mapping configuration, consider the following example. Suppose your data looks like this:
```
{
  "event_n": "app_start",        // event name
  "user_identity": "user_123",   // user identifier
  "event_time": 1712738195000,   // event timestamp
  "device_os": "android",        // device OS
  "department": "工厂",          // common event property
  "properties": {                // event properties
    "level": 10,
    "param_1": "audi",
    "param_2": "value2"
  }
}
```
In addition, suppose you have the following requirements:
Your mapping configuration should then be written as follows:
```
transform {
  EventMapper {
    process_parallelism = 6
    source_table_name = "input_source"
    result_table_name = "output_distination"
    # necessary_field sets the required fields
    necessary_field=[
      { source_name="event_n" target_name="event" },            # use event_n from the raw data as the event name
      { source_name="event_time" target_name="local_time_ms" }, # use event_time from the raw data as the event time
      { target_name="app_id" value=10000000 }                   # the app_id to import into
    ],
    # id_field sets the user ID information
    id_field=[
      { source_name="user_identity" target_name="user_unique_id" } # use user_identity as the user_unique_id
    ]
    # common_field sets the common event properties
    common_field=[
      { source_name="device_os" target_name="os_name" },        # map device_os to the common property os_name
      { source_name="department" target_name="department" }     # map department to the common property department
    ]
    # param_field sets the event properties
    param_field=[
      { source_name="properties.level" target_name="level" },   # map level under properties to the event property level
      { source_name="properties.param_1" target_name="car" }    # map param_1 under properties to the event property car
    ]
  }
}
```
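To make the effect of this configuration concrete, the following Python sketch re-implements the mapping rules above by hand. It is purely illustrative (not the actual EventMapper implementation); the function names are hypothetical:

```python
import json

def get_path(record, dotted):
    """Resolve a dotted path such as 'properties.level' in a nested dict."""
    current = record
    for key in dotted.split("."):
        current = current[key]
    return current

def apply_mapping(record):
    """Apply the example mapping config above to one raw record."""
    out = {}
    # necessary_field
    out["event"] = record["event_n"]
    out["local_time_ms"] = record["event_time"]
    out["app_id"] = 10000000          # literal value: no source_name, only value
    # id_field
    out["user_unique_id"] = record["user_identity"]
    # common_field
    out["os_name"] = record["device_os"]
    out["department"] = record["department"]
    # param_field (dotted paths reach into nested objects)
    out["level"] = get_path(record, "properties.level")
    out["car"] = get_path(record, "properties.param_1")
    return out

sample = json.loads('''{
  "event_n": "app_start", "user_identity": "user_123",
  "event_time": 1712738195000, "device_os": "android",
  "department": "factory",
  "properties": {"level": 10, "param_1": "audi", "param_2": "value2"}
}''')
mapped = apply_mapping(sample)
```

Note that `properties.param_2` has no mapping entry, so it does not appear in the output: only fields that are explicitly mapped are carried over (unless put_all_mode is used, as described below).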
Fields such as necessary_field and id_field in the example above are the outer fields:
| Field type | Description |
|---|---|
| necessary_field | Required fields; currently event_name, app_id, and local_time_ms |
| id_field | Required; must contain at least one ID, which can be user_unique_id or anonymous_id |
| common_field | Common event properties |
| param_field | Event properties |
| user_field | User properties |
| exclude_field | Fields to remove from the raw data |
| filter_mode | Filters for the data you need |
| put_all_mode | When a raw record is fully flat, with no nesting at all, set this field to treat all remaining fields as event properties or custom common properties |
| finder_common_param | List type; marks Finder common properties |
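As an illustration of put_all_mode's behavior as described in the table (this is an assumed sketch, not the actual implementation): for a fully flat record, every field not consumed by an explicit mapping is kept as a property.

```python
def put_all_remaining(record, consumed):
    """Keep every flat field that was not consumed by an explicit mapping.

    Assumed semantics of put_all_mode per the table above; the record must
    be fully flat (no nested objects).
    """
    return {k: v for k, v in record.items() if k not in consumed}

# Hypothetical flat record: event_n and user_identity are explicitly mapped,
# so only the remaining fields fall through as event properties.
flat = {"event_n": "app_start", "user_identity": "u1", "vip_level": 3, "plan": "pro"}
rest = put_all_remaining(flat, consumed={"event_n", "user_identity"})
```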
Fields such as source_name, target_name, and value above are the property-description fields:
| Field name | Description | Required |
|---|---|---|
| source_name | Field name in the raw data | No (required whenever value is not specified) |
| target_name | Field name to map to on the Finder side | Yes |
| value | Assigns a literal value to a field, independent of the raw data | No |
| value_type | The value type of this mapping relation (unrelated to the value field); defaults to string | No |
| parse_type | Available when value_type is map | No |
| delimiter | | No |
| spliter | If the reported data has the form a:b&c:d, splits it into separate properties | No |
| id_type | Used in multi-identity scenarios; defaults to finder_uid | No |
| operation_type | Applies to user properties | |
| value_mapping | Remaps the value of a predefined event or other property | No |
| conditional | Performs the field mapping only when a condition is met | |
| conditional_target_name | When the conditional is satisfied, the specified target_name is used | |
| conditional_source_name | When the conditional is satisfied, the specified source_name is used | |
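The delimiter/spliter behavior can be sketched as follows. This is an assumed interpretation inferred from the table and from the appendix usage (e.g. delimiter="&" with spliter="=" for a=b&c=d-style data), not the documented implementation:

```python
def split_properties(raw, delimiter, spliter):
    """Split reported data like 'a:b&c:d' into separate key/value properties.

    Assumed semantics: delimiter separates the pairs, spliter separates
    each pair into key and value.
    """
    props = {}
    for pair in raw.split(delimiter):
        key, _, value = pair.partition(spliter)
        props[key] = value
    return props

# 'a:b&c:d' with delimiter='&' and spliter=':' yields two properties
parsed = split_properties("a:b&c:d", "&", ":")
```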
The appendix gives a complex mapping configuration that you can use as a reference for more elaborate processing. We recommend starting from the simpler example above: overly complex scenarios may not behave as expected because of configuration mistakes.
Download the following scripts.
Upload the configuration:

```
bash upload_file.sh [file_name] [project_id or app_id]
# [project_id or app_id]: use the project name if multi-app is enabled; otherwise use the app_id
```

A successful upload returns the ID of the configuration file, which is needed when starting the task:

```
{"code":0,"message":"success","data":24}
```
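When scripting this workflow, the returned ID can be parsed out of the JSON response. A small sketch (the extract_id helper is hypothetical, not part of the provided scripts):

```python
import json

def extract_id(response_text):
    """Parse the upload/deploy JSON response and return the ID in 'data'."""
    resp = json.loads(response_text)
    if resp.get("code") != 0:
        raise RuntimeError(f"request failed: {resp.get('message')}")
    return resp["data"]

config_id = extract_id('{"code":0,"message":"success","data":24}')
```

The same helper works for the deploy response below, since both share the code/message/data shape.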
Deploy the task.

The task starts automatically after deployment:

```
bash deploy.sh [task_name] [project_id or app_id] [config_file_ID] [task_type]
# [task_name]: lowercase English letters only, no special characters
# [task_type]: 0 for real-time, 2 for offline
```

On success this returns a task ID, which is needed later to query task status and to stop or delete the task:

```
{"code":0,"message":"success","data":77}
```
Stop the task:

```
bash stop.sh [task_id] [project_id or app_id]
```
Start the task:

```
bash start.sh [task_id] [project_id or app_id]
```
Delete the task.

A running task cannot be deleted:
```
bash delete.sh [task_id] [project_id or app_id]
```
Appendix: a complete example of a more complex mapping configuration, for reference:

```
env {
  # You can set flink configuration here
  execution.parallelism = 6
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  execution.checkpoint.timeout = 6000
  execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/"
}
source {
  Kafka {
    topic = "test_topic_input"
    bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192"
    format = text
    result_table_name = "fake"
    start_mode = "group_offsets"
    consumer.group = "finder_seatunnel_group"
    kafka.config = {
      max.poll.records = "2000"
      max.poll.interval.ms = "100"
      auto.offset.reset = "earliest"
    }
  }
}
transform {
  EventMapper {
    process_parallelism = 18
    source_table_name = "fake"
    result_table_name = "fake1"
    filter_mode={
      is_sa=true
    }
    exclude_field=[
      "properties.$app_state",
      "properties.$is_first_day",
      "properties.$is_first_time",
      "properties.$receive_time",
      "properties.$wifi",
      "properties.$province",
      "properties.$viewport_height",
      "properties.$viewport_position",
      "properties.$viewport_width",
      "properties.$item_join",
      "properties.$receive_time",
      "properties.$lib_plugin_version",
      "properties.$ios_install_source",
      "properties.$channel_device_info",
      "properties.$ios_install_disable_callback",
      "properties.$is_channel_callback_event",
      "properties.$channel_extra_information"
    ],
    necessary_field=[
      {
        source_name=type
        target_name=event
        conditional=[
          { name=type value=track }
        ]
        conditional_source_name=event
        value_mapping=[
          { old_value="$AppStart" mapping_value="app_launch" },
          { old_value="$AppInstall" mapping_value="app_launch" },
          { old_value="$AppEnd" mapping_value="app_terminate" },
          { old_value="$AppViewScreen" mapping_value="app_pageview" },
          { old_value="$AppClick" mapping_value="app_click" },
          { old_value="$AppInstall" mapping_value="properties.$activation" },
          { old_value="AppCrash" mapping_value="properties.$crash" },
          { old_value="$MPLaunch" mapping_value="app_launch" },
          { old_value="$MPShow" mapping_value="app_launch" },
          { old_value="$MPShare" mapping_value="on_share" },
          { old_value="$MPViewScreen" mapping_value="predefine_pageview" },
          { old_value="$MPHide" mapping_value="app_terminate" },
          { old_value="$MPClick" mapping_value="bav2b_click" },
          { old_value="$MPAddFavorites" mapping_value="on_addtofavorites" },
          { old_value="$MPPageLeave" mapping_value="predefine_pageview_hide" },
          { old_value="$pageview" mapping_value="predefine_pageview" },
          { old_value="$WebClick" mapping_value="bav2b_click" },
          { old_value="profile_set" mapping_value="__profile_set" },
          { old_value="profile_set_once" mapping_value="__profile_set_once" },
          { old_value="profile_increment" mapping_value="__profile_increment" },
          { old_value="profile_append" mapping_value="__profile_append" },
          { old_value="profile_unset" mapping_value="__profile_unset" },
          { old_value="item_set" mapping_value="__item_set" },
          { old_value="item_delete" mapping_value="__item_unset" }
        ]
      },
      { source_name=time target_name=local_time_ms value_type=long },
      { source_name=recv_time target_name=server_time value_type=long },
      {
        source_name=project
        target_name=app_id
        value_mapping=[
          { old_value=ebiz_test mapping_value=10000001 }
        ]
      }
    ],
    common_field=[
      { source_name=time_free target_name=__is_history },
      {
        source_name="$device_id"
        target_name=openudid
        conditional={ name="properties.$lib" value="iOS" }
        conditional_target_name=vendor_id
      },
      { source_name="properties.$app_crashed_reason" target_name=app_crashed_reason },
      { source_name="identities.$identity_idfv" target_name=vendor_id },
      { source_name="identities.$identity_idfv" target_name=idfv },
      { source_name="identities.$identity_idfa" target_name=idfa },
      { source_name="identities.$identity_android_id" target_name=openudid },
      { source_name="identities.$identity_caid" target_name=caid },
      { source_name="identities.$identity_gaid" target_name=gaid },
      { source_name="identities.$identity_gaid" target_name=oaid },
      { source_name="properties.$app_version" target_name=app_version },
      { source_name="properties.$timezone_offset" target_name=timezone_offset },
      { source_name="properties.$user_agent" target_name=user_agent },
      { source_name="properties.$source_package_name" target_name=package },
      { source_name="properties.$model" target_name=device_model },
      { source_name="properties.$brand" target_name=device_brand },
      { source_name="properties.$manufacturer" target_name=device_manufacturer },
      { source_name="properties.$os" target_name=os_name },
      { source_name="properties.$os_version" target_name=os_version },
      { source_name="properties.$screen_height" target_name=height },
      { source_name="properties.$screen_width" target_name=width },
      { source_name="properties.$browser" target_name=browser },
      { source_name="properties.$browser_version" target_name=browser_version },
      { source_name="properties.$network_type" target_name=access },
      { source_name="properties.$latest_utm_source" target_name=utm_source },
      { source_name="properties.$latest_utm_medium" target_name=utm_medium },
      { source_name="properties.$latest_utm_term" target_name=utm_term },
      { source_name="properties.$latest_utm_content" target_name=utm_content },
      { source_name="properties.$latest_utm_campaign" target_name=utm_campaign },
      { source_name="properties.$channel_name" target_name=channel },
      { source_name="properties.$channel_ad_id" target_name=ad_id },
      { source_name="properties.$channel_campaign_id" target_name=campaign_id },
      { source_name="properties.$ip" target_name=client_ip },
      { source_name="properties.$referrer" target_name=referrer },
      { source_name="properties.$referrer_host" target_name=referrer_host },
      { source_name="properties.$screen_orientation" target_name=screen_orientation },
      { source_name="properties.$lib" target_name=sdk_lib },
      { source_name="properties.$lib_version" target_name=sdk_version }
    ],
    param_field=[
      { source_name="properties.$resume_from_background" target_name=is_background },
      { source_name="properties.$event_duration" target_name=session_duration },
      { source_name="properties.$url_path" target_name=path },
      { source_name="properties.$latest_share_distinct_id" target_name=query_from_user_unique_id },
      { source_name="properties.$latest_share_depth" target_name=query_share_depth },
      { source_name="properties.$latest_share_distinct_id" target_name=query_from_user_unique_id },
      { source_name="properties.$latest_share_depth" target_name=query_share_depth },
      { source_name="properties.$test_list" value_type=list delimiter="," },
      { source_name="properties.$test_url" delimiter="," spliter="," },
      { source_name="properties.$test_url1" delimiter="&" spliter="=" },
      { source_name="properties" value_type=map parse_type=0 }
    ],
    id_field=[
      {
        source_name=distinct_id
        target_name=anonymous_id
        conditional=[
          { name="properties.$is_login_id" value=true }
        ]
        conditional_target_name=user_unique_id
      }
    ]
  }
}
sink {
  kafka {
    topic = "test_topic_output"
    bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192"
    format = text
    kafka.request.timeout.ms = 60000
    source_table_name = "fake1"
    kafka.config = {
      acks = "all"
      request.timeout.ms = "60000"
      buffer.memory = "33554432"
    }
  }
}
```