You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
高级参数
高级参数配置总览
复制全文
下载 pdf
高级参数配置总览

本文将为您介绍数据集成任务,在不同场景下所使用到的高级参数配置及其相关说明。

使用前提

按需已创建 离线集成任务流式集成任务解决方案任务等任务类型。

高级参数配置步骤

离线集成任务配置说明

离线任务运行参数、数据源相关参数和资源使用参数均配置到下图红框处。

  1. 登录 DataLeap 租户控制台
  2. 选中任务所属的项目,进入数据开发界面,并打开需配置高级参数的任务。
  3. 在【任务运行参数 > 自定义参数设置】中,添加相应的高级参数,添加示例:
    Image

流式集成任务配置说明

  1. 数据源相关参数,任务运行参数等的配置,与离线集成任务中的配置方式一致。
  2. 流式集成任务的资源参数,可直接在下图红框处进行可视化配置:
    Image
  3. 流式集成任务的其他高级参数,可在【任务运行参数 > 高级参数】中添加,添加示例:
    Image

解决方案参数配置说明

DataSail 解决方案高级参数设置,您可在方案的资源组高级配置中进行设置。

  1. 登录 DataSail 控制台
  2. 在左侧导航栏中选择数据同步方案,进入同步方案配置界面。
  3. 单击目录树中项目选择入口,选择已创建的 DataLeap 项目。
    Image
  4. 在方案编辑界面中,您可通过以下两个入口配置高级参数:
    • 入口一:【基本配置 > 资源组高级配置】中,添加相应的离线、实时高级参数设置。
      Image
    • 入口二:【映射配置 > 高级参数配置】中,添加相应的高级参数设置。
      Image

通用参数

离线任务运行参数

参数名称

参数说明

默认值

job.common.dirty_record_skip_enabled

是否跳过脏数据。

true

job.common.global_timezone

该参数可定义全局的默认时区。默认值为 Asia/Shanghai,您可在特殊场景中定义数据库的时区信息。

Asia/Shanghai

  • job.common.reader_transport_channel_speed_record
  • job.common.writer_transport_channel_speed_record

读写限速参数,每秒读写条数限制,默认值 -1,代表不限制;配置大于 0 时,就开启条数限制。

-1

  • job.common.reader_transport_channel_speed_byte
  • job.common.writer_transport_channel_speed_byte

读写限速参数,每秒读写 bytes 限制,默认值 -1,代表不限制;配置大于 0 时,就开启限制。

-1

  • job.reader.reader_parallelism_num
  • job.writer.writer_parallelism_num

连接器的读并发和写并发,只适用于离线任务
不建议配置,不合理的配置会造成资源浪费或导致执行变慢。

无默认值,系统根据数据量大小自动推算并发数。

job.writer.case_insensitive

大小写不敏感。
在读写 TOS、OSS、ES、Kafka 等数据源时,如有大小写转换问题,建议配置为大小写敏感即 job.writer.case_insensitive=false

true

job.writer.pre_sql_list

写入数据源前置处理 SQL List ,格式是 json 数组,如:
job.writer.pre_sql_list=["delete from xx where id=xxx","delete from xx where id=xxx"]

流式任务运行参数

参数名称

参数说明

默认值

job.common.global_parallelism_num

连接器的全局并发数,只适用于流任务
该参数会决定启动多少个 TaskManager。

  • 如果 MQ 中的流量较小,则可通过在任务高级参数中指定此参数来控制作业的全局并发。
  • 如果 MQ 的 Partition 个数很多,但数据流量并不高,亦可通过此参数来节约任务的执行资源。

MQ Partition 个数 / 4

job.common.checkpoint_interval

每次 Checkpoint 时间间隔,单位为毫秒,默认 900000 毫秒(15分钟)进行一次 Checkpoint,只适用于流任务

900000

job.common.checkpoint_timeout

Checkpoint 超时时间,单位为毫秒,只适用于流任务

600000

