You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Paimon Catalog
Paimon 使用 EMR Hive 管理元数据
复制全文
下载 pdf
Paimon 使用 EMR Hive 管理元数据

1. 概述

Apache Paimon 是一种新型的流式数据湖存储技术,结合了 LSM-Tree 结构和湖格式,提供了高吞吐、低延迟的数据摄入、完整的流式变更订阅以及高效的 OLAP 查询能力。
本手册将指导您如何使用 Flink 引擎进行 Paimon 的开发任务,并且利用 EMR-Hive 统一管理 Paimon 湖的元数据,可以方便将 Paimon 充分融入已有的 EMR 大数据组件当中。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 开通了 E-MapReduce 产品服务,可以参考 EMR on ECS 集群创建文档
    1. 注意: EMR 集群必须安装 Paimon 组件,否则无法正常使用 Hive 和 Spark 访问 Paimon 数据

2.2 产品要求

  • Flink 版本需 >= 1.16,具体支持功能如下:
    • Flink 1.16
      • ✅ 支持 Kerberos 认证
      • 内置 Paimon 0.6 版本
    • Flink 1.17
      • ❌ 暂不支持 Kerberos 认证
      • 内置 Paimon 0.8.2 字节加强版功能,支持如存储过程等语法

3. 集群依赖文件准备

3.1 文件清单

前置条件:Flink 需要访问 E-MapReduce 集群,我们需要获取以下文件

  • hive-site.xml
  • hdfs-site.xml
  • keytab 文件(可选, kerberos 需要)
  • Krb5.conf(可选,kerberos 需要)

3.2 登录 EMR 的 Master 节点

配置文件的获取方式如下,在 E-MapReduce 产品集群 - 集群详情 - 节点管理 - MasterGroup - 选择第一台 Master 节点:
Image
通过控制台远程连接或者通过公网 SSH 连接指定 Master 服务器,通过 Terminal 控制台进行访问

注意:如果通过公网 SSH 连接,则需要提前为 ECS 绑定公网 IP

3.3 hive-site.xml

可以通过如下命令行获取:

# hive conf 一般在以下这个目录
cd /etc/emr/hive/conf
# 没有这个目录的话也可以尝试从环境变量中获取
env | grep HIVE_CONF_DIR

将下图中的 hive-site.xml 通过 SCP 命令拷贝到本地,或者直接 cat 文本,获取文件内容:
Image

3.4 hdfs-site.xml

hdfs-site.xml操作同 hive-site.xml

另外由于有些 Hive 表的 Format 会涉及到 YARN 的类,需要视情况将 yarn-site.xml 中的配置项 yarn.resourcemanager.principal 拷贝出来添加到 hdfs-site.xml 中。
内容类似如下:

<property>
    <name>yarn.resourcemanager.principal</name>
    <value>yarn/_HOST@6C5B5406ADDF347BB8C9.EMR.COM</value>
</property>

3.5 keytab (可选,kerberos 需要)

keytab 在 EMR 集群概览页面,需要创建一个用户,在用户管理这边,直接下载即可。

3.6 krb5.conf(可选,kerberos 需要)

krb5.conf 在 /etc/krb5.conf 根目录,直接scp拷贝或者cat文本copy出来

操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档开发 Flink SQL 任务

Image

4.2 任务上传依赖文件

  1. 操作路径:控制台 - 进入项目 - 作业开发 - 创建 SQL 任务 - 参数配置(右侧按钮) - 依赖文件。
  2. 准备依赖文件
    1. hive-site.xml
    2. hdfs-site.xml
    3. 下载 flink-sql hive connector
flink-sql-connector-hive-3.1.3_2.12-volc-1.0-SNAPSHOT.jar
未知大小
  1. 上传依赖文件:如下图,执行上传文件资源的操作(注意文件名必须完全一致,不可以修改):

Image

4.3 设置任务自定义参数

  1. 操作路径:控制台 - 进入项目 - 作业开发 - 创建 SQL 任务 - 参数配置(右侧按钮) - 自定义参数。
  2. 准备参数

