Spark SQL 方式
-- 直接通过 path 查询 DESCRIBE HISTORY '/path/to/delta/' [LIMIT <n>] -- 通过 delta 前缀查询 DESCRIBE HISTORY delta.`/path/to/delta/` [LIMIT <n>] -- 通过表名查询 DESCRIBE HISTORY deltaTable [LIMIT <n>]
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, pathToTable) # 查询历史版本,其中参数 n 可选,指定获取 n 条记录。如果没有指定 n,则获取全部记录 lastOperationDF = deltaTable.history(<n>)
Spark SQL 方式
-- 根据时间戳查询历史版本 SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression -- 根据版本号查询历史版本 SELECT * FROM table_name VERSION AS OF version
Spark Python API 方式
# 根据时间戳查询历史版本 df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people") # 根据版本号查询历史版本 df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people")
其中:
timestamp_expression
的格式为
'2018-10-18T22:15:12.013Z'
, 可以被转换为 timestamp 的标准时间格式
cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, 日期
current_timestamp() - interval 12 hours
时间表达式
date_sub(current_date(), 1)
时间表达式
其他可以被转换为 timestamp 的时间格式
version
则可以通过 DESCRIBE HISTORY
指令获取。
Spark SQL 方式
-- 通过路径获得表信息 DESCRIBE DETAIL '/path/to/delta/' -- 通过表名获得表信息 DESCRIBE DETAIL deltaTable
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, pathToTable) # 查询表属性明细 detailDF = deltaTable.detail()
Delta Lake 有历史版本回溯的功能,它记录了所有的针对表的修改动作。每一次的表更改都会生成新的日志文件,还可能生成新的数据文件。针对日志文件和数据文件,Delta Lake 都引入了保存期机制:
对于日志文件,默认保存 30 天内的数据,过期会自动删除,您无需关心;
对于数据文件,默认有 7 天的保存期,过期的数据需要用户手动执行 VACUUM 命令删除;
用户可以通过配置 delta.logRetentionDuration = "interval <interval>"
和 delta.deletedFileRetentionDuration = "interval <interval>"
来分别设置日志文件和数据文件的保存期。具体的设置方式见下文:表配置。
Spark SQL 方式
-- 通过表名执行清理动作。RETAIN 100 HOURS 表示清理 100 VACUUM deltaTable [RETAIN 100 HOURS] -- vacuum files not required by versions older than the default retention period -- 通过路径字符串执行清理动作 VACUUM '/path/to/delta/' [RETAIN 100 HOURS]-- vacuum files in path-based table -- 通过 delta. 前缀执行清理动作 VACUUM delta.`/path/to/delta/` [RETAIN 100 HOURS] -- 只是列出清理详情而不真正执行 VACUUM deltaTable DRY RUN
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, pathToTable) # 或者通过指定表名获得表 deltaTable = DeltaTable.forName(spark, tableName) # 清理超过安全期的历史数据 deltaTable.vacuum() # 清理超过100小时的历史数据(相对于表的版本) deltaTable.vacuum(100)
用流式写入 Delta 的方式通常会产生大量小文件,时间粒度越细文件越小。另外,如果您执行一段复杂的 SQL 最终写入 Delta,则写入数据也可能因为 Partition 数量过多而产生大量小文件。
小文件的存在会造成很多问题,比如元数据处理速度下降、执行时因为文件过碎导致的磁盘随机读、用户设置并行度过大引起的小 task 过多等等,这些都会显著降低 Spark 的查询性能,因此需要对其进行合并操作。Delta 通过提供 optimize 指令来完成这个动作。
Spark SQL 方式
-- 通过指定路径进行优化 OPTIMIZE '/path/to/delta/' [WHERE CLAUSE] -- 通过表名进行优化 OPTIMIZE deltaTable [WHERE CLAUSE] -- 通过 delta. 前缀进行优化 OPTIMIZE delta.`/path/to/delta/` [WHERE CLAUSE]
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, pathToTable) # 或者通过指定表名获得表 deltaTable = DeltaTable.forName(spark, tableName) # 执行优化,其中 where 是可选的 deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
Data skpping 是一种利用统计信息来过滤数据的一种方式,能够在表 scan 的时期根据过滤条件过滤掉大量数据进而加快查询。Delta 的统计信息是关于列的、文件级别的 MIN、MAX 统计信息。如果一个列在多个文件相对有序,那么可以根据该列统计信息过滤掉多个文件。反之,如果列值均衡的分布在多个文件之中,则过滤效果会大打折扣甚至没有过滤。对于一个多维表格,如果按照多维排序的方式,前面的维度过滤效果会比较好,后边的维度过滤效果会比较差。
Z-Order 提供了一种均衡各个维度排序效果的方法。Z-Order 是一种算法,能够使得参与排序的每个列都在局部相对有序,因此拿任何参与排序的列来过滤都能取得不错的过滤效果。
Delta Lake 在 OPTIMIZE
语句中提供了 ZORDER BY
子句来完成表的 Z-Order 排序。
Spark SQL 方式
OPTIMIZE events [WHERE CLAUSE] ZORDER BY (<columns>)
Spark Python API 方式
from delta.tables import * # 通过指定表路径获得表 deltaTable = DeltaTable.forPath(spark, pathToTable) # 或者通过指定表名获得表 deltaTable = DeltaTable.forName(spark, tableName) # 执行优化,其中 where 是可选的 deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy(<columns>)
Delta Lake 的 history 功能提供了表的详细审计信息。当执行 DESCRIBE HISTORY
或者在 API 中调用 history()
函数时,返回的字段包含了不限于
userId
userName
operation
job
notebook
clusterId
...
等字段。这些字段有助于管理员发现并管理普通用户提交作业的历史,并据其优化集群管理。
您可以通过多种方式对 Delta 表进行配置:
通过建表时指定 TBLPROPERTIES
进行配置。
通过 Spark 的配置文件添加默认配置。
建表后通过修改 TBLPROPERTIES
进行配置。
用 CLI 查询时在 session 里进行配置。
Delta 表支持三种配置格式:
delta.<property>
:用于通过 TBLPROPERTIES
的进行的配置。spark.databricks.delta.properties.defaults.<property>
:用于通过 Spark SQL 设置表的默认配置,它会在建表时作为相应配置项的默认配置填到表中,没有这个配置的话会采用 Delta 的默认配置,也就是说,它等同于“delta.defaults.<property>
”,但是由于某些原因,使用了 spark.databricks.delta.properties.defaults.
这个前缀。spark.databricks.delta.<property>
:Spark SQL 在读写 Delta 表时的运行时配置。1 和 2 都属于表级的配置,3 是 Spark 引擎的配置。
表配置优先级从高到低如下:
使用 TBLPROPERTIES
显式设置表的属性。
用户在 session 中通过 set spark.databricks.delta.properties.defaults.<proeprty> = xxx
设置表的默认配置。
用户在 spark-default-conf 中设置的 spark.databricks.delta.properties.defaults.<property>
。
如果使用其他引擎,建议通过显式指定 TBLPROPERTIES
对表进行配置。
对于 3,它影响 Spark SQL 的执行行为。
注:下面的 <prefix> 指 delta
或者 spark.databricks.delta.properties.defaults
。
配置项 | 说明 |
---|---|
<prefix>.logRetentionDuration | 日志文件的生存期,默认为 30 天。 |
<prefix>.deletedFileRetentionDuration | 数据文件的生存期,默认为 7 周。 |
<prefix>.checkpointInterval | 设置日志文件的 checkpoint 周期,默认为 10。 |
<prefix>.appendOnly | 设置表为 appendOnly,此时表能够删除和追加数据,但不能更新数据。Delta 对 appendOnly 的表有优化,如果没有更新删除需求,建议设置此值。 |
<prefix>.enableChangeDataFeed | 是否开启 ChangeDataFeed 功能。 |
注意
强烈不建议通过设置 deletedFileRetentionDuration
为一个较小的值达到更快清理数据的目的。比如某些情况下不需要历史版本,设置了该值为 1h 甚至是 0,以便让自己能快速清理历史版本。在这种情况下可能会导致执行时长比较长的作业失败!
注:下面的<prefix> 指 spark.databricks.delta
。
配置项 | 说明 |
---|---|
<prefix>.stats.collect | 是否收集统计信息,默认为 true。 |
<prefix>.stats.skipping | 是否打开 data skipping 功能,默认为 true。 |
<prefix>.maxCommitAttempts | 提交失败是最大的重试次数,默认为 10000000。 |
<prefix>.optimize.minFileSize | 小于此大小的文件会被合并,默认值为 1024 * 1024 * 1024。 |
<prefix>.optimize.maxFileSize | 合并文件的目标最大值,默认为 1024 * 1024 * 1024。 |
<prefix>.optimize.maxThreads | 执行 optimize 动作时启用的线程数量,默认为 15。 |
<prefix>.vacuum.parallelDelete.enabled | 是否并行执行 vacuum,默认为 false,对于大表建议并行。 |
<prefix>.vacuum.parallelDelete.parallelism | 并行 vacuum 时的线程数量。 |