You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2023.12.27 14:43:54首次发布时间:2022.10.26 10:29:42

本文将为您介绍火山引擎 E-MapReduce(EMR)中 Delta Lake 服务的常用基础使用命令操作。

1 前提条件

  1. 已创建包含 Delta Lake 服务的 EMR 引擎。详情请参见 创建集群
  2. 目前 Delta Lake 组件是白名单开放,您可通过 创建工单 的方式,申请使用。

2 初始化客户端

  1. 登陆 EMR 控制台

  2. 点击进入 集群列表 > 集群名称详情 > 服务列表 > Delta Lake 服务界面。

  3. 部署拓扑 中,展开组件名称。

  4. 点击集群节点的ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的root密码或秘钥,进入远程终端。
    或使用 SSH 方式登录到集群主节点,详情请参见使用 SSH连接主节点

  5. 执行以下语句进行客户端初始化操作。

2.1 Spark SQL

spark-sql \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

2.2 Spark Shell

spark-shell \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

2.3 PySpark

PySpark 默认用的 python 环境由环境变量 PYSPARK_PYTHONspark-env.sh 中定义。EMR 已经将系统对应版本的 delta 包安装在了这个 python 环境中,您无需再自行 pip install。

pyspark \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

注意

  1. 如果不使用 SQL 功能(如 saveAsTable),spark-shell 和 pyspark 中的两个 --conf 可以去掉。

  2. 对于 PySpark,有些功能是 Spark 本身提供的,比如 spark.read.format("delta")df.write.format("delta"),这些 PySpark 提供了内置支持。
    有些功能是 Delta 独有的,比如实例化一个 Delta 表对象 DeltaTable.forPath。您需要理清楚两者之间的关系(可参考下面的例子)。

在接下来的介绍中,我们的样例代码将给出 Spark SQL 和 PySpark 的方式。详见:

2.4 TOS 支持

EMR 默认添加了 TOS 支持,同时内置了自动化鉴权,您无需输入 AK。
您可以像使用 HDFS 的方式一样使用 TOS,例如,直接将 hdfs://tmp/delta 替换为 tos://tmp/delta
如果 TOS 支持因为一些意外配置导致丢失,则只需要在 HDFS core-site 的配置中增加如下配置项:

fs.AbstractFileSystem.tos.impl: com.volcengine.cloudfs.fs.TOS
fs.tos.impl: com.volcengine.cloudfs.fs.TosFileSystem
fs.tos.endpoint: <your_tos_endpoint>

3 建表

3.1 表的分类

Delta Lake 建表支持用外部 metastore 表的方式和通过目录的方式建表。对于前者,表一些信息(表名、表路径)会被存储在外部 metastore 中。另外,在下文您可以看到,如果需要使用 Hive 进行查询,则需要在 metastore 中建一张 Hive 表。

因此我们这里对表进行一下定义:

  1. Delta 表:指不依赖于 metastore 的,schema 存储于底层存储的表,可以用 Spark 根据表路径查询。

  2. Spark 表:指 Spark 在 metastore 中创建的对应于 Delta 表的内表或外表,可以用 Spark 根据表名查询。

  3. Hive 表:指 Hive 在 metastore 中创建的对应于 Delta 表的外表,可以用 Hive SQL 进行查询。

3.2 直接建表

3.2.1 Spark SQL 方式

注意

  1. 这种方式下需要打通 Spark 访问 Hive metastore 的路径,若使用 EMR 自带的 PySpark 配置,则已经做好打通工作,您无需关心。

  2. 通过 metastore 方式建的表,不能作为 Hive 表来查询。同时,表信息中只有表名和表路径是正确的,其他信息包括 schema 仅具有参考意义,不保证和 Delta 表真实的 schema 一致。

-- 方式一:在 LOCATION 指定的目录下建 Delta 表,同时建立 Spark 表
CREATE TABLE IF NOT EXISTS default.people (
    id INT,
    name STRING,
    age INT
) USING DELTA
LOCATION `/tmp/delta/people`
-- 方式二:直接在指定目录建 Delta 表
CREATE OR REPLACE TABLE delta.`/tmp/delta/people` (
    id INT,
    name STRING,
    age INT
) USING DELTA

3.2.2 Spark Python API 方式

# 写新表,同时把它保存到外部 metastore(Spark 表)
df.write.format("delta").saveAsTable("default.people")
# 写或者 overwrite 一张表,表路径为 "/tmp/delta/people"
df.write.format("delta").mode("overwrite").save("/tmp/delta/people")

3.3 将 Hive 表转为 Delta 表

如果您已经有了一张 Hive 表,那么可以使用 CONVERT 命令直接把它转为 Delta 表:

3.3.1 Spark SQL 方式

