You need to enable JavaScript to run this app.
导航
资源函数
最近更新时间:2024.08.02 11:18:48首次发布时间:2024.08.02 11:18:48

本文档介绍数据加工资源函数的语法格式和典型示例。

res_rds_mysql 函数

注意

  • 目前 res_rds_mysql 函数处于内测阶段,如需使用请联系客户经理。
  • 数据拉取的上限为 20 万条,超过 20 万的数据将会被截断。

res_rds_mysql 函数用于从火山引擎云数据库 MySQL 版和自建的公网 MySQL 数据库中拉取表数据。支持如下三种拉取方式:

拉取模式

说明

普通拉取模式

首次执行加工任务时,日志服务将拉取全量的表数据,后续不再拉取。如果您数据库中的数据不持续更新,您可选择普通拉取模式。

全量更新模式

在执行加工任务过程中,日志服务将定期拉取一次全量的表数据。每次全量拉取,非常耗时。如果您数据库中的数据量较小(数据量<= 2GB)、更新不频繁(配置 refresh_interval >=300s),您可选择全量更新模式。

增量更新模式

在执行加工任务过程中,日志服务将定期根据数据库表中的时间戳字段拉取新增的数据。在增量更新模式下,日志服务每次只拉取新增的数据,效率高。您还可以配置 refresh_interval 为 1 0秒,实现秒级别同步数据库表中的数据,即时性高。如果您数据库表中的数据量大、更新频繁且您对数据实时性要求高,您可选择增量更新模式。

函数语法/格式

res_rds_mysql(host="数据库地址",port="数据库的端口", username="数据库的用户名", password="数据库的密码", database="数据库的名称", table="表名", vpc=None, fields=None, include_fields=None, exclude_fields=None,primary_keys=None, refresh_interval=0, update_time_key=None,base_retry_back_off=1, max_retry_back_off=60)

参数说明如下:

参数

参数类型

是否必选

默认值

说明

host

String

/

MySQL 实例域名,目前支持火山引擎云数据库 MySQL 版和自建的公网数据库。
使用火山引擎云数据库 MySQL 版时,需设置 host 为 MySQL 实例所属 VPC 的 IP 地址。您可以在目标 VPC 的概览页面查看其 IP 地址。详细操作,请参考查看私有网络详情

port

String

/

数据库访问端口。

username

String

/

数据库访问用户名。

password

String

/

数据库访问密码。

database

String

/

数据库名称。

table

String

/

数据库表名。

vpc

String

/

使用火山引擎云数据库 MySQL 版时,需配置 MySQL 实例所使用的 VPC ID,例如 vpc-2byx****

fields

String

全部字段

待拉取的字符串列表或者字符串别名列表。如果不填,默认使用 table 参数所返回的所有列。指定多个字段时需要使用半角逗号(,)分隔,例如field1,field2

include_fields

String

/

配置字段白名单。匹配 include_fields 的表数据将被拉取,其余数据被丢弃。单个字段名和值之间使用半角冒号(:)连接,多个字段之间使用半角逗号(,)分隔,例如 field1:value1,field2:value2

exclude_fields

String

/

配置字段黑名单,匹配 exclude_fields 的表数据将被丢弃,其余数据被拉取。单个字段名和值使用冒号(:)连接,多个字段之间使用逗号(,)分隔,例如 field1:value1,field2:value2

说明

如果您同时设置了 include_fields 和 exclude_fields 参数,系统优先匹配 include_fields 参数,再匹配 exclude_fields 参数。

primary_keys

String

/

主键,配置该参数后,数据库表中的数据被拉取后将以 Key:Value **** 字典形式保存到内存中,其中 Keyprimary_keys 参数的值,Value 为拉取到的数据库表的整行数据。

  • 数据库表中的数据量较大时,推荐使用该参数,提升效率。
  • primary_keys 参数的值必须存在于从数据库表拉取的字段中。

update_time_key

String

/