解决方案相关参数

  • 实时整库中离线全量同步参数:

    说明

    离线全量同步更多数据源高级参数配置,可参考离线任务运行参数 数据源参数

    参数名称

    参数说明

    默认值

    job.common.is_use_batch_mode

    是否 batch 模式:

    • true: flink 任务 batch 模式,在资源有限情况,部分 taskmanager 也可运行任务;
    • false:flink 任务 pipeline 模式,适用数据量大时,读写同时进行。

    true

    job.reader.enable_string_compatible

    PostgreSQL2Hudi 实时分库分表同步解决方案中,若源端存在当前不能识别的 postgre 数据类型时,您可根据实际业务情况添加该参数,来判断是否将其转换为 string 类型。

    • false:不转换,作业运行报错;
    • true:转换为 string 类型,作业正常运行。

    说明

    该参数仅适用于 PostgreSQL2Hudi 实时分库分表同步解决方案。

    false

    job.reader. add_zone_offset_to_zero_timestamp

    在实时整库全增量方案中,若源端表存在不同类型的时间字段(含时区与不含时区并存),可能导致时间字段在转换为时间戳时处理逻辑不一致,进而引发目标表时间值不一致的问题。
    此时,您可添加高级参数 job.reader.add_zone_offset_to_zero_timestamp=true,使不同类型的时间字段在转换为时间戳时的处理逻辑保持一致,从而避免目标表中时间字段值不一致的问题。

    job.writer.date_precision

    实时整库/分库分表解决方案在目标端为 LAS 数据源,且源端 datetime 类型时间戳字段需写入目标端 bigint 类型字段时,方案生成的批作业将默认增加高级参数 job.writer.date_precision = millisecond,自动将目标端字段转换为毫秒级时间戳,提升数据同步准确性。

    当前说明场景中默认值为 millisecond;其余场景默认值为 second。

  • 实时整库中实时增量同步参数

    参数名称

    参数说明

    默认值

    job.common.checkpoint_interval

    设定 Checkpoint 刷新时间。

    900000

    job.reader.poll_interval_ms

    设置读 binlog 的刷新时间,默认 500 毫秒。
    设置场景:调小此参数,提高实时性。

    500

    job.reader.debezium

    • 忽略特殊不能解析的 ddl:
      "database.history.skip.unparseable.ddl": "true"

    • binlog 忽略 sql 操作配置:
      "skipped.operations":"d"
      取值这几个: d,c,u

      • c for inserts/create,
      • u for updates,
      • d for deletes,
      • t for truncates,
      • none to not skip any operations.

      默认 : truncate operations are skipped.
      配置示例:{"database.history.skip.unparseable.ddl":"true","skipped.operations":"d,u"}
      更多说明可参考官网:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-skipped-operations

    -

    job.reader. add_zone_offset_to_zero_timestamp

    在实时整库全增量方案中,若源端表存在不同类型的时间字段(含时区与不含时区并存),可能导致时间字段在转换为时间戳时处理逻辑不一致,进而引发目标表时间值不一致的问题。
    此时,您可添加高级参数 job.reader.add_zone_offset_to_zero_timestamp=true,使不同类型的时间字段在转换为时间戳时的处理逻辑保持一致,从而避免目标表中时间字段值不一致的问题。

    job.writer.table_without_primary_key_strategy

    当解决方案目标端为 ByteHouse CDW 和 ByteHouse CE,且源端表为无主键表,此时通过 DDL 策略自动创建无主键表时,可配置该参数,具体值说明如下:

    • skip,忽略无主键表的 DDL 策略建表,丢弃后续的 DML 数据
    • fail,当有无主键表被 DDL 策略创建时,作业运行失败

    skip

    solution.writer.common.ddl.bytehouse.enable.binlog

    在使用实时整库同步解决方案自动创建 ByteHouse CDW 表时,若要使该表后续具备 Binlog 读取能力,则需要手动添加高级参数 solution.writer.common.ddl.bytehouse.enable.binlog=true,如此创建的表方可后续支持 Binlog 读取。

    -

    solution.reader.ddl.external_schema_mode

    解决方案支持灵活的 Schema 定义,可通过该高级参数,用于指定需要在 Schema Fetcher 中获取的字段及顺序。取值说明如下:

    • Replace:
      solution.reader.ddl.external_schema_mode=Replace
      当对字段顺序存在要求,或者想要自定义来源字段时,可使用该取值。此时,源端返回的字段及字段属性都会被丢弃。所以,您需要结合下方的solution.reader.ddl.external_schemas参数,自行定义每个字段的类型以及 isPrimaryKey 等关键字段,传递完整的 Schema 信息。
    • Merge:
      solution.reader.ddl.external_schema_mode=Merge
      当字段顺序无要求且场景中有增量字段时,可用该取值。结合下方 solution.reader.ddl.external_schemas 参数,并通过设置字段的 position 参数,来指定新增字段在 Schema 中插入的位置,插入顺序取值如下:
      • First:置于 Schema 首位。
      • Last:置于 Schema 末尾。
      • Before:须指定参数,如 Before(name) ,置于 name 字段之前。
      • After:须指定参数,如 After(name) ,置于 name 字段之后。

    MERGE

    solution.reader.ddl.external_schemas

    自定义源端 Schema 字段信息,示例如下:

    说明

    实时分库分表解决方案中,Mongo Schema 字段信息中的源表名信息,可以是“”的形式,表示所有源表均可匹配该字段信息。如下方示例中表名 test.dts_mongo_test,可将其填写为 “”。

    • solution.reader.ddl.external_schema_mode=Replace,全列替换模式,示例如下:

      solution.reader.ddl.external_schemas={
              "test.dts_mongo_test": [{
                      "name": "_id",
                      "type": "objectid",
                      "isPrimaryKey": true
              }, {
                      "name": "address2",
                      "type": "string"
              }, {
                      "name": "price",
                      "type": "double"
              }]
      }
      

      Image

    • solution.reader.ddl.external_schema_mode=Merge,融合模式,示例如下:

      solution.reader.ddl.external_schemas={
              "test.dts_mongo_test": [{
                      "name": "_id",
                      "type": "objectid",
                      "position": "first"
              }, {
                      "name": "address2",
                      "type": "string",
                      "isPrimaryKey": true,
                      "position": "before(_id)"
              }, {
                      "name": "price",
                      "type": "double",
                      "position": "last"
              }]
      }
      

      Image

  • Flink 运行参数表:

    参数名称

    参数说明

    默认值

    taskmanager.memory.managed.size

    每个 Task Manager 的托管内存占总内存大小。
    推荐配置:200m (实时任务使用较少)

    -

    taskmanager.memory.network.fraction

    每个 Task Manager 的网络内存的占比。
    推荐配置:0.05 (实时任务使用较少)

    -