CONVERT TO DELTA parquet.`<path-to-table>` [PARTITIONED BY (part int, part2 int)]

3.3.2 Spark Python API 方式

DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");

注意

一旦将 Hive 表 CONVERT 成 Delta 表,就必须使用 Delta 提供的 VACUUM 命令(同时把版本过期时间设置为 0)才能将 Delta 表还原为 Hive 表,没有做这个动作直接删除 Delta 表的元数据会造成表损坏,并且无法恢复。同理,Delta 不允许任何不经 Delta 本身的直接操作元数据的行为
更多命令参见:https://docs.delta.io/latest/delta-utility.html

4 查询

Spark SQL 方式

-- 指定表名进行查询
SELECT * FROM default.people
-- 通过表路径进行查询
SELECT * FROM delta.`/tmp/delta/people`

Spark Python API 方式

# 指定表名进行查询
spark.table("default.people")
# 通过表路径进行查询
spark.read.format("delta").load("/tmp/delta/people")

4.1 使用 Hive 查询

EMR(1.4.0 以上版本)通过 Delta-Connectors 项目对 Hive 读 Delta 表进行了支持。注意,该项目不支持 Hive 写 Delta 表。

  1. 启动 Hive 客户端,并执行如下语句进行设置:
SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
  1. 在 Hive metastore 中创建一个 Delta 表的映射表(建表之前确保 Delta 表已经创建),这张映射表是一张外表,例如:
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'tos://your_bucket/delta/table/path'
  1. 使用 Hive SQL 进行查询。

这里需要说明一下,Hive 表与 Delta 表,以及 Spark 在 metastore 创建的 Spark 表,虽然物理实体是一个,但是是三张表(参见:表的分类一节)。所以存在以下限制:

  • Hive 无法读取 Spark 在 metastore 创建的表。

  • Spark 无法读取 Hive 创建的映射表。

因此,当 Delta 表的 Schema 发生变化时,Hive 和 Spark 创建的表需要同步变化 schema。

  • 对于 Hive 来说,由于它是外表,所以仅仅需要 DROP 并重新 CREATE TABLE。

  • 对于 Spark 来说,如果是 Spark 创建的 Managed Table,不能像 Hive 那样先 DROP 再 CREATE(会删掉数据),所以最好所有的操作都通过表进行(包括 schema 变更),否则最好也像 Hive 一样用外表的方式。

4.2 使用 Presto 查询

Presto(0.269 及以上版本)自带了对 Delta 表的读支持。详情请参考官方页面

4.3 使用 Trino 查询

Trino(373 及以上版本)自带了对 Delta 表的读写支持。详情请参考官方页面

5 删除

Spark SQL 方式

-- 通过表名进行删除
DELETE FROM people WHERE age < 20
-- 通过表路径进行删除
DELETE FROM delta.`/tmp/delta/people` WHERE age < 20

Spark Python API 方式

from delta.tables import *
from pyspark.sql.functions import *
# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# 删除方式一
deltaTable.delete("age < 20")
# 删除方式二
deltaTable.delete(col('age') < '20')

6 更新

Spark SQL 方式

-- 通过表名进行更新
UPDATE people SET age = 20 WHERE name = 'zhangsan';
-- 通过表路径进行更新
UPDATE delta.`/tmp/delta/people` SET age = 20 WHERE name = 'zhangsan';

Spark Python API 方式

from delta.tables import *
from pyspark.sql.functions import *

# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# 更新方式一
deltaTable.update(
  condition = "name = 'zhangsan'",
  set = { "age": "20" }
)
# 更新方式二
deltaTable.update(
  condition = col('name') == 'zhangsan',
  set = { "age": "20" }
)

7 合并

相比于插入、更新和删除,合并提供了一种更有效的方式来批量的将一个表的数据“合并”到另一张表中,而不是对一张表中的每一行数据分别在第二张表中去执行插入、更新或删除。
合并在很多场景下都很有用,比如将上游产生的数据按照一定的时间粒度合并到当前表中,以保证当前表内容的实时性。
Spark SQL 方式

MERGE INTO people
USING peopleupdates
ON people.id = peopleupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = peopleupdates.id,
    name = peopleupdates.name,
    age = peopleupdates.age
WHEN NOT MATCHED
  THEN INSERT (
    id,
    name,
    age
  )
  VALUES (
    peopleupdates.id,
    peopleupdates.name,
    peopleupdates.age,
  )

Spark Python API 方式

from delta.tables import *

# 通过指定表路径获得表
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

# 执行 merge 动作
deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "name": "updates.name",
      "age": "updates.age"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "name": "updates.name",
      "age": "updates.age"
    }
  ) \
  .execute()

8 流式读写 Delta 表

