本文将为您介绍火山引擎 E-MapReduce(EMR)中 Delta Lake 服务的常用基础使用命令操作。
点击进入 集群列表 > 集群名称详情 > 服务列表 > Delta Lake 服务界面。
在 部署拓扑 中,展开组件名称。
点击集群节点的ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的root密码或秘钥,进入远程终端。
或使用 SSH 方式登录到集群主节点,详情请参见使用 SSH连接主节点。
执行以下语句进行客户端初始化操作。
spark-sql \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
PySpark 默认用的 python 环境由环境变量 PYSPARK_PYTHON
在 spark-env.sh
中定义。EMR 已经将系统对应版本的 delta 包安装在了这个 python 环境中,您无需再自行 pip install。
pyspark \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
注意
如果不使用 SQL 功能(如 saveAsTable),spark-shell 和 pyspark 中的两个 --conf 可以去掉。
对于 PySpark,有些功能是 Spark 本身提供的,比如 spark.read.format("delta")
,df.write.format("delta")
,这些 PySpark 提供了内置支持。
有些功能是 Delta 独有的,比如实例化一个 Delta 表对象 DeltaTable.forPath
。您需要理清楚两者之间的关系(可参考下面的例子)。
在接下来的介绍中,我们的样例代码将给出 Spark SQL 和 PySpark 的方式。详见:
EMR 默认添加了 TOS 支持,同时内置了自动化鉴权,您无需输入 AK。
您可以像使用 HDFS 的方式一样使用 TOS,例如,直接将 hdfs://tmp/delta
替换为 tos://tmp/delta
。
如果 TOS 支持因为一些意外配置导致丢失,则只需要在 HDFS core-site 的配置中增加如下配置项:
fs.AbstractFileSystem.tos.impl: com.volcengine.cloudfs.fs.TOS fs.tos.impl: com.volcengine.cloudfs.fs.TosFileSystem fs.tos.endpoint: <your_tos_endpoint>
Delta Lake 建表支持用外部 metastore 表的方式和通过目录的方式建表。对于前者,表一些信息(表名、表路径)会被存储在外部 metastore 中。另外,在下文您可以看到,如果需要使用 Hive 进行查询,则需要在 metastore 中建一张 Hive 表。
因此我们这里对表进行一下定义:
Delta 表:指不依赖于 metastore 的,schema 存储于底层存储的表,可以用 Spark 根据表路径查询。
Spark 表:指 Spark 在 metastore 中创建的对应于 Delta 表的内表或外表,可以用 Spark 根据表名查询。
Hive 表:指 Hive 在 metastore 中创建的对应于 Delta 表的外表,可以用 Hive SQL 进行查询。
注意
这种方式下需要打通 Spark 访问 Hive metastore 的路径,若使用 EMR 自带的 PySpark 配置,则已经做好打通工作,您无需关心。
通过 metastore 方式建的表,不能作为 Hive 表来查询。同时,表信息中只有表名和表路径是正确的,其他信息包括 schema 仅具有参考意义,不保证和 Delta 表真实的 schema 一致。
-- 方式一:在 LOCATION 指定的目录下建 Delta 表,同时建立 Spark 表 CREATE TABLE IF NOT EXISTS default.people ( id INT, name STRING, age INT ) USING DELTA LOCATION `/tmp/delta/people` -- 方式二:直接在指定目录建 Delta 表 CREATE OR REPLACE TABLE delta.`/tmp/delta/people` ( id INT, name STRING, age INT ) USING DELTA
# 写新表,同时把它保存到外部 metastore(Spark 表) df.write.format("delta").saveAsTable("default.people") # 写或者 overwrite 一张表,表路径为 "/tmp/delta/people" df.write.format("delta").mode("overwrite").save("/tmp/delta/people")
如果您已经有了一张 Hive 表,那么可以使用 CONVERT 命令直接把它转为 Delta 表:
CONVERT TO DELTA parquet.`<path-to-table>` [PARTITIONED BY (part int, part2 int)]
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");
注意
一旦将 Hive 表 CONVERT 成 Delta 表,就必须使用 Delta 提供的 VACUUM 命令(同时把版本过期时间设置为 0)才能将 Delta 表还原为 Hive 表,没有做这个动作直接删除 Delta 表的元数据会造成表损坏,并且无法恢复。同理,Delta 不允许任何不经 Delta 本身的直接操作元数据的行为。
更多命令参见:https://docs.delta.io/latest/delta-utility.html
Spark SQL 方式
-- 指定表名进行查询 SELECT * FROM default.people -- 通过表路径进行查询 SELECT * FROM delta.`/tmp/delta/people`
Spark Python API 方式
# 指定表名进行查询 spark.table("default.people") # 通过表路径进行查询 spark.read.format("delta").load("/tmp/delta/people")
EMR(1.4.0 以上版本)通过 Delta-Connectors 项目对 Hive 读 Delta 表进行了支持。注意,该项目不支持 Hive 写 Delta 表。
SET hive.input.format=io.delta.hive.HiveInputFormat; SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING) STORED BY 'io.delta.hive.DeltaStorageHandler' LOCATION 'tos://your_bucket/delta/table/path'
这里需要说明一下,Hive 表与 Delta 表,以及 Spark 在 metastore 创建的 Spark 表,虽然物理实体是一个,但是是三张表(参见:表的分类一节)。所以存在以下限制:
Hive 无法读取 Spark 在 metastore 创建的表。
Spark 无法读取 Hive 创建的映射表。
因此,当 Delta 表的 Schema 发生变化时,Hive 和 Spark 创建的表需要同步变化 schema。
对于 Hive 来说,由于它是外表,所以仅仅需要 DROP 并重新 CREATE TABLE。
对于 Spark 来说,如果是 Spark 创建的 Managed Table,不能像 Hive 那样先 DROP 再 CREATE(会删掉数据),所以最好所有的操作都通过表进行(包括 schema 变更),否则最好也像 Hive 一样用外表的方式。
Presto(0.269 及以上版本)自带了对 Delta 表的读支持。详情请参考官方页面。
Trino(373 及以上版本)自带了对 Delta 表的读写支持。详情请参考官方页面。
Spark SQL 方式
-- 通过表名进行删除 DELETE FROM people WHERE age < 20 -- 通过表路径进行删除 DELETE FROM delta.`/tmp/delta/people` WHERE age < 20
Spark Python API 方式
from delta.tables import * from pyspark.sql.functions import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m') # 删除方式一 deltaTable.delete("age < 20") # 删除方式二 deltaTable.delete(col('age') < '20')
Spark SQL 方式
-- 通过表名进行更新 UPDATE people SET age = 20 WHERE name = 'zhangsan'; -- 通过表路径进行更新 UPDATE delta.`/tmp/delta/people` SET age = 20 WHERE name = 'zhangsan';
Spark Python API 方式
from delta.tables import * from pyspark.sql.functions import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m') # 更新方式一 deltaTable.update( condition = "name = 'zhangsan'", set = { "age": "20" } ) # 更新方式二 deltaTable.update( condition = col('name') == 'zhangsan', set = { "age": "20" } )
相比于插入、更新和删除,合并提供了一种更有效的方式来批量的将一个表的数据“合并”到另一张表中,而不是对一张表中的每一行数据分别在第二张表中去执行插入、更新或删除。
合并在很多场景下都很有用,比如将上游产生的数据按照一定的时间粒度合并到当前表中,以保证当前表内容的实时性。
Spark SQL 方式
MERGE INTO people USING peopleupdates ON people.id = peopleupdates.id WHEN MATCHED THEN UPDATE SET id = peopleupdates.id, name = peopleupdates.name, age = peopleupdates.age WHEN NOT MATCHED THEN INSERT ( id, name, age ) VALUES ( peopleupdates.id, peopleupdates.name, peopleupdates.age, )
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people') deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-updates') dfUpdates = deltaTablePeopleUpdates.toDF() # 执行 merge 动作 deltaTablePeople.alias('people') \ .merge( dfUpdates.alias('updates'), 'people.id = updates.id' ) \ .whenMatchedUpdate(set = { "id": "updates.id", "name": "updates.name", "age": "updates.age" } ) \ .whenNotMatchedInsert(values = { "id": "updates.id", "name": "updates.name", "age": "updates.age" } ) \ .execute()
spark.readStream .format("delta") .option(<option_key>, <option_value>) .load("/tmp/delta_table")
常用选项参数:
参数 | 说明 |
---|---|
maxFilesPerTrigger | 一个批次最多处理的文件数量,默认值为 1000。 |
maxBytesPerTrigger | 一个批次最多处理的数据量。注意这个最大的值可能会稍微超出,不是一个严格的值。另外,如果 Trigger 策略是 Trigger.Once,这个 Option 会被忽略。 |
ignoreDeletes | 该选项设置为 true 后,对分区的删除动作不予响应。 |
ignoreChanges | 该选项设置为 true 后,当数据发生变更(如 UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE),重新消费数据。这个可能会造成数据重复,因此您需要自己处理重复数据。 |
Append 模式(默认模式):
df.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta_table/_checkpoints") .start("/tmp/delta_table") // toTable 是 Spark 3.1 新增语法,3.1 以下用 `table()` 方法 df.writeStream .outputMode("append") .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") .toTable("delta_table")
Complete 模式:
spark.readStream .format("delta") .load("/tmp/delta_table") .select("name","sales") .groupBy("name") .agg(sum("sales")) .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta_table_output/_checkpoints") .start("/tmp/delta_table_output")
EMR(1.4.0 以上版本)通过 Delta-Connectors 项目对 Flink 读写 Dleta 表进行了支持。
// 有界流(批),使用 forBoundedRowData API。 DataStream<RowData> createBoundedDeltaSourceWithTimeTravel( StreamExecutionEnvironment env, String deltaTablePath) { DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new Path(deltaTablePath), new Configuration()) // could also use `.versionAsOf(314159)` .timestampAsOf("2022-06-28 04:55:00") .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); } // 无界流,使用 forContinuousRowData API。 public DataStream<RowData> createContinuousDeltaSourceWithTimeTravel( StreamExecutionEnvironment env, String deltaTablePath) { DeltaSource<RowData> deltaSource = DeltaSource .forContinuousRowData( new Path(deltaTablePath), new Configuration()) // could also use `.startingVersion(314159)` .startingTimestamp("2022-06-28 04:55:00") .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }
常用参数选项:
参数 | 说明 |
---|---|
有界模式 | |
versionAsOf | 指定要读的 Delta 表版本。 |
timestampAsOf | 指定要读的 Delta 表时间戳。 |
columnNames | 指定要读 Delta 表哪些列,如果没有指定,则读取全部的列。 |
无界模式 | |
columnNames | 指定要读 Delta 表哪些列,如果没有指定,则读取全部的列。 |
startingVersion | 从该版本流式读取 Delta 表。 |
startingTimestamp | 从该时间戳开始读取 Delta 表。 |
updateCheckIntervalMillis | 检查 Delta 表变动的时间间隔。 |
ignoreDeletes | 当设置为 True 时,对数据的删除不予响应。 |
ignoreChanges | 当设置为 True 时,当数据发生变更(如 UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE),重新消费数据。这个可能会造成数据重复,因此您需要自己处理重复数据。 |
import io.delta.flink.sink.DeltaBucketAssigner; import io.delta.flink.sink.DeltaSinkBuilder; public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { String[] partitionCols = { "partCol" }; DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), new Configuration(), rowType) .withPartitionColumns(partitionCols) .build(); stream.sinkTo(deltaSink); return stream; }
TOS 提供了兼容 HDFS FileSystem API 的 FileSystem 实现,并且已经在 HDFS 中做了相关配置,因此可以像使用 HDFS 那样使用 TOS 作为 Delta 表的底层存储。下面是一个例子:
Spark SQL
CREATE TABLE delta_test ( id int, name string ) USING delta LOCATION 'tos://tmp/delta_test'; INSERT INTO delta_test VALUES (1, 'zhangsan'),(2, 'lisi'); SELECT * FROM delta_test; -- 结果显示: 1 zhangsan 2 lisi Time taken: 0.921 seconds, Fetched 2 row(s)
PySpark
from pyspark.sql.types import StructType,StructField, StringType, IntegerType data = [(1, 'zhangsa'), (2, 'lisi')] schema = StructType([ \ StructField("id", IntegerType(), True), \ StructField("name", StringType(), True), \ ]) df = spark.createDataFrame(data=data,schema=schema) # 如果不需要建表,直接使用 save('/path/to/your/table') 即可 df.write.format('delta').saveAsTable('delta_test', path='tos://tmp/delta_test') df_read = spark.read.format('delta').load('tos://tmp/delta_test') df_read.show() # 结果显示 +---+-------+ | id| name| +---+-------+ | 1|zhangsa| | 2| lisi| +---+-------+