资源使用相关参数

说明

资源使用参数只适用于离线集成任务。
流式集成任务的资源参数,可按照上方【流式集成任务配置说明】章节中的介绍,直接在页面中选配即可。

参数名称

参数说明

默认值

job.common.flink_tm_vcores

每个 Task Manager 使用的 CPU 核数。

  • 取值说明:必须大于或等于 0.5,且必须是 0.5 的倍数

1.0

job.common.slots_per_tm

每个 Task Manager 默认 slot 的数量。

2

job.common.flink_tm_slot_memory

每个 Task Manager 中的各个 slot 的内存大小,单位为 MB。

  • 取值说明:每个 Task Manager 的 CPU 和内存(GB)的比例,必须满足:flink_tm_vcores : (flink_tm_slot_memory [GB] * slots_per_tm) = 1:2 或 1:4。
  • 取值示例:flink_tm_vcores = 4.0,slots_per_tm = 2,则 flink_tm_slot_memory 必须为 4GB 或 8GB,即配置为 4096 或 8192。

2048

job.common.flink_tm_task_off_heap_memory

每个 Task Manager 的堆外内存占总内存的比例。

0.125

job.common.flink_tm_managed_memory_ratio

每个 Task Manager 的托管内存占总内存的比例。

0.2

job.common.flink_tm_network_max

每个 Task Manager 的网络内存的最大值,单位为 GB。

  • 取值说明:配置时使用小写字母 g,比如:2g。

2g

job.common.flink_jm_vcores

Flink Job Manager 的 CPU 核数。

  • 取值说明:不得小于 0.5,且必须是 0.5 的倍数

1.0

job.common.flink_jm_memory

Flink Job Manager 的总内存大小,单位为 MB。

  • 取值说明:Job Manager 的 CPU 和内存(GB)的比例,必须满足:flink_jm_vcores : flink_jm_memory [GB] = 1:2 或 1:4。
  • 取值示例:flink_jm_vcores = 2.0,则 flink_jm_memory 必须为 4GB 或 8GB,即配置为 4096 或 8192。

4096

job.common.flink_jm_off_heap_memory

Flink Job Manager 的堆外内存占总内存的比例。

0.125

数据源参数

ByteHouse CDW/CE

ByteHouse_CDW/CE 批式读

读取 ByteHouse_CDW/CE 时,支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.reader.split_config

设置任务分片数量配置参数。
配置示例: job.reader.split_config = {"split_num": 10}
适用范围: 整数类型切分键,如切分键是 int 类型,默认分片数是和并发数相同 ,可以通过此参数修改分片数量:job.reader.split_config = {"split_num": 100}

分片数是和并发数相同

job.reader.string_split_size

设置分片大小。
适用范围:字符串类型切分键,您可根据该值计算分片数量。
该值不宜过大或过小:

  • 过小,分片数量较多,计算分片耗时较长
  • 过大,单分片数量过大,触发读数据超时

1000000

job.reader.customized_connection_properties

读取 ByteHouse 超时设置参数。
配置示例: 超时设置 : job.reader.customized_connection_properties = {"max_execution_time":3000}

job.reader.output_null_value

ByteHouse CDW 中读取 Nullable(T) 类型的数据时,没有具体值的行将默认读取返回为 null;如果需要返回 0,则您可配置 job.reader.output_null_value=false 参数,此时 null 值将返回 0。

true

job.reader.use_local_split

读取 ByteHouse CDW/CE 数据时,若配置的切分键中包含空值时,可以使用该高级参数,避免数据丢失或数据重复情况。

ByteHouse_CDW/CE 批式写

批式写入 ByteHouse_CDW/CE 时,支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.writer.flush_interval

写入 buffer 的刷新时间,默认 60000 毫秒

60000

job.writer.buffer_count

写入 buffer 记录条数,默认 8192

8192

job.writer.query_timeout

设定查询超时退出时间,默认 30000 毫秒

30000

job.writer.skip_delete_task

写入 ByteHouse CDW 是通过导入任务方式,默认任务执行完,再删除导入任务,通过此参数,您可设置是否跳过删除导入任务 ;
适用场景:需要在 ByteHouse CDW 查看导入服务日志时,参数设置为 true,则跳过删除导入任务。

false

job.writer.cfs_write_batch_size

  • 动态分区默认 4096;
  • 非动态分区默认 8192。

说明

该高级参数,需配合 CFS 写入方式进行使用。

4096/8192

job.writer.cfs_vw_id

当选择 CFS 方式写入 ByteHouse 云数仓版时,建议在自定义参数设置中添加该高级参数,指定数据导入服务所使用的计算组信息。格式如下:
job.writer.cfs_vw_id = vw-21xxxxx57-dts-test,需替换为具体的计算组信息,可前往 ByteHouse 控制台获取对应的计算组。详见 ByteHouse CDW 计算组

job.writer.bh_ce_partition_type

显示的指定 ByteHouse 的分区字段为 string 类型,当分区字段使用函数时,需要使用此高级参数函数。

string

job.writer.loading_mode

高级参数 job.writer.loading_mode 可设置为 FULL_REFRESH 或 INCREMENTAL,默认为 INCREMENTAL:

  • FULL_REFRESH:可支持非分区表清除原先表中数据后再导入新数据,避免数据重复。
  • INCREMENTAL:支持分区表导入数据时,事先清除对应分区中的数据后再导入新数据,对非分区表无效。