为了解析 EMR-Hive 的域名,需要设置以下两个自定义参数:

containerized.master.env.ENV_SEARCH_DOMAIN:emr-volces.com
containerized.taskmanager.env.ENV_SEARCH_DOMAIN:emr-volces.com

配置结果如下
Image

我们可以通过如下 SQL 创建 Hive Catalog:

CREATE CATALOG paimon_test1 WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- uri 填写 HMS thrift 的内网地址如下,
    'uri' = 'thrift://master-1-1.emr-0fc031e89a242e96662b.cn-beijing.emr-volces.com:9083',
    'hive-conf-dir' = '/opt/tiger/workdir',
    'hadoop-conf-dir' = '/opt/tiger/workdir',
    'warehouse' = 'tos://<BUCKET_NAME>/paimon_emr_test'
);

WITH 参数的意义如下:

  • type:选择 paimon 类型 Catalog。
  • metastore:选择 hive 的方式进行元数据管理。
  • uri:Hive 的 thrift 接口地址
  • hive-conf-dir:这里指定 hive-site.xml 的文件路径。注意:要固定填写 /opt/tiger/workdir
  • hadoop-conf-dir:这里指定 hdfs-site.xml 的文件路径。注意:要固定填写 /opt/tiger/workdir
  • warehouse:和 Hive Catalog 中的数据目录存储位置保持一致。

其中 thrift 接口地址可以在 EMR Hive 服务地址查看,服务列表 - Hive - 服务参数:

Image

4.5 创建 Database

在 Catalog 中创建一个 Database,用于组织和管理表。

CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
  • ${db_name}:Database 的名称,自定义。

4.6 创建非分区表

在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的非分区表的示例

CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
    word varchar, -- 示例字段
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = '4',  -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'changelog-producer' = 'input', -- 产生 changelog,用于下游流读
);
  • ${table_name}:表的名称,自定义。
  • bucket:分桶数量,推荐单个 bucket 存储 1GB 左右数据。
  • changelog-producer
    • 设置为 input,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none选项以节省存储和写入资源。

4.7 创建分区表

在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的分区表的示例:

CREATE TABLE
  IF NOT EXISTS paimon_test1.test_db5.test_table5 (
    word varchar, -- 示例字段
    cnt bigint,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, word) NOT ENFORCED -- 一般分区主键表的主键字段必须包含分区字段
  ) PARTITIONED BY (dt, hh)
WITH
  (
    'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'changelog-producer' = 'none', -- 产生 changelog,用于下游流读
    'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 Hive 元数据管理
  );
  • metastore.partitioned-table:开启后将分区信息会同步到 Hive 元数据管理,默认 false不开启。

5. 数据写入示例

以下示例展示了如何使用 Flink SQL 将数据写入 Paimon 表。

5.1 创建数据源表

首先,创建一个数据源表,用于生成模拟数据。

CREATE TABLE doc_source (word varchar)
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '30'
  );
  • connector:使用 datagen 连接器生成模拟数据。
  • rows-per-second:每秒生成的行数。
  • fields.word.length:生成字段 word 的长度。

5.2 写入数据到 Paimon 表

将数据源表中的数据写入 Paimon 表。

INSERT INTO `paimon_test`.`default`.`doc_result`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;
  • paimon_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

5.3 开启 Checkpoint

Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。

注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。

Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
Image

6. 上线任务

6.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
Image
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

6.2 确认任务执行成功

可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。

Image

7. 数据读取示例

我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。进一步可以确认数据准确性。

7.1 创建打印表

创建一个打印表,用于输出读取的数据。

CREATE TABLE `print_table` (
    word varchar,
    cnt bigint
) WITH (
    'connector' = 'print'
);
  • connector:使用 print 连接器将数据打印到控制台。

7.2 读取 Paimon 表数据

从 Paimon 表中读取数据并写入打印表。

INSERT INTO `print_table`
SELECT * FROM `paimon_test`.`default`.`doc_result`;
  • paimon_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:源表名称。

7.3 使用 Hive 查看数据

