Snapshot of https://paimon.apache.org/docs/1.1/spark/procedures/ saved on 2025-7-25 18:34.
Procedures


This section introduces all available Spark procedures for Paimon.

Procedure Name  Explanation  Example
compact To compact files. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: partition filter. A comma (",") means "AND" and a semicolon (";") means "OR". For example, to compact the single partition with date=01 and day=01, write 'date=01,day=01'. Leave empty for all partitions. Cannot be used together with "where".
  • where: partition predicate. Leave empty for all partitions. Cannot be used together with "partitions".
  • order_strategy: 'order', 'zorder', 'hilbert', or 'none'. Defaults to 'none' when left empty.
  • order_columns: the columns to sort by (the examples below pass this argument as order_by). Leave empty if order_strategy is 'none'.
  • partition_idle_time: performs a full compaction on partitions that have not received any new data for the given duration. Only those partitions are compacted. Cannot be used together with sort compaction (order_strategy).
  • compact_strategy: determines how files are picked for merging. The default depends on the runtime execution mode. 'full' selects all files for merging and is supported only in batch mode. 'minor' picks the set of files to merge based on specified conditions.
    SET spark.sql.shuffle.partitions=10; -- set the compact parallelism

    CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', partition_idle_time => '60s')

    CALL sys.compact(table => 'T', compact_strategy => 'minor')
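The "partitions" filter syntax described above (comma for AND, semicolon for OR) can be modelled with a short Python sketch. This is only an illustration of the documented semantics, not Paimon's parser; the function names parse_partition_filter and matches are hypothetical.

```python
def parse_partition_filter(spec: str) -> list[dict[str, str]]:
    """Split a filter such as 'date=01,day=01;date=02,day=02'.

    Each dict is one AND-group of key=value conditions;
    the groups in the returned list are OR-ed together.
    """
    groups = []
    for group in spec.split(";"):
        group = group.strip()
        if not group:
            continue  # an empty filter yields no groups
        conditions = {}
        for pair in group.split(","):
            key, sep, value = pair.partition("=")
            if not sep:
                raise ValueError(f"expected key=value, got {pair!r}")
            conditions[key.strip()] = value.strip()
        groups.append(conditions)
    return groups


def matches(partition: dict[str, str], groups: list[dict[str, str]]) -> bool:
    """Return True if the partition satisfies at least one AND-group.

    An empty group list (filter left empty) selects every partition.
    """
    if not groups:
        return True
    return any(all(partition.get(k) == v for k, v in g.items()) for g in groups)
```

Under this model, 'p=0;p=1' selects partitions where p is 0 or 1, while 'date=01,day=01' selects only the partition where both conditions hold.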

expire_snapshots To expire snapshots. Arguments:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.
    CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)
expire_partitions To expire partitions. Arguments:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition is expired if its lifetime exceeds this value. The partition time is extracted from the partition value.
  • timestamp_formatter: the formatter used to parse a timestamp from a string.
  • timestamp_pattern: the pattern used to get a timestamp from partitions.
  • expire_strategy: the strategy for partition expiration. Possible values are 'values-time' and 'update-time'. Default is 'values-time'.
  • max_expires: the maximum number of partitions to expire. Optional.
    CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')
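As a rough illustration of the 'values-time' rule described above, the following Python sketch parses a time from each partition value and expires partitions whose age exceeds expiration_time. It is a model under assumptions, not Paimon's implementation: the function name expired_partitions is hypothetical, Python's "%Y-%m-%d" stands in for the 'yyyy-MM-dd' formatter, and expiring the oldest partitions first when max_expires applies is an assumption.

```python
from datetime import datetime, timedelta


def expired_partitions(partition_values, expiration, now, fmt="%Y-%m-%d", max_expires=None):
    """Return partition values whose parsed time is older than now - expiration."""
    cutoff = now - expiration
    # Pair each value with its parsed time so the result can be ordered oldest first.
    parsed = [(datetime.strptime(value, fmt), value) for value in partition_values]
    expired = [value for ts, value in sorted(parsed) if ts < cutoff]
    if max_expires is not None:
        # Assumption: the limit keeps the oldest partitions.
        expired = expired[:max_expires]
    return expired
```

For example, with now at 2024-07-03 12:00 and an expiration of one day, partitions dt=2024-07-01 and dt=2024-07-02 fall before the cutoff and dt=2024-07-03 does not.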
create_tag To create a tag based on a given snapshot. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag. Cannot be empty.
  • snapshot (Long): id of the snapshot the new tag is based on.
  • time_retained: the maximum time to retain the newly created tag.
    -- based on snapshot 10, retained for 1 day
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

    -- based on the latest snapshot
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
create_tag_from_timestamp To create a tag based on a given timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): the tag is based on the first snapshot whose commit time is greater than this timestamp.
  • time_retained: the maximum time to retain the newly created tag.
    CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d')
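The snapshot selection rule for create_tag_from_timestamp ("the first snapshot whose commit time is greater than this timestamp") can be sketched in Python as a binary search over commit times. This is a hedged model only: the function name first_snapshot_after and the (snapshot_id, commit_time) tuples are hypothetical, and the snapshot list is assumed to be sorted by commit time.

```python
from bisect import bisect_right


def first_snapshot_after(snapshots, timestamp_ms):
    """Return the id of the first snapshot committed strictly after timestamp_ms.

    snapshots: list of (snapshot_id, commit_time_ms), sorted by commit time.
    Returns None when no snapshot is newer than the timestamp.
    """
    commit_times = [commit_time for _, commit_time in snapshots]
    # bisect_right skips entries equal to timestamp_ms, giving a strict "greater than".
    idx = bisect_right(commit_times, timestamp_ms)
    return snapshots[idx][0] if idx < len(snapshots) else None
```

The timestamp in the example above (1724404318750) has the magnitude of an epoch value in milliseconds, which is why the sketch names its parameter timestamp_ms.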
rename_tag To rename a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: the current name of the tag. Cannot be empty.
  • target_tag: the new name for the tag. Cannot be empty.
    CALL sys.rename_tag(table => 'default.T', tag => 'tag1', target_tag => 'tag2')
replace_tag To replace an existing tag with new tag info. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the existing tag. Cannot be empty.
  • snapshot (Long): id of the snapshot the tag is based on. Optional.
  • time_retained: the maximum time to retain the existing tag. Optional.
    CALL sys.replace_tag(table => 'default.T', tag => 'tag1', snapshot => 10, time_retained => '1 d')
delete_tag To delete a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the tag to delete. To specify multiple tags, separate them with ','.
    CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')
expire_tags To expire tags by time. Arguments:
  • table: the target table identifier. Cannot be empty.
  • older_than: tags whose creation time (tagCreateTime) is before this value are removed.
    CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
rollback To roll back the target table to a specific version. One of version, snapshot, or tag must be set. Arguments:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of the tag to roll back to. Note that version will be deprecated.
  • snapshot: the snapshot to roll back to.
  • tag: the tag to roll back to.
    CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10)

    CALL sys.rollback(table => 'default.T', tag => 'tag1')

    CALL sys.rollback(table => 'default.T', snapshot => 2)
rollback_to_timestamp To roll back to the snapshot that is earlier than or equal to a timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • timestamp: roll back to the snapshot that is earlier than or equal to this timestamp.
    CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)

rollback_to_watermark To roll back to the snapshot that is earlier than or equal to a watermark. Arguments:
  • table: the target table identifier. Cannot be empty.
  • watermark: roll back to the snapshot that is earlier than or equal to this watermark.
    CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)
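Both rollback_to_timestamp and rollback_to_watermark pick a snapshot that is "earlier than or equal to" a bound. A minimal Python sketch of that selection follows. It assumes the latest qualifying snapshot is the one chosen, which the text above does not state explicitly; the function name latest_at_or_before and the dictionary fields id, commit_time, and watermark are hypothetical.

```python
def latest_at_or_before(snapshots, bound, field):
    """Return the id of the latest snapshot whose `field` value is <= bound.

    snapshots: list of dicts with an "id" plus the compared field,
    for example "commit_time" or "watermark". Snapshots whose field
    is None are skipped. Returns None when nothing qualifies.
    """
    eligible = [s for s in snapshots if s[field] is not None and s[field] <= bound]
    if not eligible:
        return None
    return max(eligible, key=lambda s: s[field])["id"]
```

Calling it with field="commit_time" models the timestamp variant, and field="watermark" models the watermark variant.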

purge_files To clear a table by purging its files. Arguments:
  • table: the target table identifier. Cannot be empty.
    CALL sys.purge_files(table => 'default.T')
migrate_database To migrate all Hive tables in a database to Paimon tables. Arguments:
  • source_type: the type of the origin database to migrate, such as hive. Cannot be empty.
  • database: name of the origin database to migrate. Cannot be empty.
  • options: the table options for the migrated Paimon tables.
  • options_map: additional key-value options, given as a map.
  • parallelism: the parallelism of the migration process. Defaults to the number of cores on the machine.
    CALL sys.migrate_database(source_type => 'hive', database => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)
migrate_table To migrate a Hive table to a Paimon table. Arguments:
  • source_type: the type of the origin table to migrate, such as hive. Cannot be empty.
  • table: name of the origin table to migrate. Cannot be empty.
  • options: the table options for the migrated Paimon table.
  • target_table: name of the target Paimon table. If not set, the origin table's name is kept.
  • delete_origin: when target_table is set, decides whether the origin table's metadata is deleted from the Hive metastore (HMS) after migration. Default is true.
  • options_map: additional key-value options, given as a map.
  • parallelism: the parallelism of the migration process. Defaults to the number of cores on the machine.
    CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)
remove_orphan_files To remove orphan data files and metadata files. Arguments:
  • table: the target table identifier. Cannot be empty. Use database_name.* to clean a whole database.
  • older_than: to avoid deleting newly written files, this procedure by default deletes only orphan files older than 1 day. This argument changes that threshold.
  • dry_run: when true, only lists orphan files without removing them. Default is false.
  • parallelism: the maximum number of files deleted concurrently. Defaults to the number of processors available to the Java virtual machine.
  • mode: the mode of the orphan file cleanup (local or distributed). Default is distributed.
    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')
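The older_than safeguard for remove_orphan_files can be pictured with a small Python sketch. It treats a file as an orphan when nothing references it, which is an assumption about the term rather than something stated above, and it is not Paimon's implementation. The function name orphan_candidates and its inputs are hypothetical.

```python
from datetime import datetime, timedelta


def orphan_candidates(files, referenced, now, older_than=None):
    """List unreferenced files whose last-modified time is before the cutoff.

    files: dict mapping path -> last-modified datetime.
    referenced: set of paths still in use by the table.
    older_than: explicit cutoff; defaults to one day before `now`.
    """
    cutoff = older_than if older_than is not None else now - timedelta(days=1)
    return sorted(
        path
        for path, modified in files.items()
        if path not in referenced and modified < cutoff
    )
```

A dry run corresponds to returning this list without deleting anything, which is all the sketch does.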
remove_unexisting_files To remove nonexistent data files from manifest entries. See the Java docs for detailed use cases. Arguments:
  • table: the target table identifier. Cannot be empty. Use database_name.* to clean a whole database.
  • dry_run (optional): only checks which files would be removed, without actually removing them. Default is false.
  • parallelism (optional): the parallelism used to check files in the manifests.
  Note: use this procedure at your own risk. It may cause data loss when used outside the use cases listed in the Java docs.
    -- remove unexisting data files in the table `mydb.myt`
    CALL sys.remove_unexisting_files(table => 'mydb.myt')

    -- only check what files will be removed, but not really remove them (dry run)
    CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true)
repair To synchronize information from the file system to the Metastore. Arguments:
  • database_or_table: empty, the target database name, or the target table identifier. To specify multiple targets, separate them with ','.
    CALL sys.repair('test_db.T')

    CALL sys.repair('test_db.T,test_db01,test_db.T2')
create_branch To create a branch, optionally based on an existing tag. Arguments:
  • table: the target table identifier or branch identifier. Cannot be empty.
  • branch: name of the branch to create.
  • tag: name of the tag the new branch is based on. The first example below omits it.
    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')

    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')

    CALL sys.create_branch(table => 'test_db.T$branch_existBranchName', branch => 'test_branch', tag => 'my_tag')

delete_branch To delete a branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to delete. To specify multiple branches, separate them with ','.
    CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
fast_forward To fast-forward a branch to the main branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be merged.
    CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')
reset_consumer To reset or delete a consumer. Arguments:
  • table: the target table identifier. Cannot be empty.
  • consumerId: the consumer to reset or delete.
  • nextSnapshotId (Long): the new next snapshot id of the consumer.
    -- reset the next snapshot id in the consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)

    -- delete consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
clear_consumers To clear consumers. Arguments:
  • table: the target table identifier. Cannot be empty.
  • includingConsumers: the consumers to clear.
  • excludingConsumers: the consumers not to clear.
    -- clear all consumers in the table
    CALL sys.clear_consumers(table => 'default.T')

    -- clear some consumers in the table (accepts a regular expression)
    CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')

    -- clear all consumers except excludingConsumers in the table (accepts a regular expression)
    CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')

    -- clear consumers matching includingConsumers but not excludingConsumers (accepts regular expressions)
    CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')
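How includingConsumers and excludingConsumers combine in the four clear_consumers examples can be modelled with Python's re module. This is a sketch of the behaviour the examples suggest, not Paimon's code. The function name consumers_to_clear is hypothetical, and requiring the pattern to match the whole consumer id (rather than a substring) is an assumption.

```python
import re


def consumers_to_clear(consumer_ids, including=None, excluding=None):
    """Select consumer ids to clear from optional include/exclude patterns.

    An empty or missing `including` pattern selects every consumer;
    any id matching `excluding` is then kept (not cleared).
    """
    include_re = re.compile(including) if including else None
    exclude_re = re.compile(excluding) if excluding else None
    selected = []
    for consumer_id in consumer_ids:
        if include_re is not None and not include_re.fullmatch(consumer_id):
            continue  # not covered by the include pattern
        if exclude_re is not None and exclude_re.fullmatch(consumer_id):
            continue  # explicitly protected by the exclude pattern
        selected.append(consumer_id)
    return selected
```

With ids myid, myid1, myid12, and other, the patterns 'myid.*' and 'myid1.*' together select only myid under this model.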
mark_partition_done To mark partitions as done. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: the partitions to mark as done. To specify multiple partitions, separate them with ';'.
    -- mark single partition done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')

    -- mark multiple partitions done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
refresh_object_table To refresh an object table. Arguments:
  • table: the target table identifier. Cannot be empty.
    CALL sys.refresh_object_table('default.T')
compact_manifest To compact the manifests. Arguments:
  • table: the target table identifier. Cannot be empty.
    CALL sys.compact_manifest(`table` => 'default.T')
alter_view_dialect To alter a view dialect. Arguments:
  • view: the target view identifier. Cannot be empty.
  • action: the change action: add, update, or drop. Cannot be empty.
  • engine: the engine of the dialect. Must be specified when the engine is not Spark.
  • query: the query for the dialect. Cannot be empty when action is add or update.
    -- add dialect in the view
    CALL sys.alter_view_dialect('view_identifier', 'add', 'spark', 'query')
    CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'add', `query` => 'query')

    -- update dialect in the view
    CALL sys.alter_view_dialect('view_identifier', 'update', 'spark', 'query')
    CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'update', `query` => 'query')

    -- drop dialect in the view
    CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')
    CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 'drop')

    Copyright © 2024 The Apache Software Foundation. Apache Paimon, Paimon, and its feather logo are trademarks of The Apache Software Foundation.