说明

  • 该高级参数,需配合 CFS 写入方式进行使用。
  • 该高级参数对写入 ByteHouse CE 数据源时不适用。

INCREMENTAL

job.writer.extraProperties

添加此参数,数据导入 ByteHouse CDW 时,支持原子性导入数据。
配置示例如下:
job.writer.extraProperties = {"job_time":"${date}"}

job.writer.bh_connection_properties

Map<String,String> 类型。
离线、实时解决方案中,目标数据源为 ByteHouse CE、ByteHouse CDW 时,可自定义设置 ByteHouse query 参数,设置到 jdbc 连接的 properties 中。
配置示例如下:
job.writer.bh_connection_properties={"query_timeout":63}

说明

该高级参数,需配合 JDBC 写入方式进行使用。

job.writer.session_properties

Map<String,String> 类型。
离线、实时解决方案中,目标数据源为 ByteHouse CE、ByteHouse CDW 时,可自定义设置 ByteHouse session 参数,通过在当前 session 中执行 set key=value 的语句来实现。
配置示例如下:
job.writer.session_properties={"max_threads":8}

job.writer.pre_sql_list

写入 ByteHouse CDW 数据源前置处理 SQL List ,格式是 json 数组,多条执行语句可用逗号分隔。如:
job.writer.pre_sql_list=["delete from table_name1 where id=xxx","delete from table_name2 where id=xxx"]

说明

该高级参数,需配合 JDBC 写入方式进行使用。

job.writer.virtual_warehouse

可通过该高级参数,指定 ByteHouse CDW 中任务导入所需的计算组信息。配置示例如:
job.writer.virtual_warehouse=Your_ByteHouse_Virtual_Warehouse(如 vw-21xxxxxxxxxx7-dxx-xxt)
计算组操作详见 ByteHouse CDW 计算组

Elasticsearch

Elasticsearch 批式读

参数名

描述

默认值

job.reader.case_insensitive

读取数据时字段大小写是否需要敏感。
在读 ES 数据源时,如有字段包含大写,建议配置为大小写不敏感。即 job.reader.case_insensitive=false

true

FTP/SFTP

  • JSON 数据格式相关参数:

    参数

    描述

    默认值

    job.common.case_insensitive

    JSON 内容解析时是否对字段 Key 大小写敏感。

    true

    job.common.support_json_path

    是否支持带 . 的字段名。true 为支持,false 为不支持。

    false

    job.common.json_serializer_features

    Datasail 使用 fastjson 解析 JSON 内容,用户可以通过此参数设置 JSON 解析的 features,详情参考 SerializerFeature - fastjson 1.2.83 javadoc。多个 SerializerFeature 使用逗号分隔。

    job.common.convert_error_column_as_null

    是否将类型转化失败的字段默认置为 null。

    false

  • CSV、TXT 数据格式相关参数:

    参数

    描述

    默认值

    job.common.csv_escape

    CSV 的 escape 字符

    job.common.csv_quote

    CSV 的 quote 字符

    job.common.csv_with_null_string

    将 CSV 中的这个字段值视为 null

  • 其余参数:

    参数

    描述

    默认值

    job.reader.skip_if_path_not_exists

    当 FTP/SFTP 上读取的文件不存在时,是否跳过该文件。默认不跳过,任务失败。
    您可根据实际场景设置该参数为 true,则会跳过该文件,任务仍能够执行成功。设置格式如下:job.reader.skip_if_path_not_exists=true

    false

HDFS

HDFS 通用高级参数

参数

描述

默认值

job.common.case_insensitive

JSON 内容解析时是否对字段 Key 大小写敏感。

true

job.common.support_json_path

是否支持带 . 的字段名。true为支持,false 为不支持。

false

job.common.json_serializer_features

DataSail 使用 fastjson 解析 JSON 内容,用户可以通过此参数设置 JSON 解析的 features,详情参考 SerializerFeature - fastjson 1.2.83 javadoc。多个 SerializerFeature 使用逗号分隔。

job.common.convert_error_column_as_null

是否将类型转化失败的字段默认置为 null。

false

job.common.host_ips_mapping

HDFS 数据源通过连接串形式配置时,需在高级参数中配置 ip_mapping 信息,将 hdfs 集群的节点域名与 ip 进行映射,示例如下:
Image

job.common.host_ips_mapping = {
 "master-1-1.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-3.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "core-1-1.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "master-1-1.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "core-1-1.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "core-1-2.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "master-1-2.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-3.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "core-1-2.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-2.emr-c7axxxxxxxxx9b":"172.16.1.68"
 }

HDFS 批式读

参数

描述

默认值

job.reader.parse_partition_from_path

在需要读取并解析数据源地址路径下的分区字段场景中,可以添加此高级参数,在手动添加分区字段映射后,便可正常读取分区字段数据。
格式为:job.reader.parse_partition_from_path=true

false

job.reader.partition_num

读取分区字段数据场景中,如果是读取 Json 这类没有 Schema 定义的数据格式时,需添加此高级参数,来告知当前作业设置的路径中包含多少个分区字段数。
格式为:job.reader.partition_num=n,n 为分区字段个数。

HDFS 实时写

参数

描述

默认值

job.writer.rolling.max_part_size

文件切割大小,单位字节,默认 10G。

注意

