You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
MongoDB
复制全文
下载 pdf
MongoDB

概述

MongoDB 连接器提供了从 MongoDB 读取数据和向 MongoDB 写入数据的能力,支持作为数据源表、维表和目标表使用。

能力

支持情况

Source(扫描读取)

有界(批)模式

Lookup Source(维表)

同步模式

Sink(写入)

批模式 / 流模式(Append & Upsert)

连接器在 DDL 中定义了主键时,使用 Upsert 语义与外部系统交换 UPDATE/DELETE 消息;未定义主键时,仅支持 Append 模式交换 INSERT 消息。

使用限制

  • MongoDB 连接器支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 支持 MongoDB 版本为 4.0 及以上。
  • 当作为 Source 使用时,仅支持有界(批)模式读取。
  • 当作为 Lookup 维表使用时,仅支持同步查询模式。
  • 若定义了主键(PRIMARY KEY),Sink 将工作在 Upsert 模式;若未定义主键,Sink 仅工作在 Append 模式。

前置条件

  1. MongoDB 实例已给 Flink 网段添加白名单。
  2. 用于连接 MongoDB 的账号拥有目标数据库和集合的读/写权限。
  3. 若使用分区扫描策略 split-vector,需要具有 splitVector 命令权限。
  4. 若使用分区扫描策略 sharded,需要具有 config 数据库的读取权限。

DDL 定义

-- 注册 MongoDB 表 'users'
CREATE TABLE MyUserTable (
  _id STRING,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@127.0.0.1:27017',
  'database' = 'my_db',
  'collection' = 'users'
);

-- 从其他表写入数据到 MongoDB 表
INSERT INTO MyUserTable
SELECT _id, name, age, status FROM T;

-- 扫描读取 MongoDB 表数据
SELECT _id, name, age, status FROM MyUserTable;

-- 作为维表进行 Temporal Join
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable._id;

WITH 参数

基本参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处固定为 mongodb

uri

(none)

String

MongoDB 连接 URI。例如:mongodb://user:password@127.0.0.1:27017

database

(none)

String

要读取或写入的 MongoDB 数据库名称。

collection

(none)

String

要读取或写入的 MongoDB 集合名称。

Source 参数

参数

是否必选

默认值

数据类型

描述

scan.fetch-size

2048

Integer

每轮从数据库获取的文档数量提示值。

scan.cursor.no-timeout

true

Boolean

是否禁用游标超时。MongoDB 默认在 10 分钟不活跃后关闭空闲游标。设置为 true 可防止游标超时。但如果处理当前批次超过 30 分钟,会话仍会过期关闭。

scan.partition.strategy

default

String

分区策略。可选值:singlesamplesplit-vectorshardeddefault。详见下文「分区扫描」章节。

scan.partition.size

64mb

MemorySize

每个分区的内存大小。

scan.partition.samples

10

Integer

采样分区策略下每个分区的采样数量。仅在 scan.partition.strategy 设置为 sample 时生效。

Lookup 维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache

NONE

Enum

维表缓存策略。可选值:NONE(不缓存)、PARTIAL(部分缓存)。

lookup.partial-cache.max-rows

(none)

Long

缓存的最大行数,超过该值后最老的行将被淘汰。需设置 lookup.cachePARTIAL

lookup.partial-cache.expire-after-write

(none)

Duration

写入缓存后每行的最大存活时间。需设置 lookup.cachePARTIAL

lookup.partial-cache.expire-after-access

(none)

Duration

访问缓存后每行的最大存活时间。需设置 lookup.cachePARTIAL

lookup.partial-cache.caching-missing-key

true

Boolean

是否缓存未命中的 key。设置为 true 时,查找不到的 key 也会缓存空结果。需设置 lookup.cachePARTIAL

lookup.max-retries

3

Integer

维表查询失败时的最大重试次数。

lookup.retry.interval

1s

Duration

维表查询失败时的重试间隔。

Sink 参数

参数

是否必选

默认值

数据类型

描述

sink.buffer-flush.max-rows

1000

Integer

每次批量请求缓冲的最大行数。

sink.buffer-flush.interval

1s

Duration

批量刷写的时间间隔。

sink.max-retries

3

Integer

写入数据库失败时的最大重试次数。

sink.retry.interval

1s

Duration

写入失败时的重试间隔。

sink.parallelism

(none)

Integer

Sink 算子的并行度。默认与上游算子并行度一致。

sink.delivery-guarantee

at-least-once

Enum

数据交付保障级别。可选值:noneat-least-once。暂不支持 exactly-once。

特性说明

主键处理

MongoDB Sink 根据是否定义了主键来决定工作模式:

  • 定义了主键:Sink 工作在 Upsert 模式,可消费 UPDATE/DELETE 消息,提供幂等写入能力。
  • 未定义主键:Sink 工作在 Append 模式,仅消费 INSERT 消息。

MongoDB 使用主键字段组合生成文档的 _id

  • 单字段主键:直接将字段值转为 BSON 值作为 _id
  • 多字段主键:将多个字段组合成 BSON Document 作为 _id。例如 PRIMARY KEY (f1, f2) NOT ENFORCED,生成的 _id 形式为 {f1: v1, f2: v2}

说明

如果 DDL 中存在 _id 字段但主键未声明为 _id,会产生歧义。建议将 _id 列作为主键,或对 _id 列进行重命名。

