MongoDB 连接器提供了从 MongoDB 读取数据和向 MongoDB 写入数据的能力,支持作为数据源表、维表和目标表使用。
能力 | 支持情况 |
|---|---|
Source(扫描读取) | 有界(批)模式 |
Lookup Source(维表) | 同步模式 |
Sink(写入) | 批模式 / 流模式(Append & Upsert) |
连接器在 DDL 中定义了主键时,使用 Upsert 语义与外部系统交换 UPDATE/DELETE 消息;未定义主键时,仅支持 Append 模式交换 INSERT 消息。
split-vector,需要具有 splitVector 命令权限。sharded,需要具有 config 数据库的读取权限。-- 注册 MongoDB 表 'users' CREATE TABLE MyUserTable ( _id STRING, name STRING, age INT, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://user:password@127.0.0.1:27017', 'database' = 'my_db', 'collection' = 'users' ); -- 从其他表写入数据到 MongoDB 表 INSERT INTO MyUserTable SELECT _id, name, age, status FROM T; -- 扫描读取 MongoDB 表数据 SELECT _id, name, age, status FROM MyUserTable; -- 作为维表进行 Temporal Join SELECT * FROM myTopic LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime ON myTopic.key = MyUserTable._id;
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处固定为 |
uri | 是 | (none) | String | MongoDB 连接 URI。例如: |
database | 是 | (none) | String | 要读取或写入的 MongoDB 数据库名称。 |
collection | 是 | (none) | String | 要读取或写入的 MongoDB 集合名称。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|---|
scan.fetch-size | 否 | 2048 | Integer | 每轮从数据库获取的文档数量提示值。 |
scan.cursor.no-timeout | 否 | true | Boolean | 是否禁用游标超时。MongoDB 默认在 10 分钟不活跃后关闭空闲游标。设置为 |
scan.partition.strategy | 否 | default | String | 分区策略。可选值: |
scan.partition.size | 否 | 64mb | MemorySize | 每个分区的内存大小。 |
scan.partition.samples | 否 | 10 | Integer | 采样分区策略下每个分区的采样数量。仅在 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|---|
lookup.cache | 否 | NONE | Enum | 维表缓存策略。可选值: |
lookup.partial-cache.max-rows | 否 | (none) | Long | 缓存的最大行数,超过该值后最老的行将被淘汰。需设置 |
lookup.partial-cache.expire-after-write | 否 | (none) | Duration | 写入缓存后每行的最大存活时间。需设置 |
lookup.partial-cache.expire-after-access | 否 | (none) | Duration | 访问缓存后每行的最大存活时间。需设置 |
lookup.partial-cache.caching-missing-key | 否 | true | Boolean | 是否缓存未命中的 key。设置为 |
lookup.max-retries | 否 | 3 | Integer | 维表查询失败时的最大重试次数。 |
lookup.retry.interval | 否 | 1s | Duration | 维表查询失败时的重试间隔。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|---|
sink.buffer-flush.max-rows | 否 | 1000 | Integer | 每次批量请求缓冲的最大行数。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 批量刷写的时间间隔。 |
sink.max-retries | 否 | 3 | Integer | 写入数据库失败时的最大重试次数。 |
sink.retry.interval | 否 | 1s | Duration | 写入失败时的重试间隔。 |
sink.parallelism | 否 | (none) | Integer | Sink 算子的并行度。默认与上游算子并行度一致。 |
sink.delivery-guarantee | 否 | at-least-once | Enum | 数据交付保障级别。可选值: |
MongoDB Sink 根据是否定义了主键来决定工作模式:
MongoDB 使用主键字段组合生成文档的 _id:
_id。_id。例如 PRIMARY KEY (f1, f2) NOT ENFORCED,生成的 _id 形式为 {f1: v1, f2: v2}。说明
如果 DDL 中存在 _id 字段但主键未声明为 _id,会产生歧义。建议将 _id 列作为主键,或对 _id 列进行重命名。
主键索引限制:
为加速并行读取,连接器提供以下分区策略:
策略 | 描述 |
|---|---|
| 将整个集合作为一个分区。 |
| 对集合采样生成分区,速度快但可能不均匀。 |
| 使用 |
| 直接读取 |
| 分片集合使用 |
MongoDB 连接器可作为维表用于 Temporal Join。通过配置 lookup.cache 为 PARTIAL 启用部分缓存,提升维表关联性能。
缓存工作原理:
lookup.partial-cache.max-rows 或存活时间超过 expire-after-write / expire-after-access 时淘汰旧数据。说明
缓存中的数据可能不是最新的,可通过调小过期时间来提高数据新鲜度,但会增加数据库请求量。需根据吞吐量和数据准确性进行权衡。
默认情况下,Flink 会缓存主键查询未命中的空结果。可通过设置 lookup.partial-cache.caching-missing-key 为 false 关闭该行为。
定义主键后,MongoDB Sink 使用 Upsert 语义写入数据,提供幂等性保证。当 Flink 作业从 Checkpoint 恢复并重新处理消息时,Upsert 模式可避免数据重复或约束冲突。
MongoDB 连接器支持将简单的比较和逻辑过滤条件下推到 MongoDB 查询中,以优化查询性能。支持的过滤映射关系如下:
Flink SQL 过滤条件 | MongoDB 查询操作符 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MongoDB BSON 类型与 Flink SQL 数据类型的映射关系如下:
MongoDB BSON 类型 | Flink SQL 类型 |
|---|---|
ObjectId | STRING |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY / VARBINARY |
Int32 | INTEGER |
TINYINT / SMALLINT / FLOAT | |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
Object | ROW |
Array | ARRAY |
对于 MongoDB 中的特殊类型,使用 Extended JSON 格式映射为 Flink SQL 的 STRING 类型:
MongoDB BSON 类型 | Flink SQL STRING 示例 |
|---|---|
Symbol |
|
RegularExpression |
|
JavaScript |
|
DbPointer |
|
在数据同步场景中,建议定义主键并使用 Upsert 模式,以确保数据一致性和幂等性:
CREATE TABLE mongo_sink ( _id STRING, name STRING, age INT, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://user:password@host:27017', 'database' = 'my_db', 'collection' = 'users', 'sink.buffer-flush.max-rows' = '2000', 'sink.buffer-flush.interval' = '2s' );
通过 Temporal Join 将 MongoDB 作为维表进行实时数据打宽,建议开启 Lookup 缓存以提升性能:
CREATE TABLE mongo_dim ( _id STRING, user_name STRING, user_level INT, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://user:password@host:27017', 'database' = 'my_db', 'collection' = 'user_info', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.max-rows' = '10000', 'lookup.partial-cache.expire-after-write' = '60s' ); SELECT o.*, d.user_name, d.user_level FROM orders AS o LEFT JOIN mongo_dim FOR SYSTEM_TIME AS OF o.proctime AS d ON o.user_id = d._id;
default(即 split-vector),均匀高效。default(即 sharded),直接利用已有分片信息。single,无需分区开销。sample 策略作为替代。写入 MongoDB 失败时,连接器会根据 sink.max-retries 配置进行重试。如果多次重试仍然失败,建议:
根据业务场景权衡数据新鲜度与查询性能:
expire-after-write(如 10s),牺牲部分吞吐量换取数据准确性。max-rows 和较长的过期时间(如 300s),减少对 MongoDB 的请求压力。