这里是指未压缩读的数据大小, 而非 HDFS 最终文件大小。

10737418240

job.writer.hdfs.replication

HDFS 副本数

3

job.writer.hdfs.compression_codec

HDFS 压缩格式,支持

  • snappy
  • lz4
  • zstd
  • fourmc
  • fourmz
  • gzip
  • None(不压缩)

zstd

job.writer.dump.directory_frequency

写入 HDFS 文件夹的频率,支持以下参数:

  • 天级:dump.directory_frequency.day
  • 小时级:dump.directory_frequency.hour

dump.directory_frequency.day

HIVE

HIVE 通用高级参数

参数名称

参数说明

默认值

job.common.checkpoint_interval

设定 Checkpoint 刷新时间,默认 15 分钟,如果实时写入 Hive 时,写入 Hive 时间依据此参数。

900000

job.common.host_ips_mapping

Hive 数据源通过连接串方式接入自建集群时,需通过该高级参数配置 host 与真实 ip 之间的映射关系,示例如下:

{
  "core-1-1.emr-1": "1xx.xx.0.11",
  "core-1-2.emr-2": "1xx.xx.0.12",
  "master-1-1.emr-1": "1xx.xx.0.13"
}

HIVE 通用读写

参数名称

参数说明

默认值

job.writer.partition_strategy

参数分区创建策略
参数值:partition_first、partition_last
适应场景:流任务写数据时设置为 partition_first 时,便可实时查看当前 Hive 分区数据。

partition_last

job.writer.partition.date_format

写入动态日期分区格式,可配置为:yyyyMMdd、yyyy-MM-DD

yyyyMMdd

job.writer.partition.hour_format

写入动态小时分区格式,可配置为:hh、HH

HH

job.writer.null_string_as_null

复杂类型中的 string 类型,默认会将 null 写为空字符串。如果需要配置默认写入 null,可以将此参数配置为 true。如脏数据中存在无法转换的列时,会自动转换为 null 值。

false

job.writer.case_insensitive

默认会将数据全部转换为小写。

true

job.writer.convert_error_column_as_null

将脏数据中无法转换的列自动转换为 null。

false

job.writer.dump.directory_frequency

写入 HDFS 文件夹的频率,支持以下参数:

  • 天级:dump.directory_frequency.day
  • 小时级:dump.directory_frequency.hour

dump.directory_frequency.day

job.writer.dump.file_name_state.version

若场景中存在多个流式集成任务同时写同一个 Hive 表时,可使用该参数:

  • job.writer.dump.file_name_state.version=1,不支持多个作业写同一个 Hive 表。临时目录共享,目录结构为 ${BASE}/_DUMP_TEMPORARY/cp-${i}/task-${j}/
  • job.writer.dump.file_name_state.version=2,支持多个作业写同一个 Hive 表。临时目录隔离,目录结构为 ${BASE}/_DUMP_TEMPORARY/${JOBID}/cp-${i}/task-${j}/

注意

该高级参数目前仅支持在流式集成任务中使用,离线集成任务暂不支持。

1

  • job.writer.extra_config
  • job.reader.extra_config

EMR Hive 接入方式读写 TOS 分层桶时,需添加以下相关参数 :
{"fs.tos.access-key-id":"{{ss_ak}}","fs.tos.secret-access-key":"{{ss_sk}}","fs.tos.endpoint":"tos-{{region}}.ivolces.com","fs.tos.credentials.provider":"io.proton.common.object.tos.auth.SimpleCredentialsProvider","fs.tos.impl":"io.proton.fs.ProtonFileSystem","fs.AbstractFileSystem.tos.impl":"io.proton.fs.ProtonFS"}

说明

其中:

  • ss_ak/ss_sk 信息,您可以进入火山引擎访问控制台的密钥管理界面获取,即 Access Key ID/ Secret Access Key 信息。
  • endpoint 中需修改为具体 region 信息。各 region 信息详见 地域和访问域名

若未添加,则读写 TOS 分层桶时,会出现以下报错:

Caused by: java.lang.ClassNotFoundException: io.proton.common.object.tos.auth.EmrSidecarCredentialProvider
    at io.proton.common.object.tos.auth.DefaultCredentialsProviderChain.loadAllCredentialProviders(DefaultCredentialsProviderChain.java:45)
    at io.proton.common.object.tos.DelegationClientBuilder.createProvider(DelegationClientBuilder.java:104)
    at io.proton.fs.RawFileSystem.initialize(RawFileSystem.java:564)

  • job.reader.storage_type
  • job.writer.storage_type

EMR Hive 接入方式读写 TOS 分层桶时,需同时添加该参数。即指定读取 Hive 数据的底层存储类型为 raw_tos。

Kafka

Kafka 流式读

参数名称

描述

默认值

job.reader.connector.startup-mode

默认消费起始位置参数指定:

  • earliest-offset 最老:job.reader.connector.startup-mode=earliest-offset,指定最早的消费起始位置;
  • latest-offset 最新:job.reader.connector.startup-mode=latest-offset,指定最新的消费起始位置;
  • specific-offsets:通过分区 offset 位点指定消费起始位置,job.reader.connector.connector.specific-offsets = [{"partition":0,"offset":1},{"partition":1,"offset":2}...]
  • specific-timestamp:通过时间戳位点来指定消费起始位置,job.reader.connector.connector.specific-timestamp = 1716292387000(某个时间的时间戳,单位为毫秒)