可以通过 Hive 命令行确认数据是否已经正确写入

root@master-1-1(192.168.13.162):~$ hive
hive> show databases;
OK
default
...
test_db1
Time taken: 0.583 seconds, Fetched: 5 row(s)
hive> use test_db1;
OK
Time taken: 0.031 seconds
hive> show tables;
OK
test_table
Time taken: 0.033 seconds, Fetched: 1 row(s)
hive> select * from test_table limit 10;
...

7.4 使用 EMR Spark 查询 Paimon 数据

参考 Spark 访问 Paimon ,通过 Spark 引擎对 Paimon 的湖数据进行批式处理。
注意:用Spark创建HMS类型的Catalog的方式如下:

# 注意将 tos 路径修改成你的测试路径
spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.warehouse=tos://<BUCKET_NAME>/paimon_emr_test \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

接着可以首先 use paimon ,之后就可以使用 SQL 访问 Paimon 数据表了:

spark-sql (default)> use paimon;
spark-sql (default)> show databases;
default
...
test_db1
Time taken: 0.53 seconds, Fetched 5 row(s)
spark-sql (default)> use test_db1;
Time taken: 0.288 seconds
spark-sql (test_db1)> select * from test_table limit 1;
...

8. 常见问题

8.1 验证 SQL 时报错

如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 Hive 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 Hive 元数据。

org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon_test1'.

Catalog options are:
'hive-conf-dir'='/opt/tiger/workdir'
'metastore'='hive'
'type'='paimon'
'uri'='thrift://lakeformation.xxx.cn-beijing.ivolces.com:48869'
'warehouse'='tos://flink-cwz-paimon/paimon_test1'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511)
        ...
Caused by: java.lang.RuntimeException: Failed to determine if database default exists
        at org.apache.paimon.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223)
        ... 9 more
Caused by: org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        ... 15 more

解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
Image

8.2 运行时访问 Hive 接口失败

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 Hive 的接口无法访问

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException

Image
可能原因

  1. 没有上传 hive-site.xml 文件,或者文件名不正确
    1. 解决方法:检查 hive-site.xml 是否成功上传到依赖文件中,并且文件名必须完全符合要求。

8.4 任务无法启动,报 Hive 数据库不存在

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
Image
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 Hive 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:

CREATE TABLE IF NOT EXISTS test_db;

重新提交任务之后,就可以恢复正常。

8.5 hive-site.xml 格式不正确

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException

Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs.
 at [row,col {unknown-source}]: [2,5]

Image
解决办法:这个原因是因为对接 Hive 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:

  1. 文件开头的内容必须是 <?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。
  2. xml 文档内必须包含合法的标签,比如在内容中不能出现 <>&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。

8.6 删除 Hive Catalog 的库表应该怎么操作

如果需要删除 Hive 元数据中的库表,需要同时手动删除 Hive 元数据中的库表信息,以及 TOS 目录上的数据库表的文件路径。如果仅仅删除 Hive 元数据或者仅仅删除 Hive 目录的数据都会造成数据不一致。报以下类似的错误,导致任务失败:

  1. 仅删除 TOS 文件路径,但未删除 Hive 元数据

Image

  1. 仅删除 Hive 元数据,但未删除 TOS 文件数据

Image
解决方案:判断属于哪一种情况,将已有的 Hive 元数据和 TOS 文件数据都删除后才能保证数据库表继续正常写入。

8.7 Hive 中数据表是非分区表,TOS 文件系统上却有分区

解决方案:Paimon 不会自动将表的分区信息同步到 Hive 元数据管理。如果需要在 Hive 元数据管理看到数据表的分区字段,需要在建表语句中增加如下 WITH 参数:

'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 Hive 元数据管理

需要注意的是:因为分区字段无法动态增加,增加参数后,需要将原有的数据表清掉(包括 Hive 元数据和 TOS 的数据文件),然后重新创建。

最近更新时间:2025.08.28 10:49:56
这个页面对您有帮助吗?
有用
有用
无用
无用