8.1 Spark Streaming

8.1.1 Source

spark.readStream
  .format("delta")
  .option(<option_key>, <option_value>)
  .load("/tmp/delta_table")

常用选项参数:

参数说明
maxFilesPerTrigger一个批次最多处理的文件数量,默认值为 1000。
maxBytesPerTrigger一个批次最多处理的数据量。注意这个最大的值可能会稍微超出,不是一个严格的值。另外,如果 Trigger 策略是 Trigger.Once,这个 Option 会被忽略。
ignoreDeletes该选项设置为 true 后,对分区的删除动作不予响应。
ignoreChanges该选项设置为 true 后,当数据发生变更(如 UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE),重新消费数据。这个可能会造成数据重复,因此您需要自己处理重复数据。

8.1.2 Sink

Append 模式(默认模式):

df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
  .start("/tmp/delta_table")
  
 // toTable 是 Spark 3.1 新增语法,3.1 以下用 `table()` 方法
 df.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("delta_table")

Complete 模式:

spark.readStream
  .format("delta")
  .load("/tmp/delta_table")
  .select("name","sales")
  .groupBy("name")
  .agg(sum("sales"))
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta_table_output/_checkpoints")
  .start("/tmp/delta_table_output")

EMR(1.4.0 以上版本)通过 Delta-Connectors 项目对 Flink 读写 Dleta 表进行了支持。

8.2.1 Source

// 有界流(批),使用 forBoundedRowData API。
DataStream<RowData> createBoundedDeltaSourceWithTimeTravel(
        StreamExecutionEnvironment env,
        String deltaTablePath) {

    DeltaSource<RowData> deltaSource = DeltaSource
        .forBoundedRowData(
            new Path(deltaTablePath),
            new Configuration())
        // could also use `.versionAsOf(314159)`
        .timestampAsOf("2022-06-28 04:55:00")
        .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}

// 无界流,使用 forContinuousRowData API。
public DataStream<RowData> createContinuousDeltaSourceWithTimeTravel(
        StreamExecutionEnvironment env,
        String deltaTablePath) {

    DeltaSource<RowData> deltaSource = DeltaSource
        .forContinuousRowData(
            new Path(deltaTablePath),
            new Configuration())
        // could also use `.startingVersion(314159)`
        .startingTimestamp("2022-06-28 04:55:00")
        .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}

常用参数选项:

参数说明
有界模式
versionAsOf指定要读的 Delta 表版本。
timestampAsOf指定要读的 Delta 表时间戳。
columnNames指定要读 Delta 表哪些列,如果没有指定,则读取全部的列。
无界模式
columnNames指定要读 Delta 表哪些列,如果没有指定,则读取全部的列。
startingVersion从该版本流式读取 Delta 表。
startingTimestamp从该时间戳开始读取 Delta 表。
updateCheckIntervalMillis检查 Delta 表变动的时间间隔。
ignoreDeletes当设置为 True 时,对数据的删除不予响应。
ignoreChanges当设置为 True 时,当数据发生变更(如 UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE),重新消费数据。这个可能会造成数据重复,因此您需要自己处理重复数据。

8.2.2 Sink

import io.delta.flink.sink.DeltaBucketAssigner;
import io.delta.flink.sink.DeltaSinkBuilder;

public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    String[] partitionCols = { "partCol" };
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            new Configuration(),
            rowType)
        .withPartitionColumns(partitionCols)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}

9 使用 TOS 作为存储底层

TOS 提供了兼容 HDFS FileSystem API 的 FileSystem 实现,并且已经在 HDFS 中做了相关配置,因此可以像使用 HDFS 那样使用 TOS 作为 Delta 表的底层存储。下面是一个例子:
Spark SQL

CREATE TABLE delta_test (
    id int,
    name string
) USING delta
LOCATION 'tos://tmp/delta_test';

INSERT INTO delta_test VALUES (1, 'zhangsan'),(2, 'lisi');

SELECT * FROM delta_test;

-- 结果显示:
1        zhangsan
2        lisi
Time taken: 0.921 seconds, Fetched 2 row(s)

PySpark

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

data = [(1, 'zhangsa'), (2, 'lisi')]
schema = StructType([ \
    StructField("id", IntegerType(), True), \
    StructField("name", StringType(), True), \
])

df = spark.createDataFrame(data=data,schema=schema)
# 如果不需要建表,直接使用 save('/path/to/your/table') 即可
df.write.format('delta').saveAsTable('delta_test', path='tos://tmp/delta_test')

df_read = spark.read.format('delta').load('tos://tmp/delta_test')
df_read.show()

# 结果显示
+---+-------+
| id|   name|
+---+-------+
|  1|zhangsa|
|  2|   lisi|
+---+-------+