job.reader.metadata_columns

读取 Kafka 元数据相关信息,多个元数据可用英文逗号隔开。配置示例如下:job.reader.metadata_columns = timestamp,offset,key,value,partition,headers

Kafka 离线写

参数名称

描述

默认值

job.common.host_ips_mapping

Kafka 通过公网接入,kafka broker 设置为域名;需配置 ip 与域名映射,示例如下:
{
"ukxxxxxxxx-kafka1":"xxx.xx.xxx.xx",
"ukxxxxxxxx-kafka2":"xxx.xx.xxx.xx",
"uxxxxxxxxx-kafka3":"xxx.xx.xxx.xx",
}

job.common.skip_dump_parse

Kafka 数据源通过公网形式接入,开启 SASL_SSL 认证时,需设置该参数为 true。
Image

false

job.writer.properties

max.request.size 消息体大小;
buffer.memory 缓存大小。

说明

适用范围:DataSail 整库解决方案配置中,如果单个消息体比较大时,可以调整此参数。

{"max.request.size":1048576,"buffer.memory":33554432}

job.writer.compression_type

消息压缩格式,支持 none、snappy、gzip、lz4

说明

DataSail 整库解决方案配置中,可指定消息压缩格式。

snappy

Mongo

MongoDB 解决方案读

参数名

描述

默认值

solution.reader.ddl.external_schema_mode

由于 Mongo 灵活的 Schema 定义,且 Mongo Schema Fetcher 获取的字段是无序且不保证每次获取的结果一致,这对解决方案中的自动建表能力有一定的困难。
因此平台提供了该参数,用于指定需要在 Schema Fetcher 中获取的字段及顺序。取值说明如下:

  • Replace:
    solution.reader.ddl.external_schema_mode=Replace
    当对字段顺序存在要求,或者想要自定义来源字段时,可使用该取值。此时,Mongo 返回的字段及字段属性都会被丢弃。所以,您需要结合下方的solution.reader.ddl.external_schemas参数,自行定义每个字段的类型以及 isPrimaryKey 等关键字段,传递完整的 Schema 信息。
  • Merge:
    solution.reader.ddl.external_schema_mode=Merge
    当字段顺序无要求且场景中有增量字段时,可用该取值。结合下方 solution.reader.ddl.external_schemas 参数,并通过设置字段的 position 参数,来指定新增字段在 Schema 中插入的位置,插入顺序取值如下:
    • First:置于 Schema 首位。
    • Last:置于 Schema 末尾。
    • Before:须指定参数,如 Before(name) ,置于 name 字段之前。
    • After:须指定参数,如 After(name) ,置于 name 字段之后。

MERGE

solution.reader.ddl.external_schemas

自定义 Mongo Schema 字段信息,示例如下:

说明

实时分库分表解决方案中,Mongo Schema 字段信息中的源表名信息,可以是“”的形式,表示所有源表均可匹配该字段信息。如下方示例中表名 test.dts_mongo_test,可将其填写为 “”。

  • solution.reader.ddl.external_schema_mode=Replace,全列替换模式,示例如下:

    solution.reader.ddl.external_schemas={
            "test.dts_mongo_test": [{
                    "name": "_id",
                    "type": "objectid",
                    "isPrimaryKey": true
            }, {
                    "name": "address2",
                    "type": "string"
            }, {
                    "name": "price",
                    "type": "double"
            }]
    }
    

    Image

  • solution.reader.ddl.external_schema_mode=Merge,融合模式,示例如下:

    solution.reader.ddl.external_schemas={
            "test.dts_mongo_test": [{
                    "name": "_id",
                    "type": "objectid",
                    "position": "first"
            }, {
                    "name": "address2",
                    "type": "string",
                    "isPrimaryKey": true,
                    "position": "before(_id)"
            }, {
                    "name": "price",
                    "type": "double",
                    "position": "last"
            }]
    }
    

    Image

MongoDB 批示读

参数名

描述

默认值

reader_fetch_size

单批次读取文档 doc 的数量。

100000

filter

指定读取过滤条件,满足 MongoDB 语法,如读取 id = 1000 的数据,填写示例如下:
{id: {$eq: 1000}}

split_mode

分片模式支持两种:

  1. paginating:单个分片的数量为:totalRecords / batchSize
  2. parallelism:若 MongoDB 支持 splitVector 功能,则使用 MongoDB 内置切片功能,否则将根据文档数量/parallelism 平均划分,单个分片的数量为:totalRecords / parallelismpaginating:根据数据量/fetchsize精确分片

说明

  • 若需关闭分片,可设置并发度为 1。
  • 离线同步任务中,若需要读取 MongoDB 5.X 版本的时序表数据时,需要设置该高级参数为 job.reader.split_mode=paginating,方能读取到完整数据。

parallelism

MongoDB 批式写

参数名

描述

默认值

max_connection_per_host

连接池最大连接数。

100

connect_timeout_ms

连接超时时间。

10000

batch_size

单批次写入 MongoDB 的数据量。

100

write_mode

高级参数设置写入方式:

  • insert:直接写入数据,表示不覆盖,出现唯一键冲突时,任务会执行失败。
  • overwrite:写入时,不清除数据,唯一键相同,用新的数据覆盖旧数据,唯一键不同时,直接插入新数据。
  • insert_if_not_exists:写入时,不清除数据,唯一键相同,原数据保持不变,唯一键不同时,直接插入新数据。

