Procedures # 流程 #
This section introduce all available spark procedures about paimon.
本节介绍所有可用的关于 Paimon 的 Spark 存储过程。
| Procedure Name 流程名称 | Explanation 说明 | Example 示例 |
|---|---|---|
| compact |
To compact files. Argument:
用于压缩文件。参数: table: 目标表标识符。不能为空。 partitions: 分区过滤。逗号(",")表示"AND",分号(";")表示"OR"。如果您想合并分区 date=01 和 day=01,需要写成 'date=01,day=01'。留空表示所有分区。(不能与 "where" 一起使用) where: 分区谓词。留空表示所有分区。(不能与 "partitions" 一起使用) order_strategy: 'order'、'zorder'、'hilbert' 或 'none'。留空表示 'none'。 排序列:需要对这些列进行排序。如果'排序策略'为'none',则留空。 分区空闲时间:这个参数用于对那些在'分区空闲时间'内没有接收到任何新数据的分区进行完全合并。并且只有这些分区会被合并。这个参数不能与顺序合并一起使用。 合并策略:这个参数决定了如何选择要合并的文件,默认值由运行时执行模式决定。'完全'策略只支持批处理模式。所有文件都会被选中用于合并。'次要'策略:根据指定条件选择需要合并的文件集。 |
SET spark.sql.shuffle.partitions=10; --set the compact parallelism SET spark.sql.shuffle.partitions=10; --设置合并并行度 CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b') CALL sys.compact(table => 'T', partition_idle_time => '60s') CALL sys.compact(table => 'T', compact_strategy => 'minor') |
| expire_snapshots 过期快照 |
To expire snapshots. Argument:
过期快照。参数: table: 目标表标识符。不能为空。 retain_max:保留的最大已完成快照数量。 retain_min: 保留的完成快照的最小数量。 older_than: 在此时间戳之前,快照将被删除。 max_deletes: 一次性可以删除的最大快照数量。 |
CALL sys.expire_snapshots(table => 'default.T', retain_max => 10) |
| expire_partitions 过期分区 |
To expire partitions. Argument:
过期分区。参数: table: 目标表标识符。不能为空。 expiration_time: 分区的过期间隔。如果分区的生命周期超过此值,则会被过期。分区时间从分区值中提取。 timestamp_formatter: 用于将字符串格式化为时间戳的格式化器。 timestamp_pattern: 用于从分区中获取时间戳的模式。 expire_strategy: 指定分区过期策略,可能值:'values-time' 或 'update-time',默认为 'values-time'。 max_expires: 有限过期分区的最大值,可选。 |
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter =>
'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time') 调用 sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time') |
| create_tag 创建标签 |
To create a tag based on given snapshot. Arguments:
根据给定的快照创建标签。参数: table: 目标表标识符。不能为空。 tag: 新标签的名称。不能为空。 snapshot(Long): 新标签基于的快照的 id。 time_retained: 新创建标签的最大保留时间。 |
-- based on snapshot 10 with 1d -- 基于快照 10,保留 1 天 CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d') -- based on the latest snapshot -- 基于最新快照 CALL sys.create_tag(table => 'default.T', tag => 'my_tag') |
| create_tag_from_timestamp |
To create a tag based on given timestamp. Arguments:
根据给定的时间戳创建标签。参数: table: 目标表标识符。不能为空。 tag:新标签的名称。 timestamp (Long):查找第一个提交时间大于此时间戳的快照。 保留时间 : 新创建标签的最大保留时间。 |
CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d') |
| rename_tag 重命名标签 |
Rename a tag with a new tag name. Arguments:
使用新的标签名重命名标签。参数: table: 目标表标识符。不能为空。 tag: 标签名称。不能为空。 target_tag: 要重命名的新的标签名称。不能为空。 |
CALL sys.rename_tag(table => 'default.T', tag => 'tag1', target_tag => 'tag2') |
| replace_tag |
Replace an existing tag with new tag info. Arguments:
用新的标签信息替换现有的标签。参数: table: 目标表标识符。不能为空。 tag: 已存在的标签名称。不能为空。 快照(Long):基于该标签的快照 ID,可选。 保留时间:现有标签的最大保留时间,可选。 |
CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d') |
| delete_tag 删除标签 |
To delete a tag. Arguments:
删除标签。参数: table: 目标表标识符。不能为空。 tag: 要删除的标签名称。如果指定多个标签,分隔符为','。 |
CALL sys.delete_tag(table => 'default.T', tag => 'my_tag') |
| expire_tags |
To expire tags by time. Arguments:
通过时间来过期标签。参数: table: 目标表标识符。不能为空。 older_than: 在此时间之前创建的标签将被移除。 |
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00') |
| rollback 回滚 |
To rollback to a specific version of target table, note version/snapshot/tag must set one of them. Argument:
要回滚到目标表的一个特定版本,必须设置版本/快照/标签中的一个。参数: table: 目标表标识符。不能为空。 版本: 将回滚到的快照的 ID 或标签的名称,版本将被弃用。 快照: 将回滚到的快照。 标签: 将回滚到的标签。 |
CALL sys.rollback(table => 'default.T', version => 'my_tag') CALL sys.rollback(table => 'default.T', version => 10) CALL sys.rollback(table => 'default.T', tag => 'tag1') CALL sys.rollback(table => 'default.T', snapshot => 2) |
| rollback_to_timestamp |
To rollback to the snapshot which earlier or equal than timestamp. Argument:
回滚到早于或等于 timestamp 的快照。参数: table: 目标表标识符。不能为空。 timestamp: 回滚到早于或等于 timestamp 的快照。 |
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000) |
| rollback_to_watermark |
To rollback to the snapshot which earlier or equal than watermark. Argument:
回滚到早于或等于水位线的快照。参数: table: 目标表标识符。不能为空。 watermark: 回滚到早于或等于水位线的快照。 |
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000) |
| purge_files 清除文件 |
To clear table with purge files. Argument:
清理包含 purge 文件的表。参数: table: 目标表标识符。不能为空。 |
CALL sys.purge_files(table => 'default.T') |
| migrate_database |
Migrate all hive tables in database to paimon tables. Arguments:
将数据库中的所有 Hive 表迁移到 Paimon 表。参数: source_type: 要迁移的原始数据库类型,例如 hive。不能为空。 database: 要迁移的原始数据库名称。不能为空。 选项:要迁移的 Paimon 表的表选项。 options_map:用于添加键值对选项的选项映射,是一个映射。 并行度:迁移过程的并行度,默认为机器的核心数。 |
CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) |
| migrate_table |
Migrate hive table to a paimon table. Arguments:
将 Hive 表迁移到 Paimon 表。参数: source_type: 要迁移的原始表的类型,例如 hive。不能为空。 table: 要迁移的原始表的名称。不能为空。 选项:要迁移的 Paimon 表的表选项。 目标表:要迁移的目标 Paimon 表名称。如果未设置,将保留与原表相同的名称。 删除原表:如果设置了目标表,可以设置 delete_origin 来决定是否在迁移后从 HMS 删除原表元数据。默认为 true。 options_map:用于添加键值对选项的选项映射,是一个映射。 并行度:迁移过程的并行度,默认为机器的核心数。 |
CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) |
| remove_orphan_files 删除孤儿文件 |
To remove the orphan data files and metadata files. Arguments:
用于删除孤儿数据文件和元数据文件。参数: table: 目标表标识符。不能为空,可以使用 database_name.* 清理整个数据库。 older_than: 为了避免删除新写入的文件,此过程默认仅删除超过 1 天的孤儿文件。此参数可以修改时间间隔。 dry_run: 当为 true 时,仅查看孤儿文件,不实际删除文件。默认为 false。 parallelism: 最大并发删除文件数量。默认为 Java 虚拟机可用的处理器数量。 模式:删除孤儿清理过程的模式(本地或分布式)。默认为分布式。 |
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00') 调用 sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00') CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00') 调用 sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00') CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true) 调用 sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true) CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5') 调用 sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5') CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local') 调用 sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local') |
| remove_unexisting_files 删除不存在的文件 |
Procedure to remove unexisting data files from manifest entries. See Java docs for detailed use cases. Arguments:
从清单条目中删除不存在的数据文件的步骤。有关详细用例,请参阅 Java 文档。参数: table: 目标表标识符。不能为空,可以使用 database_name.* 清理整个数据库。 dry_run (可选): 仅检查将要删除的文件,但不会真正删除它们。默认为 false。 parallelism (可选): 在清单中检查文件时使用的并行度数。 Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases listed in Java docs. 请注意,使用此步骤的用户需自行承担风险,如果在 Java 文档中列出的用例之外使用,可能会导致数据丢失。 |
-- remove unexisting data files in the table `mydb.myt` -- 删除表 `mydb.myt` 中不存在的数据文件 CALL sys.remove_unexisting_files(table => 'mydb.myt') -- only check what files will be removed, but not really remove them (dry run) -- 仅检查将要被删除的文件,但实际并不删除它们(模拟运行) CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run = true) |
| repair 修复 |
Synchronize information from the file system to Metastore. Argument:
将文件系统中的信息同步到 Metastore。参数: 数据库或表:可以为空,或目标数据库名称,或目标表标识符,如果指定多个标签,分隔符为',' |
CALL sys.repair('test_db.T') 调用 sys.repair('test_db.T') CALL sys.repair('test_db.T,test_db01,test_db.T2') 调用 sys.repair('test_db.T,test_db01,test_db.T2') |
| create_branch 创建分支 |
To merge a branch to main branch. Arguments:
将分支合并到主分支。参数: table: 目标表标识符或分支标识符。不能为空。 branch: 要合并的分支名称。 tag: 新标签的名称。不能为空。 |
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch') 调用 sys.create_branch(table => 'test_db.T', branch => 'test_branch') CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag') 调用 sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag') CALL sys.create_branch(table => 'test_db.T$branch_existBranchName', branch => 'test_branch', tag => 'my_tag') |
| delete_branch 删除分支 |
To merge a branch to main branch. Arguments:
将分支合并到主分支。参数: table: 目标表标识符。不能为空。 分支:要合并的分支的名称。如果您指定多个分支,分隔符为','。 |
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch') |
| fast_forward 快速前进 |
To fast_forward a branch to main branch. Arguments:
将分支快速前移到主分支。参数: table: 目标表标识符。不能为空。 branch: 要合并的分支名称。 |
CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch') |
| reset_consumer 重置消费者 |
To reset or delete consumer. Arguments:
用于重置或删除消费者。参数: table: 目标表标识符。不能为空。 consumerId: 要重置或删除的消费者。 nextSnapshotId (Long): 消费者的新下一个快照 ID。 |
-- reset the new next snapshot id in the consumer -- 重置消费者中的下一个快照 ID CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10) -- delete consumer -- 删除消费者 CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid') |
| clear_consumers |
To clear consumers. Arguments:
清除消费者。参数: table: 目标表标识符。不能为空。 includingConsumers: 要清除的消费者。 排除消费者:不需要清除的消费者。 |
-- clear all consumers in the table -- 清除表中的所有消费者 CALL sys.clear_consumers(table => 'default.T') 调用 sys.clear_consumers(table => 'default.T') -- clear some consumers in the table (accept regular expression) -- 清除表中的某些消费者(接受正则表达式) CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*') 调用 sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*') -- clear all consumers except excludingConsumers in the table (accept regular expression) -- 清除表中除 excludingConsumers 以外的所有消费者(接受正则表达式) CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*') 调用 sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*') -- clear all consumers with includingConsumers and excludingConsumers (accept regular expression) -- 使用 includingConsumers 和 excludingConsumers 清除所有消费者(接受正则表达式) CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*') 调用 sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*') |
| mark_partition_done |
To mark partition to be done. Arguments:
标记要完成的分区。参数: table: 目标表标识符。不能为空。 partitions: 需要标记完成的分区,如果指定多个分区,分隔符是';'。 |
-- mark single partition done -- 标记单个分区完成 CALL sys.mark_partition_done(table => 'default.T', parititions => 'day=2024-07-01') CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01') -- mark multiple partitions done -- 标记多个分区完成 CALL sys.mark_partition_done(table => 'default.T', parititions => 'day=2024-07-01;day=2024-07-02') 调用 sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02') |
| refresh_object_table |
To refresh_object_table a object table. Arguments:
刷新对象表。参数: table: 目标表标识符。不能为空。 |
CALL sys.refresh_object_table('default.T') |
| compact_manifest |
To compact_manifest the manifests. Arguments:
table: 目标表标识符。不能为空。 |
CALL sys.compact_manifest(`table` => 'default.T') |
| alter_view_dialect 修改视图方言 |
To alter view dialect. Arguments:
用于修改视图方言。参数: view: 目标视图标识符。不能为空。 action: 定义变更操作,如:添加、更新、删除。不能为空。 引擎:当引擎不是 Spark 时需要定义它。 查询:当操作是添加或更新时,需要为方言查询,且不能为空。 |
-- add dialect in the view -- 在视图中添加方言 CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query') 调用 sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query') CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'add', `query` => 'query') -- update dialect in the view -- 更新视图中的方言 CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 'query') CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'update', `query` => 'query') -- drop dialect in the view -- 在视图中删除方言 CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark') CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'drop') |