主键索引限制

  • MongoDB 4.2 之前,索引条目总大小(含结构开销)不能超过 1024 字节。
  • MongoDB 4.2 及之后,已取消索引键大小限制。

分区扫描

为加速并行读取,连接器提供以下分区策略:

策略

描述

single

将整个集合作为一个分区。

sample

对集合采样生成分区,速度快但可能不均匀。

split-vector

使用 splitVector 命令生成分区,适用于非分片集合,速度快且均匀。需要 splitVector 权限。

sharded

直接读取 config.chunks 作为分区,仅适用于分片集合,速度快且均匀。需要 config 数据库读权限。

default

分片集合使用 sharded 策略,否则使用 split-vector 策略。

Lookup 缓存

MongoDB 连接器可作为维表用于 Temporal Join。通过配置 lookup.cachePARTIAL 启用部分缓存,提升维表关联性能。
缓存工作原理

  1. 查询时首先命中本地缓存。
  2. 缓存未命中时请求外部数据库,并将返回结果写入缓存。
  3. 缓存行数超过 lookup.partial-cache.max-rows 或存活时间超过 expire-after-write / expire-after-access 时淘汰旧数据。

说明

缓存中的数据可能不是最新的,可通过调小过期时间来提高数据新鲜度,但会增加数据库请求量。需根据吞吐量和数据准确性进行权衡。

默认情况下,Flink 会缓存主键查询未命中的空结果。可通过设置 lookup.partial-cache.caching-missing-keyfalse 关闭该行为。

幂等写入

定义主键后,MongoDB Sink 使用 Upsert 语义写入数据,提供幂等性保证。当 Flink 作业从 Checkpoint 恢复并重新处理消息时,Upsert 模式可避免数据重复或约束冲突。

过滤下推

MongoDB 连接器支持将简单的比较和逻辑过滤条件下推到 MongoDB 查询中,以优化查询性能。支持的过滤映射关系如下:

Flink SQL 过滤条件

MongoDB 查询操作符

=

$eq

<>

$ne

>

$gt

>=

$gte

<

$lt

<=

$lte

IS NULL

$eq: null

IS NOT NULL

$ne: null

OR

$or

AND

$and

数据类型映射

MongoDB BSON 类型与 Flink SQL 数据类型的映射关系如下:

MongoDB BSON 类型

Flink SQL 类型

ObjectId

STRING

String

STRING

Boolean

BOOLEAN

Binary

BINARY / VARBINARY

Int32

INTEGER

TINYINT / SMALLINT / FLOAT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

Object

ROW

Array

ARRAY

对于 MongoDB 中的特殊类型,使用 Extended JSON 格式映射为 Flink SQL 的 STRING 类型:

MongoDB BSON 类型

Flink SQL STRING 示例

Symbol

{"_value": {"$symbol": "12"}}

RegularExpression

{"_value": {"<equation>regularExpression": {"pattern": "^9</equation>", "options": "i"}}}

JavaScript

{"_value": {"$code": "function() { return 10; }"}}

DbPointer

{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}

最佳实践

使用 Upsert 模式进行数据同步

在数据同步场景中,建议定义主键并使用 Upsert 模式,以确保数据一致性和幂等性:

CREATE TABLE mongo_sink (
  _id STRING,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@host:27017',
  'database' = 'my_db',
  'collection' = 'users',
  'sink.buffer-flush.max-rows' = '2000',
  'sink.buffer-flush.interval' = '2s'
);

使用维表关联进行数据打宽

通过 Temporal Join 将 MongoDB 作为维表进行实时数据打宽,建议开启 Lookup 缓存以提升性能:

CREATE TABLE mongo_dim (
  _id STRING,
  user_name STRING,
  user_level INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://user:password@host:27017',
  'database' = 'my_db',
  'collection' = 'user_info',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '60s'
);

SELECT o.*, d.user_name, d.user_level
FROM orders AS o
LEFT JOIN mongo_dim FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.user_id = d._id;

FAQ

如何选择合适的分区策略?

  • 非分片集合:推荐使用 default(即 split-vector),均匀高效。
  • 分片集合:推荐使用 default(即 sharded),直接利用已有分片信息。
  • 小数据量:可使用 single,无需分区开销。
  • 无 splitVector 权限:使用 sample 策略作为替代。

如何处理写入失败的问题?

写入 MongoDB 失败时,连接器会根据 sink.max-retries 配置进行重试。如果多次重试仍然失败,建议:

  1. 检查 MongoDB 实例的连接状态和网络白名单配置。
  2. 确认 MongoDB 用户具有目标集合的写入权限。
  3. 检查文档大小是否超过 MongoDB 的 16MB 限制。
  4. 查看 Flink TaskManager 日志获取详细错误信息。

Lookup 维表缓存如何调优?

根据业务场景权衡数据新鲜度与查询性能:

  • 高实时性要求:设置较短的 expire-after-write(如 10s),牺牲部分吞吐量换取数据准确性。
  • 高吞吐量要求:设置较大的 max-rows 和较长的过期时间(如 300s),减少对 MongoDB 的请求压力。
  • 数据极少变更:可设置较长过期时间并增大缓存行数。
最近更新时间:2026.05.26 16:34:53
这个页面对您有帮助吗?
有用
有用
无用
无用