You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2024.01.29 19:50:11首次发布时间:2022.10.10 11:24:48

Trino 支持多种操作模式,可以通过 Trino Cli、JDBC、HUE,以及 Airflow 等方式,连接至 Trino 进行 SQL 查询分析。

说明

EMR Trino 默认启用 LDAP 认证,因此您在操作 Trino 时需要携带必要的认证信息。如果您需要关闭 LDAP 认证(不推荐),可以在控制台“集群详情 > 服务列表 > Trino > 服务参数” 页面将 trino.ldap-enabled 配置项设置为 false,并重启 Trino 服务即可。

1 使用前提

  1. 已创建 E-MapReduce(EMR)集群,并包含 Trino、Hue、Airflow 组件服务。详见创建集群

  2. 若 Trino web ui 访问链接不能点击,请检查 Trino 所在 ECS 实例是否绑定弹性公网IP,详见访问链接

  3. 需要在集群详情 > 访问链接 > 配置服务端口中,给源地址和对应端口添加白名单才可继续访问。

2 Trino Cli 使用

如果希望通过 Trino Cli 访问操作 Trino,您需要先登录到目标集群,然后执行如下命令进入 Trino 交互终端:
集群登录操作详见:登录集群

$ trino --user <username> --password

说明

  • 命令行传递的 username 和 password 参数来自您在控制台用户管理页面导入或手动添加的用户,如果是无 LDAP 认证模式可以不予传递。

EMR Trino 默认会从配置目录读取 default-cli.properties 配置文件,并使用该配置文件内容填充命令行缺失的参数。当然,您也可以主动在命令行中进行参数设置,在命令行中指定的参数优先级高于 default-cli.properties 配置文件。

在完成登录到 Trino Cli 交互终端后,接下来您可以在终端中输入需要执行的 SQL 语句执行查询任务,例如:

trino> select * from tpch.sf1.nation;
 nationkey |      name      | regionkey |                                                      comment
-----------+----------------+-----------+--------------------------------------------------------------------------------------------------------------------
         0 | ALGERIA        |         0 |  haggle. carefully final deposits detect slyly agai
         1 | ARGENTINA      |         1 | al foxes promise slyly according to the regular accounts. bold requests alon
         2 | BRAZIL         |         1 | y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
         3 | CANADA         |         1 | eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
         4 | EGYPT          |         4 | y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
         5 | ETHIOPIA       |         0 | ven packages wake quickly. regu
         6 | FRANCE         |         3 | refully final requests. regular, ironi
         7 | GERMANY        |         3 | l platelets. regular accounts x-ray: unusual, regular acco
         8 | INDIA          |         2 | ss excuses cajole slyly across the packages. deposits print aroun
         9 | INDONESIA      |         2 |  slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull
        10 | IRAN           |         4 | efully alongside of the slyly final dependencies.
        11 | IRAQ           |         4 | nic deposits boost atop the quickly final requests? quickly regula
        12 | JAPAN          |         2 | ously. final, express gifts cajole a
        13 | JORDAN         |         4 | ic deposits are blithely about the carefully regular pa
        14 | KENYA          |         0 |  pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t
        15 | MOROCCO        |         0 | rns. blithely bold courts among the closely regular packages use furiously bold platelets?
        16 | MOZAMBIQUE     |         0 | s. ironic, unusual asymptotes wake blithely r
        17 | PERU           |         1 | platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
        18 | CHINA          |         2 | c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos
        19 | ROMANIA        |         3 | ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
        20 | SAUDI ARABIA   |         4 | ts. silent requests haggle. closely express packages sleep across the blithely
        21 | VIETNAM        |         2 | hely enticingly express accounts. even, final
        22 | RUSSIA         |         3 |  requests against the platelets use never according to the quickly regular pint
        23 | UNITED KINGDOM |         3 | eans boost carefully special requests. accounts are. carefull
        24 | UNITED STATES  |         1 | y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be
(25 rows)

Query 085720_00001_abdsb, FINISHED, 2 nodes
Splits: 16 total, 16 done (100.00%)
1.65 [25 rows, 0B] [15 rows/s, 0B/s]

3 JDBC 访问 Trino

使用 JDBC 方式操作 Trino 需要先引入 trino-jdbc 依赖,您可以通过 maven 方式下载公共版本(如下),也可以获取由 EMR 编译生成的 trino-jdbc 依赖包(位于集群 /usr/lib/emr/current/trino 目录)。