MySQL

MySQL 批式读

批式读支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.reader.init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境。

job.reader.reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

job.reader.query_timeout_seconds

Jdbc 方式读取数据,设定读取超时时间,单位秒。

300

job.reader.shard_split_mode

Jdbc 连接分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数,此模式速度较慢,但可以保证下游文件大小基本一致。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate。
  • 非精确分片:此模式速度较快,但无法保障下游文件大小一致。
    • 该分片模式仅支持分片键为整型数据类型;
    • 参数配置为 parallelism时:根据表的最大最小值,将所有的数据按照执行并发数进行区间分片;
    • 参数配置为quick时:根据 reader_fetch_size 进行模糊分片。
  • 不分片:不进行分片,适用于表数据量较小,或没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk 。

准确分片

job.reader.customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

job.reader.session_properties

通过设定该高级参数,可自定义作用于 MySQL 中会话级的系统变量,格式为 Map<String, String>,如 job.reader.session_properties={ "wait_timeout": "3600" } ,设定超时参数为 3600 秒。

MySQL 批式写

批式写支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.writer.is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突,false 为任务执行失败;true 为忽略冲突,任务正常执行。

false

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 MySQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

job.writer.write_retry_times

MySQL 写入失败时重试次数。

3

job.writer.retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒。

write_batch_interval / 10

job.writer.connection_parameters

Jdbc 连接的全部参数,可在默认值后追加补充:

  • tcpRcvBuf 读单行数据量大时使用:
    tcpRcvBuf=1024000 (1MB)
  • allowMultiQueries 允许支持使用多条 sql,用分号隔开: allowMultiQueries=true
  • connectTimeout 连接超时时间,毫秒:
    connectTimeout=60000

autoReconnect=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull

job.writer.table_pattern

数据写入 MySQL 多张分表时必填参数。注意事项详见5.3 数据写入 MySQL 多张分表
该参数表示可以引用源表中的某个字段名来作为目标表名的一部分,例如源表或源 TOS 文件中有 table_name 字段名,则 ${table_name} 表示运行时自动取每条数据中该字段的值来拼接目标表名字符串。
配置示例:job.writer.table_pattern=${table_name}_copy

job.writer.internal_columns

数据写入 MySQL 多张分表时可选参数。
该参数可指定源表中的某些字段数据不写入目标表中。比如已经选做目标表名的 table_name 字段,可不在实际写入目标表的数据字段里出现。您可配置该参数为:job.writer.internal_columns=[ "table_name" ]

job.writer.session_properties

通过设定该高级参数,可自定义作用于 MySQL 中会话级的系统变量,格式为 Map<String, String>,如 job.writer.session_properties={ "wait_timeout": "3600" } ,设定超时参数为 3600 秒。

StarRocks

StarRocks 批式写

参数名称

参数说明

参数默认值

job.writer.sink_flush_interval_ms

写入 buffer 刷新时间,默认 60000 毫秒

60000

job.writer.sink_buffer_size

写入 buffer 数据大小,默认 10485760 (10MB)

10485760

job.writer.sink_buffer_count

写入buffer 记录条数,默认 40960

40960

job.writer.stream_load_properties

配置 StarRocks streamload 方式写入的可选参数,格式为 json 字符串配置。可配置多个可选参数,示例 {"partial_update":true,"insert_ignore":"ignore"}。

  • partial_update,是否使用部分列更新,即只更新有列映射的字段,仅支持主键表。
    有效值:truefalse
    默认值:false ,表示禁用此功能。
  • insert_ignore,忽略更新,按主键忽略新数据,仅支持主键表。
    有效值:ignore,表示启用此功能。
  • strict_mode,指定是否启用严格模式
    有效值: truefalsetrue 启用严格模式, false 表示禁用严格模式。
    默认值: false ,禁用严格模式。
  • max_filter_ratio,streamload 时的最大错误容忍度。错误容忍度是指一次 streamload 请求的所有数据记录中,由于异常数据而被过滤掉的数据记录的最大百分比。
    有效值:01之间
    默认值:0

更多可选参数参考 Starrocks 官网文档 opt_properties

job.writer.request_read_timeouts

写入等待获取结果时间,默认 60000 毫秒
适用场景:如数据量比较大,写入较慢,会触发 timeout,调大此参数

60000

job.writer.request_connect_timeouts

写入连接超时时间,默认 60000 毫秒

60000

job.writer.sink_enable_2PC

写入时任务分两阶段提交,默认 false
true 适用场景:流式集成写入,两阶段提交,保证数据 exactly once

false

Paimon

Paimon 批式读

高级参数 Key

高级参数 Value

job.reader.properties

运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

job.reader.limit

限制要读取的行数,通常用于数据抽样或测试

Paimon 批式写

高级参数 Key

高级参数 Value

job.writer.properties

运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

job.writer.overwrite_partition

覆盖写特定的分区,当且仅当“写入模式”为“覆盖”时生效,格式为 Map<String, String> 类型的 JSON 字符串

job.common.checkpoint_interval

流作业 Flink 快照生成周期(单位为毫秒),默认为 300s(5 分钟)。
当流式写 Paimon 时,每次快照完成后才会进行数据提交(Commit)操作,因此较短的快照周期可以让新数据更快地被下游查到。
但是过短的快照周期会产生大量的小文件,导致性能和查询效率受到严重影响,请合理设置该值。