用于增量获取数据。例如 update_time_key="update_time" ,其中 update_time 为 MySQL 数据库中数据更新的时间字段,该时间字段支持 Datetime、Timestamp 数据类型。请保证该时间字段的值随时间递增。
如果不配置此参数,日志服务将执行全量更新。

说明

数据加工基于该时间字段做数据筛选,实现增量更新。请确保已在数据表中对该字段做索引,否则将进行全表扫描,极大影响性能。

refresh_interval

Int

0

从 MySQL 数据库拉取数据的时间间隔,单位:秒。默认值为 0,表示仅全量拉取一次。

base_retry_back_off

Int

1

拉取数据失败后重新拉取的时间间隔,默认值为 1,单位:秒。

max_retry_back_off

Int

60

拉取数据失败后重试请求的最大时间间隔。默认值为 60,单位:秒,建议使用默认值。

函数示例

示例 1:每隔 300 秒全量拉取一次数据

  • 场景:从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中每隔 300 秒拉取一次全量表数据。
  • 加工规则:
    res_rds_mysql(
        host="192.168.**.**",
        port="3306",
        username="test_username",
        password="****",
        database="test_sql",
        vpc="vpc-xxxxx",    
        table="table_tls",
        refresh_interval=300
    )
    

示例 2 :指定黑白名单进行数据拉取

  • 场景:从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中拉取数据,且指定只拉取 size 字段值是 400,method 字段值不是 put 的数据。
  • 加工规则:
    res_rds_mysql(
        host="192.168.**.**",
        port="3306",
        username="test_username",
        password="****",
        database="test_sql",
        vpc="vpc-xxxxx",    
        table="table_tls",
        refresh_interval=300,
        include_fields='size:400',
        exclude_fields='method:put'
    )
    

示例 3:使用主键模式拉取数据

  • 场景:使用主键模式从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中拉取数据。

  • 加工规则:

    f_table_map(
        res_rds_mysql(
            address="192.168.**.**",
            username="test_username",
            password="****",
            database="test_sql",
            vpc="vpc-xxxxx",    
            table="table_tls",
            primary_keys="id",
        ),
        "id",
        ["name", "desc"],
    )
    
  • 数据库数据

    id

    name

    desc

    1

    zhangsan

    Man out of law

  • 日志样例:

    [
    {
        "id":1,
        "age":10
    },
    {
        "id":2,
        "age":20
    }
    ]
    
  • 加工结果:

    [
    {
        "id":1,
        "age":10,
        "name":"zhangsan",
        "desc":"Man out of law"    
    },
    {
        "id":2,
        "age":20
    }
    ]
    

示例 4:使用增量更新模式拉取数据

  • 场景:使用增量更新模式从火山引擎 MySQL 数据库 test_sql 中的 table_tls 表中拉取数据。

    注意

    使用增量更新模式拉取数据时,需满足如下条件:

    • 数据库表中必须有唯一主键并且有一个时间字段,例如示例中的 idupdate_time
    • 使用增量更新模式拉取数据时,必须在函数中设置 primary_keysrefresh_intervalupdate_time_key 参数。
  • 加工规则:

    f_table_map(
        res_rds_mysql(
            address="192.168.**.**",
            username="test_username",
            password="****",
            database="test_sql",
            vpc="vpc-xxxxx",    
            table="table_tls",
            primary_keys="id",
            update_time_key="update_time",
            refresh_interval=10    
        ),
        "id",
        ["name", "desc"],
    )
    
  • 数据库数据

    id

    name

    desc

    update_time

    1

    zhangsan

    Man out of law

    2024-07-08 14:23:14

    2

    lisi

    Hey man, what can i say!

    2024-07-08 14:23:15

  • 日志样例:

    [
    {
        "id":1,
        "age":10
    },
    {
        "id":2,
        "age":20
    }
    ]
    
  • 加工结果:

    [
    {
        "id":1,
        "age":10,
        "name":"zhangsan",
        "desc":"Man out of law"    
    },
    {
        "id":2,
        "age":20,
        "name":"lisi",
        "desc":"Hey man, what can i say!"       
    }
    ]