本文档介绍数据加工资源函数的语法格式和典型示例。
注意
res_rds_mysql 函数用于从火山引擎云数据库 MySQL 版和自建的公网 MySQL 数据库中拉取表数据。支持如下三种拉取方式:
拉取模式 | 说明 |
---|---|
普通拉取模式 | 首次执行加工任务时,日志服务将拉取全量的表数据,后续不再拉取。如果您数据库中的数据不持续更新,您可选择普通拉取模式。 |
全量更新模式 | 在执行加工任务过程中,日志服务将定期拉取一次全量的表数据。每次全量拉取,非常耗时。如果您数据库中的数据量较小(数据量<= 2GB)、更新不频繁(配置 refresh_interval >=300s),您可选择全量更新模式。 |
增量更新模式 | 在执行加工任务过程中,日志服务将定期根据数据库表中的时间戳字段拉取新增的数据。在增量更新模式下,日志服务每次只拉取新增的数据,效率高。您还可以配置 refresh_interval 为 1 0秒,实现秒级别同步数据库表中的数据,即时性高。如果您数据库表中的数据量大、更新频繁且您对数据实时性要求高,您可选择增量更新模式。 |
res_rds_mysql(host="数据库地址",port="数据库的端口", username="数据库的用户名", password="数据库的密码", database="数据库的名称", table="表名", vpc=None, fields=None, include_fields=None, exclude_fields=None,primary_keys=None, refresh_interval=0, update_time_key=None,base_retry_back_off=1, max_retry_back_off=60)
参数说明如下:
参数 | 参数类型 | 是否必选 | 默认值 | 说明 |
---|---|---|---|---|
host | String | 是 | / | MySQL 实例域名,目前支持火山引擎云数据库 MySQL 版和自建的公网数据库。 |
port | String | 是 | / | 数据库访问端口。 |
username | String | 是 | / | 数据库访问用户名。 |
password | String | 是 | / | 数据库访问密码。 |
database | String | 是 | / | 数据库名称。 |
table | String | 是 | / | 数据库表名。 |
vpc | String | 否 | / | 使用火山引擎云数据库 MySQL 版时,需配置 MySQL 实例所使用的 VPC ID,例如 |
fields | String | 否 | 全部字段 | 待拉取的字符串列表或者字符串别名列表。如果不填,默认使用 table 参数所返回的所有列。指定多个字段时需要使用半角逗号(,)分隔,例如 |
include_fields | String | 否 | / | 配置字段白名单。匹配 include_fields 的表数据将被拉取,其余数据被丢弃。单个字段名和值之间使用半角冒号(:)连接,多个字段之间使用半角逗号(,)分隔,例如 |
exclude_fields | String | 否 | / | 配置字段黑名单,匹配 exclude_fields 的表数据将被丢弃,其余数据被拉取。单个字段名和值使用冒号(:)连接,多个字段之间使用逗号(,)分隔,例如 说明 如果您同时设置了 include_fields 和 exclude_fields 参数,系统优先匹配 include_fields 参数,再匹配 exclude_fields 参数。 |
primary_keys | String | 否 | / | 主键,配置该参数后,数据库表中的数据被拉取后将以
|
update_time_key | String | 否 | / | 用于增量获取数据。例如 说明 数据加工基于该时间字段做数据筛选,实现增量更新。请确保已在数据表中对该字段做索引,否则将进行全表扫描,极大影响性能。 |
refresh_interval | Int | 否 | 0 | 从 MySQL 数据库拉取数据的时间间隔,单位:秒。默认值为 0,表示仅全量拉取一次。 |
base_retry_back_off | Int | 否 | 1 | 拉取数据失败后重新拉取的时间间隔,默认值为 1,单位:秒。 |
max_retry_back_off | Int | 否 | 60 | 拉取数据失败后重试请求的最大时间间隔。默认值为 60,单位:秒,建议使用默认值。 |
res_rds_mysql( host="192.168.**.**", port="3306", username="test_username", password="****", database="test_sql", vpc="vpc-xxxxx", table="table_tls", refresh_interval=300 )
res_rds_mysql( host="192.168.**.**", port="3306", username="test_username", password="****", database="test_sql", vpc="vpc-xxxxx", table="table_tls", refresh_interval=300, include_fields='size:400', exclude_fields='method:put' )
场景:使用主键模式从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中拉取数据。
加工规则:
f_table_map( res_rds_mysql( address="192.168.**.**", username="test_username", password="****", database="test_sql", vpc="vpc-xxxxx", table="table_tls", primary_keys="id", ), "id", ["name", "desc"], )
数据库数据
id | name | desc |
---|---|---|
1 | zhangsan | Man out of law |
日志样例:
[ { "id":1, "age":10 }, { "id":2, "age":20 } ]
加工结果:
[ { "id":1, "age":10, "name":"zhangsan", "desc":"Man out of law" }, { "id":2, "age":20 } ]
场景:使用增量更新模式从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中拉取数据。
注意
使用增量更新模式拉取数据时,需满足如下条件:
加工规则:
f_table_map( res_rds_mysql( address="192.168.**.**", username="test_username", password="****", database="test_sql", vpc="vpc-xxxxx", table="table_tls", primary_keys="id", update_time_key="update_time", refresh_interval=10 ), "id", ["name", "desc"], )
数据库数据
id | name | desc | update_time |
---|---|---|---|
1 | zhangsan | Man out of law | 2024-07-08 14:23:14 |
2 | lisi | Hey man, what can i say! | 2024-07-08 14:23:15 |
日志样例:
[ { "id":1, "age":10 }, { "id":2, "age":20 } ]
加工结果:
[ { "id":1, "age":10, "name":"zhangsan", "desc":"Man out of law" }, { "id":2, "age":20, "name":"lisi", "desc":"Hey man, what can i say!" } ]