Paimon 解决方案自动建表高级参数

在解决方案的刷新目标表映射界面,我们可以填入一些高级参数来控制建表行为,详见 实时整库同步

高级参数 Key

高级参数 Value

solution.writer.common.ddl.buckets_num

目标表的分桶(bucket)数。如果不设置,默认为 -1(动态 bucket)。

solution.writer.paimon.ddl.bucket_keys

目标表的分桶键(bucket key)。如果有多个字段,可用半角逗号分隔(例如 id,name)。
如果不设置,默认 bucket key 等同于主键;如果表不含主键,则默认会使用所有字段作为 bucket key。

solution.writer.paimon.ddl.options

建表时的各类可选参数(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

PostgreSQL

PostgreSQL 批式读

参数名称

描述

默认值

job.reader.init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境

job.reader.reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

job.reader.shard_split_mode

分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate。
  • 并发分片:根据表的最大最小值,将所有的数据按照并发数进行区间分片。
    • 该分片模式仅支持分片键为整型数据类型;
    • 配置方式:将该参数配置为 parallelism。
  • 不分片:不进行分片,适用于没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk。

准确分片

job.reader.customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

PostgreSQL 批式写

参数名

描述

默认值

job.writer.is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突

false

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

job.writer.write_retry_times

PostgreSQL 写入失败时重试次数。

3

job.writer.retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒

write_batch_interval / 10

Redis

Redis 离线读

参数名称

描述

默认值

job.reader.key_pattens

读取 Redis keys 的匹配策略,默认为*,即读取所有的 key。
支持精确匹配和模糊匹配:

  • 精确匹配:比如只同步user_a和user_b的 key,填写user_a,user_b即可;
  • 模糊匹配:比如只同步前缀是user_info的 key,填写 user_info*即可。

*

job.reader.db_index

Redis 逻辑库索引号,默认为 0。如果您的数据位于 Redis 的其他 DB,比如在 DB 6 中,则填写6即可。

0

job.reader.reader_parallelism_num

读取 Redis 分片数,Redis 服务为单线程模型,推荐设置为 1,默认为 1。

1

job.reader.client_timeout_ms

创建 Redis 连接的超时时间,单位为毫秒(ms)。

60000

job.reader.max_attempt_count

执行单次 Redis Command 失败的最大重试次数

3

job.reader.read_mode

指定 Redis 读取模式:

  • scan 模式:当读取的 key 内容数据非常多时,任务可能会出现如 JedisConnectionException: Unexpected end of stream 等异常信息,此时读取 Redis 推荐使用 scan 模式,该模式会将 key 内容分批次去读取,每批次读取一部分内容,提高同步性能。
  • all 模式:默认把 Redis 中所有的 key 内容都一次性全量拉出来,然后再处理。

all

job.reader.scan_batch_size

与 job.reader.read_mode 参数一起使用。当 job.reader.read_mode=scan 模式时,需通过该参数指定每次拉取的 size,默认每次读取 10000 个 key。

10000

job.reader.compress_codec

TOS 读取压缩文件时需要配置该参数,可选值为 zip。

Redis 离线写

参数名称

描述

默认值

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 Redis 网络的交互次数并提升整体吞吐量。但如果该值设置过大可能会导致数据同步进程 OOM。

50

job.writer.database

指定写入 Redis 中的 Database 信息,默认为0。

0

TOS

TOS 离线读

参数名

描述

默认值

job.reader.enable_success_file_check

若任务中需要检查前置 Success 文件是否就绪,您可添加该高级参数。设置为 True,表示开启 Success 文件检查,当检查的文件存在时,任务才会执行成功。默认 false,不检查。

false

job.reader.success_file_path

设置检查的 Success 文件全路径信息。

job.reader.success_file_check_interval_ms

设置检查 Success 文件时间间隔,单位毫秒,默认 60000

60000

job.reader.success_file_check_times

设置 Success 文件检查次数,默认 60

60

job.reader.csv_delimiter_ascii

当 TOS 文件中的分隔符是不可见的 ascii 时,可使用该高级参数,并将其设置为对应字符的 ascii,如 job.reader.csv_delimiter_ascii=1,表示文件分隔符为不可见的控制字符 SOH。
该参数优先级高于界面设置的分隔符。

Iceberg

Iceberg 解决方案自动建表高级参数

在解决方案的刷新目标表映射界面,我们可以填入一些高级参数来控制建表行为,详见 实时整库同步

参数名称

描述

示例值

solution.writer.iceberg.ddl.options

建表时,配置表自定义属性。

{"write.format.default":"parquet", "write.metadata.delete-after-commit.enabled":"true"}

solution.writer.common.ddl.partition.include_tables

建表时,配置分区包含的表(正则表达式匹配所有表)。

.*

solution.writer.common.ddl.partition.keys

分区键配置

id

job.writer.upsert-enabled

写入模式,默认 upsert 模式,设置为 false 时启用 append 模式。

false

参数配置规则

Iceberg 额外表属性,常用配置参考:https://iceberg.apache.org/docs/latest/configuration/#write-properties

需加 job.writer 前缀(如 job.writer.xxx)

最近更新时间:2026.06.01 19:15:22
这个页面对您有帮助吗?
有用
有用
无用
无用