<dependency>
    <groupId>io.trino</groupId>
    <artifactId>trino-jdbc</artifactId>
    <version>${latest_version}</version>
</dependency>

3.1 使用编程方式

以 Java 语言为例,如果您需要在您的程序代码中使用 JDBC 访问 Trino,可以参考如下示例程序,区分是否启用 LDAP 认证。

  • 启用 LDAP 认证
Properties props = new Properties();

// username 和 password 来自在用户管理中导入或创建的用户
props.setProperty("user", "<username>");
props.setProperty("password", "<password>");
props.setProperty("SSL", "true");
// keystore 路径,对应 http-server.https.keystore.path 配置项
props.setProperty("SSLKeyStorePath", "<keystore_path>");
// keystore 密码,对应 http-server.https.keystore.key 配置项
props.setProperty("SSLKeyStorePassword", "<keystore_password>");

/*
 * 连接地址模板为 jdbc:trino://<host>:<port>,其中:
 * - port 在启用 LDAP 认证时为 9085,未启用 LDAP 认证时为 9084;
 * - host 为 Coordinator 所在的主机名,例如 master-1-1.emr-e9193fb05ae1e3477d99.cn-beijing.emr-volces.com
 */
String url = "jdbc:trino://<coordinator_hostname>:9085";
try (Connection connection = DriverManager.getConnection(url, props);
     Statement statement = connection.createStatement()) {
    ResultSet resultSet = statement.executeQuery("select * from tpch.sf1.nation");
    // ...
}
  • 关闭 LDAP 认证
Properties props = new Properties();

// username 来自在用户管理中导入或创建的用户
props.setProperty("user", "<username>");

/*
 * 连接地址模板为 jdbc:trino://<host>:<port>,其中:
 * - port 在启用 LDAP 认证时为 9085,未启用 LDAP 认证时为 9084;
 * - host 为 Coordinator 所在的主机名,例如 master-1-1.emr-e9193fb05ae1e3477d99.cn-beijing.emr-volces.com
 */
String url = "jdbc:trino://<coordinator_hostname>:9084";
try (Connection connection = DriverManager.getConnection(url, props);
     Statement statement = connection.createStatement()) {
    ResultSet resultSet = statement.executeQuery("select * from tpch.sf1.nation");
    // ...
}

上述程序编译成 jar 包之后可以在您的 EMR 集群上运行。如果您是在 EMR 集群之外运行则需要注意:

  • 您需要将集群上的 keystore 文件下载到您程序运行所在的位置,并修改 keystore_path 参数指向您本地的 keystore 文件。

  • 您需要保证程序运行所在节点与 EMR 集群 Trino Coordinator 节点的连通性,包括 host 连通性和 port 连通性。

3.2 使用数据库管理工具

通过 JDBC 方式也支持您在本地使用数据库管理工具连接访问 Trino,本小节以 DBeaver 为例介绍如何配置 Trino 数据库连接。本地访问 EMR 集群 Trino 服务通常需要您为 Trino Coordinator 所在节点绑定公网 IP,同时区分是否启用了 LDAP 认证。

  • 启动 LDAP 认证
  1. 通过在控制台侧“集群详情 - 节点管理”查看获取 Trino Coordinator 所在节点的 DNS 信息,例如 master-1-1.emr-``e9193fb05ae1e3477d99``.cn-beijing.emr-volces.com

  2. 确认本地对于 Trino 服务端口 9085 的连通性,如果 Trino Coordinator 所在节点未对本地出网 IP 开放端口,可以在 ECS 安全组中进行配置;

  3. 将集群上 Trino 的 keystore 文件复制到本地,集群上的 keystore 文件位置可以通过控制台侧 Trino 服务参数中 http-server.https.keystore.path 配置项获取;

  4. 在 DBeaver 中配置 Trino 数据库连接,如下图所示:


除了填写基本的主机、端口、用户名,以及密码信息外,还需要编辑驱动属性,添加如下配置项:

参数说明
SSL启用 SSL 连接,值始终设置为 true
SSLKeyStorePath对应下载到本地的 keystore 文件路径
SSLKeyStorePassword对应 keystore 证书密钥,可以通过控制台侧 Trino 服务参数中 http-server.https.keystore.key 配置项获取
  • 关闭 LDAP 认证
  1. 通过在控制台侧“集群详情 - 节点管理”查看获取 Trino Coordinator 所在节点的 DNS 信息,例如 master-1-1.emr-``e9193fb05ae1e3477d99``.cn-beijing.emr-volces.com

  2. 确认本地对于 Trino 服务端口 9084 的连通性,如果 Trino Coordinator 所在节点未对本地出网 IP 开放端口,可以在 ECS 安全组中进行配置;

  3. 在 DBeaver 中配置 Trino 数据库连接,如下图所示:


关闭 LDAP 认证场景下连接 Trino 服务相对要简单很多,只需要配置主机、端口,以及用户名即可。

4 Hue 访问 Trino

您可以通过 Hue 界面提交 Trino SQL 语句进行查询分析,执行 SQL 的用户即为登录 Hue 的用户。EMR Hue 默认会依据 Trino 配置项进行自动化配置,做到开箱即用,但当存在以下情况时需要您在控制台手动重启 Hue 服务以感知 Trino 配置项的变化:

  • Trino 配置项发生变更,例如在控制台修改了 Trino 的配置参数。

  • Trino 安装顺序在 Hue 之后,因为 Trino 是可选服务,手动添加安装时无法保证二者的安装顺序。

当您在登录并进入 Hue 交互页面后,可以按照如下操作步骤运行 Trino SQL 查询:

  1. 在 Hue Editor 界面选择 Trino;

  2. 在 SQL 编辑器中输入 Trino SQL 语句,并提交运行;

  3. 查看运行结果。

5 Airflow 访问 Trino

通过 Airflow 操作 Trino 需要您事先编辑好 DAG 文件,并上传至 Airflow 的 DAGs 目录中,具体可以参考 EMR Airflow 使用说明。需要注意的是:

  • EMR Airflow 已为 Trino 做了自动化集成,添加了 ID 为 trino 的 connection 配置,您编写的 Trino DAG 脚本中需指定 conn_id=trino

  • 如果您变更了 Trino 的服务配置,则需要重启 Airflow WebServer 以感知 Trino 变更的配置项。

Trino DAG 脚本示例:

from datetime import timedelta
import airflow
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.python_operator import PythonOperator

default_args = {'owner': 'airflow',
                'depends_on_past': False,
                'start_date': airflow.utils.dates.days_ago(0),
                'email': ['airflow@example.com'],
                'email_on_failure': True,
                'email_on_retry': False,
                'retries': 3,
                'retry_delay': timedelta(minutes=15)
                }

dag = airflow.DAG('trino-demo',
                  default_args=default_args,
                  description='Trino Demo',
                  schedule_interval=timedelta(minutes=30)
                  )

ph = TrinoHook(trino_conn_id='trino',
               catalog='hive',
               schema='default',
               port=8080
               )


def trino_drop_table(**context):
    sql = """DROP TABLE IF EXISTS tb_airflow_demo"""
    data = ph.get_records(sql)
    return True


def trino_create_table(**context):
    sql = """CREATE TABLE IF NOT EXISTS tb_airflow_demo(id int, name varchar)"""
    data = ph.get_records(sql)
    return True


def trino_insert_data(**context):
    sql = """INSERT INTO tb_airflow_demo SELECT 1, 'name1' """
    data = ph.get_records(sql)
    return True


def trino_select_data(**context):
    sql = """SELECT * FROM tb_airflow_demo LIMIT 20"""
    data = ph.get_records(sql)
    return True


# trino_drop_table_task = PythonOperator(task_id='trino_drop_table',provide_context=True,python_callable=trino_drop_table,dag=dag)
trino_create_table_task = PythonOperator(
    task_id='trino_create_table',
    provide_context=True,
    python_callable=trino_create_table,
    dag=dag
)

trino_insert_data_task = PythonOperator(
    task_id='trino_insert_data',
    provide_context=True,
    python_callable=trino_insert_data,
    dag=dag
)

trino_select_data_task = PythonOperator(
    task_id='trino_select_data',
    provide_context=True,
    python_callable=trino_select_data,
    dag=dag
)

trino_create_table_task >> trino_insert_data_task >> trino_select_data_task

if __name__ == "__main__":
    dag.cli()

将上述脚本上传到 Airflow 的 DAGs 目录下之后,您可以在 Airflow 的管理页面看到对应的 DAG 任务,如下图所示: