StarRocks 作为高性能分析型数据库,支持通过 Spark 读取或写入数据,您可以使用 Spark Connector 连接 Spark 与 StarRocks 实现数据导入与导出。目前 EMR 提供的 Spark Connector 版本支持原生和旁路(Bypass)两种不同的模式读写 StarRocks 数据,区别如下表所示:
模式 | 数据导入 | 数据导出 |
---|---|---|
原生模式 | 将数据在内存中攒批,然后以 StreamLoad 方式导入 StarRocks。 | 通过 RPC 请求 StarRocks 读取数据。 |
Bypass 模式 | 绕过 StarRocks 服务层,直接写 StarRocks 存储在对象存储系统中的数据文件。 | 绕过 StarRocks 服务层,直接读 StarRocks 存储在对象存储系统中的数据文件。 |
Bypass 模式相对于原生模式具备如下优势:
Bypass 模式目前已在 EMR on ECS、EMR on VKE,以及 Serverless Spark 等产品形态中提供开箱即用。
参数 | 必须 | 默认值 | 说明 |
---|---|---|---|
spark.sql.extensions | 是 | 设置为 | |
spark.sql.catalog.starrocks | 是 | 设置为 | |
spark.sql.catalog.starrocks.writer.mode | 是 | 设置为 BYPASS,表示直写 StarRocks 数据文件。 | |
spark.sql.catalog.starrocks.reader.mode | 是 | 设置为 BYPASS,表示直读 StarRocks 数据文件。 | |
spark.sql.catalog.starrocks.fe.http.url | 是 | StarRocks FE 的 HTTP 地址,支持输入多个 FE 地址,使用逗号 | |
spark.sql.catalog.starrocks.fe.jdbc.url | 是 | StarRocks FE 的 MySQL Server 连接地址,格式为 | |
spark.sql.catalog.starrocks.user | 是 | StarRocks 集群账号的用户名。 | |
spark.sql.catalog.starrocks.password | 是 | StarRocks 集群账号的用户密码。 | |
spark.sql.catalog.starrocks.fs.s3a.endpoint | 是 | StarRocks 存算分离集群对应的 TOS 对象存储 S3 访问 endpoint。 | |
spark.sql.catalog.starrocks.fs.s3a.endpoint.region | 是 | StarRocks 存算分离集群对应的 TOS 对象存储 S3 访问 region。 | |
spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled | 是 | 设置为 true | |
spark.sql.catalog.starrocks.fs.s3a.path.style.access | 是 | 设置为 false | |
spark.sql.catalog.starrocks.fs.s3a.access.key | 是 | 访问 TOS 对象存储的静态 AccessKey。 | |
spark.sql.catalog.starrocks.fs.s3a.secret.key | 是 | 访问 TOS 对象存储的静态 SecretKey。 | |
spark.sql.catalog.starrocks.fs.s3a.retry.limit | 否 | 5 | 访问 TOS 对象存储的失败重试次数。 |
spark.sql.catalog.starrocks.batch.size | 否 | 4096 | 执行写入操作时的攒批粒度,适当增大该参数可以优化写入性能。 |
说明
在 EMR on ECS 中使用 Bypass 模式,您需要确保:
启动 Spark SQL 客户端需要设置的参数示例如下:
spark-sql \ --conf spark.sql.extensions="com.starrocks.connector.spark.StarRocksExtensions" \ --conf spark.sql.catalog.starrocks="com.starrocks.connector.spark.catalog.StarRocksCatalog" \ --conf spark.sql.catalog.starrocks.batch.size=8092 \ --conf spark.sql.catalog.starrocks.writer.mode="BYPASS" \ --conf spark.sql.catalog.starrocks.reader.mode="BYPASS" \ --conf spark.sql.catalog.starrocks.fe.http.url="http://192.168.6.7:8030" \ --conf spark.sql.catalog.starrocks.fe.jdbc.url="jdbc:mysql://192.168.6.7:9030" \ --conf spark.sql.catalog.starrocks.user="root" \ --conf spark.sql.catalog.starrocks.password="******" \ --conf spark.sql.catalog.starrocks.fs.s3a.endpoint="https://tos-s3-cn-beijing.ivolces.com" \ --conf spark.sql.catalog.starrocks.fs.s3a.endpoint.region="cn-beijing" \ --conf spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true \ --conf spark.sql.catalog.starrocks.fs.s3a.path.style.access=false \ --conf spark.sql.catalog.starrocks.fs.s3a.retry.limit=27 \ --conf spark.sql.catalog.starrocks.fs.s3a.access.key="******" \ --conf spark.sql.catalog.starrocks.fs.s3a.secret.key="******" \ --jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
进入 Spark SQL 终端后,您可以切换到 StarRocks Catalog,然后执行查询语句:
USE starrocks.demo; SELECT * FROM example_table;
准备 demo.py
程序示例:
import sys import time from operator import add from pyspark.sql import SparkSession from pyspark.sql.functions import * if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("Bypass Read")\ .config("spark.shuffle.service.enabled", "true")\ .config("spark.sql.extensions", "com.starrocks.connector.spark.StarRocksExtensions")\ .config("spark.sql.catalog.starrocks", "com.starrocks.connector.spark.catalog.StarRocksCatalog")\ .config("spark.sql.catalog.starrocks.fe.http.url", "http://xxx:8030")\ .config("spark.sql.catalog.starrocks.fe.jdbc.url", "jdbc:mysql://xxx:9030")\ .config("spark.sql.catalog.starrocks.user", "root")\ .config("spark.sql.catalog.starrocks.password", "******")\ .config("spark.sql.catalog.starrocks.writer.mode","BYPASS") \ .config("spark.sql.catalog.starrocks.reader.mode","BYPASS") \ .config("spark.sql.catalog.starrocks.fs.s3a.endpoint", "https://tos-s3-cn-beijing.ivolces.com")\ .config("spark.sql.catalog.starrocks.fs.s3a.endpoint.region", "cn-beijing")\ .config("spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled", "true")\ .config("spark.sql.catalog.starrocks.fs.s3a.path.style.access", "false")\ .config("spark.sql.catalog.starrocks.fs.s3a.retry.limit", "27")\ .config("spark.sql.catalog.starrocks.fs.s3a.access.key", "******")\ .config("spark.sql.catalog.starrocks.fs.s3a.secret.key", "******")\ .getOrCreate() spark.sql("use starrocks.demo").show() spark.sql("show tables").show() spark.sql("select count(1) from tb_all_primitivetype_read_duplicate ").show() spark.sql("insert into tb_all_primitivetype_read_duplicate select * from tb_all_primitivetype_read_duplicate limit 10 ").show() spark.sql("select count(1) from tb_all_primitivetype_read_duplicate ").show() spark.stop()
spark-submit \ --conf spark.app.name="bypass-test" \ --jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar \ --conf "spark.yarn.jars=hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/spark-jars-v2/*,hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/app-jars/*" \ demo.py
spark-submit \ --deploy-mode cluster \ --conf spark.app.name="bypass-test" \ --conf "spark.yarn.jars=hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/spark-jars-v2/*,hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/app-jars/*" \ demo.py
说明
在 EMR on VKE 中使用 Bypass 模式,您需要确保:
在掌握了如何向 EMR on VKE 集群提交 Spark 作业之后,您可以修改作业 yaml 文件,添加如下配置以实现通过 Spark 读写 StarRocks 数据文件:
spec: deps: jars: # starrocks-spark-connector jar 文件,VKE 集群镜像已内置 - local:///opt/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar sparkConf: spark.sql.extensions: "com.starrocks.connector.spark.StarRocksExtensions" spark.sql.catalog.starrocks: "com.starrocks.connector.spark.catalog.StarRocksCatalog" # 开启直读直写模式 spark.sql.catalog.starrocks.writer.modeL: "BYPASS" spark.sql.catalog.starrocks.reader.mode: "BYPASS" # 对应 StarRocks 集群的连接配置,即 FE 各节点的内网 IP 地址 spark.sql.catalog.starrocks.fe.http.url: "http://192.168.0.180:8030,192.168.0.181:8030,192.168.0.178:8030" spark.sql.catalog.starrocks.fe.jdbc.url: "jdbc:mysql://192.168.0.180:9030,192.168.0.181:9030,192.168.0.178:9030" # 对应 StarRocks 集群的认证配置 spark.sql.catalog.starrocks.user: "root" spark.sql.catalog.starrocks.password: "******" # 对应 StarRocks 存算分离集群 TOS 连接和认证配置 spark.sql.catalog.starrocks.fs.s3a.endpoint: "https://tos-s3-cn-beijing.ivolces.com" spark.sql.catalog.starrocks.fs.s3a.endpoint.region: "cn-beijing" spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled: "true" spark.sql.catalog.starrocks.fs.s3a.path.style.access: "false" spark.sql.catalog.starrocks.fs.s3a.access.key: "******" spark.sql.catalog.starrocks.fs.s3a.secret.key: "******" # 【推荐】指定加载 so 文件路径,如果不指定则会尝试自解压,有 30s 左右的时间开销 spark.driver.extraJavaOptions: "-Dcom.starrocks.format.jni.lib.path=/opt/starrocks/native" spark.executor.extraJavaOptions: "-Dcom.starrocks.format.jni.lib.path=/opt/starrocks/native" # 【可选】用于设置写入时攒批粒度 spark.sql.catalog.starrocks.batch.size: "8092"
参考 Serverless Spark 读写 StarRocks 操作手册。
参数 | 必须 | 参数值 | 说明 |
---|---|---|---|
starrocks.fe.jdbc.url | 是 | 配置 FE 节点 MySQL 服务器地址,格式为 | |
starrocks.fe.http.url | 是 | 配置 FE 节点 HTTP 服务器,格式为 | |
starrocks.table.identifier | 是 | 目标导入的 StarRocks 数据表,格式为: | |
starrocks.user | 是 | StarRocks 访问用户名称。 | |
starrocks.password | 是 | StarRocks 访问用户密码。 | |
starrocks.write.label.prefix | 否 |
| 用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。 |
starrocks.write.enable.transaction-stream-load | 否 |
| 用于配置导入操作是否使用 Stream Load 事务接口,相对于普通接口在内存占用和性能层面均有更好的表现。 说明 如果配置了 |
starrocks.write.buffer.size | 否 |
| 用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 |
starrocks.write.buffer.rows | 否 | Integer.MAX_VALUE | 用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 |
starrocks.write.flush.interval.ms | 否 |
| 用于配置数据发送的时间间隔。 |
starrocks.write.max.retries | 否 |
| 用于配置最大失败重试次数。 说明 如果该参数配置值大于 0,则忽略 |
starrocks.write.retry.interval.ms | 否 |
| 失败重试时间间隔。 |
starrocks.columns | 否 | 用于配置写入部分列,多个列名以英文逗号 | |
starrocks.write.num.partitions | 否 | 用于配置 Spark 并行写入的分区数,默认由 Spark 决定。 | |
starrocks.write.partition.columns | 否 | 用于配置 Spark 分区的列,如果不指定则使用所有写入的列进行分区。 说明 该参数仅在配置 | |
starrocks.timezone | 否 | 用于配置时区。 |
说明
Spark Connector 原生模式在导入数据时底层基于 Stream Load 实现,除了本小节列出的配置参数外,您也可以通过 starrocks.write.properties.{stream load 参数名}
的形式直接设置 Stream Load 的导入行为。例如通过 starrocks.write.properties.format
设置导入的数据格式,对应 Stream Load 的 format
参数。
本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key
为例,该表的建表语句如下:
CREATE TABLE IF NOT EXISTS tb_duplicate_key ( event_time BIGINT NOT NULL COMMENT 'timestamp of event', event_type INT NOT NULL COMMENT 'type of event', user_id INT NOT NULL COMMENT 'id of user', device VARCHAR(128) NULL COMMENT 'device' ) ENGINE = OLAP DUPLICATE KEY(event_time, event_type) DISTRIBUTED BY HASH(user_id) PROPERTIES ( 'replication_num' = '3' );
您可以直接通过 Spark SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:
--jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
CREATE TABLE
创建一张 StarRocks tb_duplicate_key
表的外表,不要求同名:CREATE TABLE IF NOT EXISTS tb_duplicate_key USING starrocks OPTIONS ( "starrocks.fe.http.url"="192.168.10.2:8030", "starrocks.fe.jdbc.url"="jdbc:mysql://192.168.10.2:9030", "starrocks.table.identifier"="examples.tb_duplicate_key", "starrocks.user"="system_query_user", "starrocks.password"="******" );
INSERT INTO
操作将数据插入外表:INSERT INTO tb_duplicate_key VALUES (1703128450, 1, 1001, 'PHONE'), (1703128451, 0, 1002, 'PAD'), (1703128452, 1, 1003, 'TV');
正常情况下,您可以在 StarRocks 中查询到刚刚由 Spark 侧写入的数据。
本小节以 Batch 任务为例,演示将内存中构造的数据通过 Spark DataFrame 方式导入 StarRocks 的 tb_duplicate_key
表。Scala 示例代码如下:
val spark = SparkSession .builder() .appName("load_data_example") .getOrCreate() import spark.implicits._ // 模拟数据 val data = Seq( (1703128450, 1, 1001L, "PHONE"), (1703128451, 0, 1002L, "PAD"), (1703128452, 1, 1003L, "TV"), ) // 将数据写入 StarRocks val df = data.toDF("event_time", "event_type", "user_id", "device") df.write .format("starrocks") .option("starrocks.fe.http.url", "192.168.10.2:8030") .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.10.2:9030") .option("starrocks.table.identifier", "examples.tb_duplicate_key") .option("starrocks.user", "system_query_user") .option("starrocks.password", "******") .mode("append") .save()
关于如何提交 Spark 任务可以参考 Spark 使用文档,需要额外添加如下参数:
--jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
您可以访问 StarRocks 官方文档 了解关于使用 Spark Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Spark Connector 读取 StarRocks 中的数据。