本文主要面向需要使用 Paimon 进行实时数据湖开发的工程师,为工程师提供表设计和参数调优的建议,包含:
merge-engine、changelog-producer、metastore.partitioned-table、TTL、快照保留、compaction 相关你可以直接复制本手册中的 SQL 到 Flink SQL 控制台,再替换占位符为真实值。
优先回答三个问题:
对应推荐如下:
场景 | 推荐 | 推荐 | 备注 |
|---|---|---|---|
只追加,不更新(Append) | 不设置 |
| 成本最低;常用于明细落地 |
根据主键更新(Upsert) |
|
| 只落地不流读可用 |
只更新部分列(宽表) |
|
| 必须有主键;可配置字段级合并规则 |
实时聚合统计 |
|
| 配置聚合函数与 retract 行为 |
Paimon 推荐使用三段式访问路径:${catalog}.${db}.${table}。你需要先创建 Catalog,然后创建 Database,再创建 Table。
CREATE CATALOG paimon_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'tos://<tos-bucket>/paimon/warehouse' ); USE CATALOG paimon_catalog;
CREATE CATALOG paimon_las_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'is-las' = 'true', 'hive.client.las.region.name' = '<region>', 'hive.metastore.uris' = 'thrift://lakeformation.las.<region>.ivolces.com:48869', 'hive.hms.client.is.public.cloud' = 'true', 'hive.client.las.ak' = '<ak>', 'hive.client.las.sk' = '<sk>', 'catalog.properties.metastore.catalog.default' = '<las-catalog>', 'warehouse' = 'tos://<tos-bucket>/paimon/warehouse' ); USE CATALOG paimon_las_catalog;
说明:
使用 Flink 平台提供的数据目录功能,则不需要在 SQL 中使用创建 Catalog 的语法。
直接使用三段式对 Paimon 数据表进行访问,或者直接 USE Catalog 即可。
USE CATALOG paimon_las_catalog;
CREATE DATABASE IF NOT EXISTS test_db; USE test_db;
说明:
DatabaseNotExistException,通常是因为未显式创建 DB;建议始终在 SQL 中写 CREATE DATABASE IF NOT EXISTS ...。适用:
推荐:
changelog-producer = nonedt(按天);吞吐更大可 dt + hh模板:
CREATE TABLE IF NOT EXISTS append_table ( id BIGINT, name STRING, amount DECIMAL(10, 2), event_time TIMESTAMP(3), dt STRING ) PARTITIONED BY (dt) WITH ( 'changelog-producer' = 'none' );
适用:
推荐:
PRIMARY KEY (...) NOT ENFORCEDbucketchangelog-producer:
lookup (建议) 或 full-compactioninput模板:
CREATE TABLE IF NOT EXISTS primary_key_table ( word STRING, cnt BIGINT, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', 'changelog-producer' = 'input' );
适用:
硬性建议:
metastore.partitioned-table = true模板:
CREATE TABLE IF NOT EXISTS primary_key_partitioned ( word STRING, cnt BIGINT, dt STRING, hh STRING, PRIMARY KEY (dt, hh, word) NOT ENFORCED ) PARTITIONED BY (dt, hh) WITH ( 'bucket' = '4', 'changelog-producer' = 'input', 'metastore.partitioned-table' = 'true' );
适用:
推荐组合:
merge-engine = partial-updatechangelog-producer = lookup(或 full-compaction,非必要请优先选择 lookup)模板:
CREATE TABLE IF NOT EXISTS partial_update_table ( uid INT, username STRING, reg_time TIMESTAMP(3), logintypes ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>, last_bind_time TIMESTAMP(3), vip_is_valid BOOLEAN, vip_start_time TIMESTAMP(3), vip_end_time TIMESTAMP(3), PRIMARY KEY (uid) NOT ENFORCED ) WITH ( 'bucket' = '4', 'merge-engine' = 'partial-update', 'changelog-producer' = 'lookup', 'fields.last_bind_time.sequence-group' = 'logintypes', 'fields.logintypes.aggregate-function' = 'nested_update', 'fields.logintypes.nested-key' = 'logintype' );
适用:
推荐组合:
merge-engine = aggregationchangelog-producer = lookup(或 full-compaction)aggregate-functionignore-retract模板:
CREATE TABLE IF NOT EXISTS aggregate_table ( window_start TIMESTAMP(3), window_end TIMESTAMP(3), category STRING, item_id BIGINT, total_amount DECIMAL(10, 2), cnt BIGINT, dt STRING, PRIMARY KEY (window_start, window_end, category, item_id, dt) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'bucket' = '4', 'changelog-producer' = 'lookup', 'merge-engine' = 'aggregation', 'fields.total_amount.aggregate-function' = 'sum', 'fields.cnt.aggregate-function' = 'sum', 'fields.total_amount.ignore-retract' = 'true', 'fields.cnt.ignore-retract' = 'true' );
常用方案:
dt STRING,常用格式 yyyy-MM-dddt STRING, hh STRING建议:
什么时候需要分桶:
bucketbucket经验建议:
1-2GBbucket:分桶数(例如 4 / 8 / 16),注意这个参数在两种表类型间有一些语义的区别。
bucket-key:分桶键
bucket,则必须设置 bucket-key。值 | 说明 | 适用 |
|---|---|---|
| 不产生 changelog | Append 表、只落地不流读 |
| 根据上游输入生成 | 下游流读但只看输入语义 |
| 查找模式 | 需要完整变更日志(常用于 upsert/partial/agg) |
| 全量压缩 | 需要完整变更日志,接受更高 compaction 成本 |
提示:
partial-update、aggregation 的流读场景通常要搭配 lookup (较为常见)或 full-compaction(不推荐使用)。'ignore-delete' = 'true'
对于非分区表,建议使用行级过期时间进行数据清理。
'record-level.expire-time' = '30 d', 'record-level.time-field' = 'update_time'
为了降低存储成本,建议设置分区过期时间。
'partition.expiration-strategy' = 'values-time', 'partition.expiration-time' = '7 d', 'partition.expiration-check-interval' = '1 d', 'partition.timestamp-formatter' = 'yyyy-MM-dd', 'partition.timestamp-pattern' = '$dt'
为了避免数据延迟过高导致消费不及时,建议调高快照保留时间。
'snapshot.time-retained' = '3 d', 'snapshot.expire.execution-mode' = 'async'
如果你不希望文件合并阻塞 Flink 写入 Paimon 表,那么可以使用异步文件合并的方式解耦相关过程。
'num-sorted-run.stop-trigger' = '2147483647', 'sort-spill-threshold' = '5', 'changelog-producer.lookup-wait' = 'false',
说明:compaction 参数在不同 Paimon/Flink 版本上差异较大,且很多属于“高级调优项”。建议优先使用平台默认值;确需调优时,以你当前平台的 Paimon 参数列表与官方文档为准再配置。具体 Compaction 原理和参数参考:[客户] Paimon Compaction 原理和最佳实践
当上游写入语义与下游主键约束不一致时,如果下游 Paimon 表已经设置了合理的排序键,可考虑在任务参数中关闭 upsert materialize:
table.exec.sink.upsert-materialize: NONE
CREATE TABLE kafka_source ( id BIGINT, name STRING, amount DECIMAL(10, 2), event_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = '<topic>', 'properties.bootstrap.servers' = '<brokers>', 'properties.group.id' = '<group-id>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' );
CREATE TABLE IF NOT EXISTS paimon_append_sink ( id BIGINT, name STRING, amount DECIMAL(10, 2), event_time TIMESTAMP(3), dt STRING ) PARTITIONED BY (dt) WITH ( 'changelog-producer' = 'none' );
INSERT INTO paimon_append_sink SELECT id, name, amount, event_time, DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt FROM kafka_source;
warehouse 当成表参数 warehouse 是 Catalog 级别 的属性(在 CREATE CATALOG ... WITH (...) 里配置),不是表级参数。建完 Catalog 后,通过 USE CATALOG ...; 再去建库建表即可。
connector/path/... 在当前“Catalog + Database + Table”的推荐工作流里,Paimon 表本身通常不需要写 connector='paimon'、path 之类“旧式直连口径”。正确方式是:
CREATE CATALOG ... 指定 metastore/warehouseUSE CATALOG ...CREATE DATABASE ...CREATE TABLE ... WITH (...) 里只放 Paimon 表属性(如 bucket、merge-engine、changelog-producer、字段级合并规则等)bucket Append 表通常不需要 bucket(除非你明确要提升读写并行度,或者使用 append queue 的能力)。如果需要分桶,请结合数据量与查询模式设置合理 bucket 数,并需要设置 bucket-key (1.1+ 版本强制需求)。
org.apache.thrift.transport.TTransportException 原因:
建议:
DatabaseNotExistException 原因:
处理:
CREATE DATABASE IF NOT EXISTS your_db; USE your_db;