You need to enable JavaScript to run this app.
导航
用户定义函数 UDF
最近更新时间:2024.04.12 15:28:06首次发布时间:2024.04.12 15:28:06

ByteHouse 云数仓版支持用户定义函数(UDF,User Defined Functions),可以通过调用任何外部可执行程序或脚本来处理数据。
ByteHouse 云数仓版支持以下类型的UDF:

  1. Lambda UDF:用户定义的Lambda函数
  2. Python UDF:用Python语言定义函数并返回结果

Lambda UDF

用户可以通过一个 Lambda 表达式来创建用户自定义函数。该表达式必须由函数参数、常数、运算符或其他函数调用组成。

语法

CREATE FUNCTION name AS (parameter0, ...) -> expression

限制

函数的参数数量没有限制。
限制条件:

  • 函数名必须是具有唯一性,不得与现有的用户自定义函数和系统函数重名。
  • 不允许定义递归函数。
  • 函数所使用的所有变量必须在其参数列表中指定。

如果 UDF 不满足上述任何限制,就可能会产生异常。

前提条件

创建和使用 UDF 时,需要先创建并指定一个数据库。

CREATE DATABASE myudf;
USE myudf;

使用示例

示例1:

CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
SELECT number, linear_equation(number, 2, 1) FROM numbers(3); 

结果1:

┌─number─┬─plus(multiply(2, number), 1)─┐
│      0 │                            1 │
│      1 │                            3 │
│      2 │                            5 │
└────────┴──────────────────────────────┘

示例2:
在下面的查询中,条件函数 if 在用户自定义函数中被调用:

CREATE FUNCTION parity_str AS (n) -> if(n % 2, 'odd', 'even');
SELECT number, parity_str(number) FROM numbers(3); 

结果2:

┌─number─┬─if(modulo(number, 2), 'odd', 'even')─┐
│      0 │ even                                 │
│      1 │ odd                                  │
│      2 │ even                                 │
└────────┴──────────────────────────────────────┘

Python UDF

简介

Python UDF 是通过 ByteHouse来调用的 python 脚本。Python UDF包含一个Python 程序,该程序在调用函数时运行并返回单个值。
当查询调用Python UDF时,运行时会发生以下步骤:

  1. 该函数将输入参数转换为Python Numpy 数据类型。
  2. 该函数传递转换后的输入参数,然后运行Python程序。
  3. Python代码返回单个值。返回值的数据类型必须对应函数定义指定的RETURNS数据类型。
  4. 该函数将Python的返回值转换为定义的数据类型,然后将该值返回给查询。

数据类型

SQL类型

Python类型

笔记

UInt8

NPY_UINT8

UInt16

NPY_UINT16

UInt32

NPY_UINT32

UInt64

NPY_UINT64

Int8

NPY_INT8

Int16

NPY_INT16

Int32

NPY_INT32

Int64

NPY_INT64

Float32

NPY_FLOAT32

Float64

NPY_FLOAT64

Decimal32

Cast to Float

Decimal64

Cast to Float

UUID

NPY_STRING

固定长度16字节

Date

NPY_UINT16

https://numpy.org/doc/stable/reference/arrays.datetime.html

DateTime

NPY_UINT32

String

NPY_STRING

FixedString

NPY_STRING

Nullable

masked array

https://numpy.org/doc/stable/reference/maskedarray.html

Array

ndarray

只支持 Array 数组输入。https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html

**日期类型 Date:**以两个字节存储自 1970-01-01 以来的天数(无符号)。 支持的值范围:[1970-01-01, 2148-12-31]。
**日期时间类型 DateTime:**以四个字节存储自 1970-01-01 00:00:00(无符号)以来的秒数。 支持的值范围:[1970-01-01 00:00:00, 2105-12-31 23:59:59]。

运行环境

  • Python 3.9
  • 预装依赖如下:
agate==1.6.3
aiohttp==3.8.3
aiosignal==1.3.1
async-timeout==4.0.2
asynctest==0.13.0
attrs==22.1.0
Babel==2.11.0
base58==2.1.1
bitarray==2.6.0
certifi==2022.6.15
cffi==1.15.1
charset-normalizer==2.1.0
click==8.1.3
clickhouse-connect==0.4.0
clickhouse-driver==0.2.4
colorama==0.4.5
cytoolz==0.12.0
dbt-clickhouse==1.3.0
dbt-core==1.3.0
dbt-extractor==0.4.1
eth-abi==3.0.1
eth-account==0.7.0
eth-hash==0.3.3
eth-keyfile==0.6.0
eth-keys==0.4.0
eth-rlp==0.3.0
eth-typing==3.2.0
eth-utils==2.0.0
ethereum-abi-utils==0.4.7
ethereum-tester==0.1.0b3
ethereum-utils==0.6.2
frozenlist==1.3.3
future==0.18.2
hexbytes==0.3.0
hologram==0.0.15
idna==3.3
importlib-metadata==5.0.0
ipfshttpclient==0.7.0
isodate==0.6.1
Jinja2==3.1.2
jsonschema==3.2.0
leather==0.3.4
llvmlite==0.39.1
Logbook==1.5.3
lru-dict==1.1.8
MarkupSafe==2.1.1
mashumaro==3.0.4
minimal-snowplow-tracker==0.0.2
msgpack==1.0.4
multiaddr==0.0.9
multidict==6.0.2
netaddr==0.8.0
networkx==2.8.8
numba==0.56.4
numpy==1.23.2
packaging==21.3
pandas==1.5.1
parsedatetime==2.4
parsimonious==0.8.1
pathspec==0.9.0
protobuf==4.21.9
pycparser==2.21
pycryptodome==3.15.0
pylru==1.2.1
pyparsing==3.0.9
pyrsistent==0.19.2
pysha3==1.0.2
python-dateutil==2.8.2
python-slugify==6.1.2
pytimeparse==1.1.8
pytz==2022.6
pytz-deprecation-shim==0.1.0.post0
PyYAML==6.0
requests==2.28.1
rlp==3.0.0
scipy==1.9.3
six==1.16.0
sqlparse==0.4.3
text-unidecode==1.3
toolz==0.12.0
typing_extensions==4.4.0
tzdata==2022.6
tzlocal==4.2
urllib3==1.26.11
varint==1.0.2
web3==3.16.3
websockets==10.4
Werkzeug==2.2.2
yarl==1.8.1
zipp==3.10.0

语法

创建函数

CREATE FUNCTION function_name [TYPE udf_type] [RETURNS data_type] [FLAGS flags] LANGUAGE PYTHON AS
$custom_tag$

class function_name():
    def entry(self, output, inputs):
        expression
        
$custom_tag$

# udf_type: SCALAR (default), AGGREGATE (Not supported for current release)
# data_type: use the type of the first input column if not set, check 'Data Types' section above for all supported types
  • $custom_tag$

在$$符号之间,您可以使用任何自定义标签或将其留空,只需确保使用相同的标签再次关闭它。(参考 PostgreSQL 4.1.2.4)
例如,这里有两种不同的方法可以指定字符串“Dianne's horse”:

$$Dianne's horse$$
$SomeTag$Dianne's horse$SomeTag$

$tag$中的所有内容都被视为python代码。在创建Python UDF期间不会进行python语法检查。

DROP函数

DROP FUNCTION [IF EXISTS] function_name; 
  • IF EXISTS 是可选的,如果函数不存在,它可以防止发生错误。在正常情况下,如果函数不存在并且您尝试删除该函数,系统将会报错。

SHOW函数

查看所有UDF函数。

SHOW functions;  -- 查看所有UDF
-- DESCRIBE FUNCTION udf_name; --- 查看指定UDF

SHOW CREATE函数

查看特定UDF的定义。

SHOW CREATE FUNCTION udf_name; 

SELECT函数

UDF与ByteHouse中的任何其他函数一样工作,但它需要您指定数据库名才能访问它。
例如,要使用计算圆面积的UDF:

drop function if exists test.area_of_circle;

CREATE FUNCTION test.area_of_circle RETURNS Float64 LANGUAGE PYTHON AS
$pikachu$
from iudf import IUDF
from overload import overload
from math import pi

class area_of_circle(IUDF):
    @overload
    def process(radius):
        return radius*radius*pi
$pikachu$;

使用示例

示例1:

此 UDF 示例将添加此表中每一行的值并输出结果。

  • 创建 UDF 所需的数据库和表

在创建Python UDF之前,您必须指定数据库名称和表。

CREATE DATABASE test;
 
CREATE TABLE test.test_udf_int
(
    a Int64,
    b Int64,
    c Int64,
    d Int64
)
ENGINE = CnchMergeTree()
PRIMARY KEY a
ORDER BY a;

INSERT INTO test.test_udf_int(a, b, c, d)
VALUES (0, 0, 0, 0 ) (1, 1, 1, 1) (2, 2, 2, 2);
  • 创建 UDF

接下来,使用 CREATE FUNCTION 创建 UDF 函数。 这是如何用Python编写入口函数的示例,并使用[code]作为分隔符。
由于ByteHouse是一个基于列的数据库,但通常您希望对每一行进行计算。 在此示例中,入口函数使用迭代器循环遍历每一行,并对每行中的四个输入值求和以获得输出值。

CREATE FUNCTION f_plus_int RETURNS Int64 LANGUAGE PYTHON AS
$code$
import numpy as np

class f_plus_int:
    def entry(self, output, inputs):
        nargs = len(inputs)

        it = np.nditer([output] + inputs,
                ['refs_ok', 'buffered'], [['writeonly']] + [['readonly']]*nargs)

        with it:
            for out, *ins in it:
                out[...] = self.process(*[x.item() for x in ins])
                
    def process(self, x1, y1, x2, y2):
       return x1 + y2 + x2 + y2
       
$code$;

或者,您可以使用 Python Numpy 提供的完全基于列的方法。 它执行与 f_plus_int 相同的操作。
注意:使用基于列的方法的性能优于使用循环。

CREATE FUNCTION f_plus_int1 RETURNS Int64 LANGUAGE PYTHON AS
$code$
import numpy as np

class f_plus_int:
    def entry(self, output, inputs):
        np.add.reduce(inputs, out=output)
$code$;
  • 调用 UDF

Python UDF函数创建后,就可以像一般内置函数一样调用。

select f_plus_int(a,b,c,d) from test.test_udf_int;
  • 返回结果
┌─f_plus_int(a, b, c, d)─┐
│                           0 │
│                           4 │
│                           8 │
└─────────────────────────────┘
  • 删除 UDF
drop function f_plus_int;

示例2:

  • 为UDF创建数据库和表
CREATE TABLE test.py_fixedstring
(
    a FixedString(2),
    b FixedString(3)
) 
ENGINE = CnchMergeTree()
ORDER BY b;

INSERT INTO test.py_fixedstring(a, b)
VALUES ('aa', 'aaa') ('bb', 'bbb') ('cc', 'ccc') ('dd', 'ddd') ('ee', 'eee') ('ff', 'fff') ('gg', 'ggg') ('ab', 'abc') ('cb', 'cba') ('tt', 'zzz');
  • 创建UDF

对于像FixedString 这样的返回类型,您必须指定返回字符串的长度。

CREATE FUNCTION py_fixed_string
RETURNS FixedString(5)
LANGUAGE PYTHON AS
$code$
import numpy as np

class py_fixed_string:
    def entry(self, output, inputs):
        nargs = len(inputs)

        it = np.nditer([output] + inputs,
                ['refs_ok', 'buffered'], [['writeonly']] + [['readonly']]*nargs)

        with it:
            for out, *ins in it:
                out[...] = self.process(*[x.item() for x in ins])

    def process(self, a, b):
        return a + b
$code$; 
  • 调用 UDF
select py_fixed_string(a, b) from test.py_fixedstring; 
  • 返回结果
┌─py_fixed_string(a, b)─┐
│ aaaaa                      │
│ ababc                      │
│ bbbbb                      │
│ cbcba                      │
│ ccccc                      │
│ ddddd                      │
│ eeeee                      │
│ fffff                      │
│ ggggg                      │
│ ttzzz                      │
└────────────────────────────┘
  • 删除 UDF
drop function py_fixedstring; 

过载函数:

此外,ByteHouse提供了overload.py,它允许您在单个创建函数语句中创建多个UDF。 每个函数可以接受不同的参数作为输入,如下所示:

CREATE FUNCTION f_plus_int
RETURNS Date
LANGUAGE PYTHON AS
$code$
from iudf import IUDF
from overload import overload
 
class f_plus_int(IUDF):
    @overload
    def process(a): 
        return a + 1

    @overload
    def process(a, b):
        return a + b
$code$;

说明

@overload 是包装函数并返回 Function 类型的可调用对象的装饰器。 如果使用这个入口函数,则必须在函数中使用装饰器@overload。 需要从入口处调用,以避免参数号不匹配的错误。