ByteHouse 云数仓版支持用户定义函数(UDF,User Defined Functions),可以通过调用任何外部可执行程序或脚本来处理数据。
ByteHouse 云数仓版支持以下类型的UDF:
用户可以通过一个 Lambda 表达式来创建用户自定义函数。该表达式必须由函数参数、常数、运算符或其他函数调用组成。
CREATE FUNCTION name AS (parameter0, ...) -> expression
函数的参数数量没有限制。
限制条件:
如果 UDF 不满足上述任何限制,就可能会产生异常。
创建和使用 UDF 时,需要先创建并指定一个数据库。
CREATE DATABASE myudf; USE myudf;
示例1:
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b; SELECT number, linear_equation(number, 2, 1) FROM numbers(3);
结果1:
┌─number─┬─plus(multiply(2, number), 1)─┐ │ 0 │ 1 │ │ 1 │ 3 │ │ 2 │ 5 │ └────────┴──────────────────────────────┘
示例2:
在下面的查询中,条件函数 if 在用户自定义函数中被调用:
CREATE FUNCTION parity_str AS (n) -> if(n % 2, 'odd', 'even'); SELECT number, parity_str(number) FROM numbers(3);
结果2:
┌─number─┬─if(modulo(number, 2), 'odd', 'even')─┐ │ 0 │ even │ │ 1 │ odd │ │ 2 │ even │ └────────┴──────────────────────────────────────┘
Python UDF 是通过 ByteHouse来调用的 python 脚本。Python UDF包含一个Python 程序,该程序在调用函数时运行并返回单个值。
当查询调用Python UDF时,运行时会发生以下步骤:
SQL类型 | Python类型 | 笔记 |
---|---|---|
UInt8 | NPY_UINT8 | |
UInt16 | NPY_UINT16 | |
UInt32 | NPY_UINT32 | |
UInt64 | NPY_UINT64 | |
Int8 | NPY_INT8 | |
Int16 | NPY_INT16 | |
Int32 | NPY_INT32 | |
Int64 | NPY_INT64 | |
Float32 | NPY_FLOAT32 | |
Float64 | NPY_FLOAT64 | |
Decimal32 | Cast to Float | |
Decimal64 | Cast to Float | |
UUID | NPY_STRING | 固定长度16字节 |
Date | NPY_UINT16 | https://numpy.org/doc/stable/reference/arrays.datetime.html |
DateTime | NPY_UINT32 | |
String | NPY_STRING | |
FixedString | NPY_STRING | |
Nullable | masked array | https://numpy.org/doc/stable/reference/maskedarray.html |
Array | ndarray | 只支持 Array 数组输入。https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html |
**日期类型 Date:**以两个字节存储自 1970-01-01 以来的天数(无符号)。 支持的值范围:[1970-01-01, 2148-12-31]。
**日期时间类型 DateTime:**以四个字节存储自 1970-01-01 00:00:00(无符号)以来的秒数。 支持的值范围:[1970-01-01 00:00:00, 2105-12-31 23:59:59]。
agate==1.6.3 aiohttp==3.8.3 aiosignal==1.3.1 async-timeout==4.0.2 asynctest==0.13.0 attrs==22.1.0 Babel==2.11.0 base58==2.1.1 bitarray==2.6.0 certifi==2022.6.15 cffi==1.15.1 charset-normalizer==2.1.0 click==8.1.3 clickhouse-connect==0.4.0 clickhouse-driver==0.2.4 colorama==0.4.5 cytoolz==0.12.0 dbt-clickhouse==1.3.0 dbt-core==1.3.0 dbt-extractor==0.4.1 eth-abi==3.0.1 eth-account==0.7.0 eth-hash==0.3.3 eth-keyfile==0.6.0 eth-keys==0.4.0 eth-rlp==0.3.0 eth-typing==3.2.0 eth-utils==2.0.0 ethereum-abi-utils==0.4.7 ethereum-tester==0.1.0b3 ethereum-utils==0.6.2 frozenlist==1.3.3 future==0.18.2 hexbytes==0.3.0 hologram==0.0.15 idna==3.3 importlib-metadata==5.0.0 ipfshttpclient==0.7.0 isodate==0.6.1 Jinja2==3.1.2 jsonschema==3.2.0 leather==0.3.4 llvmlite==0.39.1 Logbook==1.5.3 lru-dict==1.1.8 MarkupSafe==2.1.1 mashumaro==3.0.4 minimal-snowplow-tracker==0.0.2 msgpack==1.0.4 multiaddr==0.0.9 multidict==6.0.2 netaddr==0.8.0 networkx==2.8.8 numba==0.56.4 numpy==1.23.2 packaging==21.3 pandas==1.5.1 parsedatetime==2.4 parsimonious==0.8.1 pathspec==0.9.0 protobuf==4.21.9 pycparser==2.21 pycryptodome==3.15.0 pylru==1.2.1 pyparsing==3.0.9 pyrsistent==0.19.2 pysha3==1.0.2 python-dateutil==2.8.2 python-slugify==6.1.2 pytimeparse==1.1.8 pytz==2022.6 pytz-deprecation-shim==0.1.0.post0 PyYAML==6.0 requests==2.28.1 rlp==3.0.0 scipy==1.9.3 six==1.16.0 sqlparse==0.4.3 text-unidecode==1.3 toolz==0.12.0 typing_extensions==4.4.0 tzdata==2022.6 tzlocal==4.2 urllib3==1.26.11 varint==1.0.2 web3==3.16.3 websockets==10.4 Werkzeug==2.2.2 yarl==1.8.1 zipp==3.10.0
CREATE FUNCTION function_name [TYPE udf_type] [RETURNS data_type] [FLAGS flags] LANGUAGE PYTHON AS $custom_tag$ class function_name(): def entry(self, output, inputs): expression $custom_tag$ # udf_type: SCALAR (default), AGGREGATE (Not supported for current release) # data_type: use the type of the first input column if not set, check 'Data Types' section above for all supported types
在$$符号之间,您可以使用任何自定义标签或将其留空,只需确保使用相同的标签再次关闭它。(参考 PostgreSQL 4.1.2.4)
例如,这里有两种不同的方法可以指定字符串“Dianne's horse”:
$$Dianne's horse$$ $SomeTag$Dianne's horse$SomeTag$
$tag$中的所有内容都被视为python代码。在创建Python UDF期间不会进行python语法检查。
DROP FUNCTION [IF EXISTS] function_name;
IF EXISTS
是可选的,如果函数不存在,它可以防止发生错误。在正常情况下,如果函数不存在并且您尝试删除该函数,系统将会报错。查看所有UDF函数。
SHOW functions; -- 查看所有UDF -- DESCRIBE FUNCTION udf_name; --- 查看指定UDF
查看特定UDF的定义。
SHOW CREATE FUNCTION udf_name;
UDF与ByteHouse中的任何其他函数一样工作,但它需要您指定数据库名才能访问它。
例如,要使用计算圆面积的UDF:
drop function if exists test.area_of_circle; CREATE FUNCTION test.area_of_circle RETURNS Float64 LANGUAGE PYTHON AS $pikachu$ from iudf import IUDF from overload import overload from math import pi class area_of_circle(IUDF): @overload def process(radius): return radius*radius*pi $pikachu$;
此 UDF 示例将添加此表中每一行的值并输出结果。
在创建Python UDF之前,您必须指定数据库名称和表。
CREATE DATABASE test; CREATE TABLE test.test_udf_int ( a Int64, b Int64, c Int64, d Int64 ) ENGINE = CnchMergeTree() PRIMARY KEY a ORDER BY a; INSERT INTO test.test_udf_int(a, b, c, d) VALUES (0, 0, 0, 0 ) (1, 1, 1, 1) (2, 2, 2, 2);
接下来,使用 CREATE FUNCTION 创建 UDF 函数。 这是如何用Python编写入口函数的示例,并使用[code]作为分隔符。
由于ByteHouse是一个基于列的数据库,但通常您希望对每一行进行计算。 在此示例中,入口函数使用迭代器循环遍历每一行,并对每行中的四个输入值求和以获得输出值。
CREATE FUNCTION f_plus_int RETURNS Int64 LANGUAGE PYTHON AS $code$ import numpy as np class f_plus_int: def entry(self, output, inputs): nargs = len(inputs) it = np.nditer([output] + inputs, ['refs_ok', 'buffered'], [['writeonly']] + [['readonly']]*nargs) with it: for out, *ins in it: out[...] = self.process(*[x.item() for x in ins]) def process(self, x1, y1, x2, y2): return x1 + y2 + x2 + y2 $code$;
或者,您可以使用 Python Numpy 提供的完全基于列的方法。 它执行与 f_plus_int
相同的操作。
注意:使用基于列的方法的性能优于使用循环。
CREATE FUNCTION f_plus_int1 RETURNS Int64 LANGUAGE PYTHON AS $code$ import numpy as np class f_plus_int: def entry(self, output, inputs): np.add.reduce(inputs, out=output) $code$;
Python UDF函数创建后,就可以像一般内置函数一样调用。
select f_plus_int(a,b,c,d) from test.test_udf_int;
┌─f_plus_int(a, b, c, d)─┐ │ 0 │ │ 4 │ │ 8 │ └─────────────────────────────┘
drop function f_plus_int;
CREATE TABLE test.py_fixedstring ( a FixedString(2), b FixedString(3) ) ENGINE = CnchMergeTree() ORDER BY b; INSERT INTO test.py_fixedstring(a, b) VALUES ('aa', 'aaa') ('bb', 'bbb') ('cc', 'ccc') ('dd', 'ddd') ('ee', 'eee') ('ff', 'fff') ('gg', 'ggg') ('ab', 'abc') ('cb', 'cba') ('tt', 'zzz');
对于像FixedString
这样的返回类型,您必须指定返回字符串的长度。
CREATE FUNCTION py_fixed_string RETURNS FixedString(5) LANGUAGE PYTHON AS $code$ import numpy as np class py_fixed_string: def entry(self, output, inputs): nargs = len(inputs) it = np.nditer([output] + inputs, ['refs_ok', 'buffered'], [['writeonly']] + [['readonly']]*nargs) with it: for out, *ins in it: out[...] = self.process(*[x.item() for x in ins]) def process(self, a, b): return a + b $code$;
select py_fixed_string(a, b) from test.py_fixedstring;
┌─py_fixed_string(a, b)─┐ │ aaaaa │ │ ababc │ │ bbbbb │ │ cbcba │ │ ccccc │ │ ddddd │ │ eeeee │ │ fffff │ │ ggggg │ │ ttzzz │ └────────────────────────────┘
drop function py_fixedstring;
此外,ByteHouse提供了overload.py,它允许您在单个创建函数语句中创建多个UDF。 每个函数可以接受不同的参数作为输入,如下所示:
CREATE FUNCTION f_plus_int RETURNS Date LANGUAGE PYTHON AS $code$ from iudf import IUDF from overload import overload class f_plus_int(IUDF): @overload def process(a): return a + 1 @overload def process(a, b): return a + b $code$;
说明
@overload 是包装函数并返回 Function 类型的可调用对象的装饰器。 如果使用这个入口函数,则必须在函数中使用装饰器@overload。 需要从入口处调用,以避免参数号不匹配的错误。