You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
高级参数
配置 Kafka Connector 高级参数
复制全文
下载 pdf
配置 Kafka Connector 高级参数

本文将为您介绍 DataSail Kafka Connector(Source/Sink)的高级参数配置,包括连接信息、消费策略、序列化格式等,可以帮助您提升数据同步的实时性、优化吞吐量,并解决日常运维中遇到的问题。

Source 参数(读取侧)

本章节详细介绍 Kafka Source 在读取数据时可供配置的高级参数。

job.reader.bootstrap_servers

  • 参数概述
    • 参数名job.reader.bootstrap_servers(别名: kafka_servers
    • 适用范围:Source / Sink / Common
    • 单位:无
    • 默认值:无,必须填写。
    • 核心作用:指定用于建立与 Kafka 集群初始连接的 Broker 地址列表。客户端会通过此列表发现集群中的所有 Broker。格式为 host1:port1,host2:port2,...
  • 适用场景
    • 场景一:连接 Kafka 集群
      • 典型现象:作业启动失败,日志中出现无法连接到 Kafka 的错误。
      • 常见报错关键字Failed to connect to broker, Connection refused, No resolvable bootstrap urls given in bootstrap.servers
      • 处理方式:此为基础核心配置,必须正确填写。请检查地址、端口及网络连通性。
  • 调参建议
    • 默认推荐:建议提供 2-3 个 Broker 地址。这可以保证在部分 Broker 节点宕机时,客户端依然能成功连接到集群,提高可用性。无需列出所有 Broker。
  • 注意事项
    • 此参数为建立连接的入口,并非意味着客户端只会连接这里列出的 Broker。客户端会利用这些地址发现并连接集群中的所有 Broker。
    • 请确保作业运行环境与 Kafka Broker 之间网络策略通畅。
  • 总结说明job.reader.bootstrap_servers 是连接 Kafka 集群的“敲门砖”,您需要在此提供至少一到两个 Kafka Broker 的地址和端口,以确保数据同步任务能够成功启动。为提高连接的可靠性,推荐配置 2-3 个地址。

job.reader.topics

  • 参数概述
    • 参数名job.reader.topics
    • 适用范围:Source
    • 单位:无
    • 默认值:无,必须填写。
    • 核心作用:指定需要从中读取数据的一个或多个 Kafka Topic。多个 Topic 名称之间使用逗号 , 分隔。
  • 适用场景
    • 场景一:订阅单个或多个 Topic
      • 典型现象:作业需要从特定的数据源 Topic 消费数据。
      • 处理方式:直接填写目标 Topic 名称。如果需要消费多个 Topic,用逗号隔开,例如 topic_A,topic_B
  • 调参建议
    • 默认推荐:根据业务需求,明确指定需要消费的 Topic 列表。
    • 支持 Topic 名称的正则表达式,但请谨慎使用,确保表达式的准确性,避免无意中订阅到非预期的 Topic。
  • 注意事项
    • 请确保填写的 Topic 真实存在,并且作业所使用的账号有权限读取。
    • Topic 名称拼写错误是常见的配置问题,请仔细检查。
  • 总结说明通过 job.reader.topics 参数,您可以精确指定任务需要读取的一个或多个数据源(Topic)。如果您的业务数据分散在不同的 Topic 中,可以使用逗号将它们隔开,实现一次性订阅。

job.reader.group_id

  • 参数概述
    • 参数名job.reader.group_id
    • 适用范围:Source
    • 单位:无
    • 默认值:系统会基于实例名和任务 ID 自动生成一个唯一的 group.id
    • 核心作用:指定 Kafka 消费者组(Consumer Group)的名称。同一消费者组内的消费者会协调分工,共同消费一个 Topic 的不同分区,每个分区只会被组内的一个消费者处理。
  • 适用场景
    • 场景一:多个作业实例协同消费
      • 典型现象:一个数据同步任务通过提升并发(多个作业实例)来加快消费速度。
      • 处理方式:所有实例使用相同的 group.id,可以确保每个 Topic 分区只被一个实例消费,避免数据重复处理。
    • 场景二:作业断点续传
      • 典型现象:作业失败重启后,需要从上次中断的位置继续消费,而不是从头或从最新的位置开始。
      • 处理方式:保持 group.id 不变。Kafka Broker 会为每个消费者组记录其消费进度(Offset),重启时消费者可以根据 group.id 找到之前的进度并继续。
  • 调参建议
    • 默认推荐:通常情况下,建议使用系统自动生成的默认值。这可以保证每个任务实例都有独立的消费组,便于管理且避免相互干扰。
    • 手动设置场景:当您有多个独立的任务或应用需要共享消费进度时,可以手动设置相同的 group.id。但请谨慎操作,确保您了解其影响。
  • 参数联动
    • job.reader.group_id 与 Kafka 的 Offset 提交机制紧密相关。消费位点的记录是基于消费者组的。
    • 它也与 job.reader.connector.auto_offset_reset 参数联动,决定了当为一个新的 group.id 首次启动消费,或当已记录的 Offset 失效时,从何处开始读取数据。
  • FAQ(用户提问)
    • 问:如果我每次启动都换一个新的 group_id 会怎样?
      • 答:每次都会被当作一个全新的消费者组,会从 job.reader.connector.auto_offset_reset 参数所指定的位置(如 earliestlatest)开始消费,而不是从上次的进度继续。这通常会导致数据重复处理或丢失。
  • 注意事项
    • group.id 是实现消费负载均衡和故障恢复的关键。除非有明确的业务需求,否则不要轻易修改或在不同任务间共用。
    • 一个稳定的 group.id 是保证数据不重不漏、断点续传的基础。
  • 总结说明job.reader.group_id 相当于您的数据读取任务在 Kafka 中的“身份标识”。它帮助 Kafka 记住您的任务读到了哪里,并在任务重启或扩容时,确保数据能够无缝衔接、不重不漏。通常,您无需关心此参数,使用系统默认值即可。

job.reader.format_type

  • 参数概述
    • 参数名job.reader.format_type(别名: content_type
    • 适用范围:Source
    • 单位:无
    • 默认值json
    • 核心作用:定义 Kafka 消息体(Value)的解析格式。系统需要根据这个参数,选择正确的反序列化器来将原始的字节数据转换成结构化的数据记录。
  • 适用场景
    • 场景一:消费 JSON 格式数据
      • 典型现象:上游系统产生的数据是标准的 JSON 字符串。
      • 处理方式:设置为 json 或保持默认。
    • 场景二:消费 CSV 格式数据
      • 典型现象:消息内容是逗号分隔的文本。
      • 处理方式:设置为 csv
  • 调参建议
    • 默认推荐:根据您的数据源格式准确设置。最常用的为 json
    • 如果您的场景比较复杂,例如一个 Topic 内混合了来自不同表的 CDC(变更数据捕获)数据,可能需要配合其他参数(如 job.reader.use_multi_table_record=true)使用更高级的格式类型。
  • 注意事项
    • 如果 format_type 与消息的实际格式不匹配,会导致数据解析失败,作业通常会报错退出。
    • 请确保上游数据格式的稳定性和一致性。
  • 总结说明job.reader.format_type 参数告诉数据处理系统“如何读懂”从 Kafka 传来的消息。就像您需要用正确的钥匙开锁一样,这里需要根据您的数据源格式(如 JSON 或 CSV)进行正确设置,以确保数据能被顺利解析。

job.reader.start_offset / job.reader.end_offset

  • 参数概述
    • 参数名job.reader.start_offset / job.reader.end_offset
    • 适用范围:Source
    • 单位:无
    • 默认值:-1(表示未设置)
    • 核心作用:在“Offset 模式”下,精确控制数据读取的起始和结束位置。start_offset 定义从哪个 Offset 开始读,end_offset 定义读到哪个 Offset 为止(包含该 Offset)。这两个参数主要用于批处理或数据重跑场景。
  • 适用场景
    • 场景一:精确重跑某段数据
      • 典型现象:由于下游系统故障,需要重新处理 Kafka 中某个精确 Offset 范围的数据。
      • 处理方式:设置 start_offsetend_offset 来框定需要重跑的数据区间。
    • 场景二:批处理作业读取固定数据量
      • 典型现象:每日执行的批处理任务,需要读取从某个已知位置开始的一段数据。
      • 处理方式:配置 start_offsetend_offset
  • 调参建议
    • 排障临时配置:当需要修复或回溯特定数据问题时,这是最精确的控制手段。
    • 不适用于流式任务:对于需要持续不断消费新数据的流式任务,不应设置 end_offset
  • 参数联动
    • 优先级:当 start_offset 被设置(值不为 -1)时,会优先于 start_timestamp(时间戳模式)。
    • 边界行为
      • start_offset 会被系统限制在分区的有效范围内(不会小于 earliest)。
      • end_offset 同样会被限制在分区的有效范围内(不会大于 latest)。
      • end_offset 的区间是闭合的,即包含 end_offset 本身。系统内部处理时会转换为右开区间 [start, end + 1)
  • 注意事项
    • 这两个参数是针对所有分区设置的。如果不同分区的 Offset 范围差异巨大,可能会导致某些分区早已读完而另一些还在读取。
    • 在流式计算(Unbounded)模式下,end_offset 会被忽略。
  • 总结说明start_offsetend_offset 提供了对数据读取范围最精确的控制,如同给书本加上了起始和结束的书签。它们非常适合一次性的数据重跑或批处理任务,但不应用于需要长期运行的实时数据流。

job.reader.start_timestamp / job.reader.end_timestamp

  • 参数概述
    • 参数名job.reader.start_timestamp / job.reader.end_timestamp
    • 适用范围:Source
    • 单位:秒(10 位)或毫秒(13 位)
    • 默认值:-1(表示未设置)
    • 核心作用:在“时间戳模式”下,通过时间来定义数据读取的范围。系统会根据提供的时间戳,查找 Kafka 分区中对应或之后的第一条消息的 Offset 作为起点或终点。
  • 适用场景
    • 场景一:从特定时间点开始消费
      • 典型现象:需要从某个业务事件发生的时间点(如“今天凌晨”)之后开始处理数据。
      • 处理方式:设置 start_timestamp 为该时间点对应的时间戳。
    • 场景二:处理某个时间窗口内的数据
      • 典型现象:进行数据对账或分析,只需要处理昨天一天内(如 2023-10-26 00:00:002023-10-26 23:59:59)的数据。
      • 处理方式:同时设置 start_timestampend_timestamp
  • 调参建议
    • 默认推荐:对于批处理任务,使用时间戳通常比使用 Offset 更直观、更符合业务逻辑。
    • 系统支持 10 位秒级或 13 位毫秒级时间戳,会自动进行转换。
  • 参数联动
    • 优先级:只有当 start_offset 未设置时,start_timestamp 才会生效。
    • 边界行为
      • 如果为某个时间戳找不到对应的 Offset(例如,那个时间段内没有数据写入),start_timestamp 的行为可能会退化为从 latest 开始,导致看似没有消费到数据。
      • end_timestamp 如果不合法(例如晚于当前时间,或早于等于 start_timestamp),作业会启动失败。
  • FAQ(用户提问)
    • 问:我设置了 start_timestamp 为一个小时前,为什么作业启动了但一条数据都没读到?
      • 答:这很可能是因为在您指定的时间点之后,Kafka Topic 中并没有新的数据写入。系统根据时间戳找不到对应的起始位置(Offset),其行为可能转为从最新的位置等待,因此看起来像没有数据。请检查该时间段内上游是否确实有数据生产。
  • 注意事项
    • 时间戳到 Offset 的转换依赖 Kafka Broker 的实现,且它定位的是 大于等于 该时间戳的第一条消息。
    • 时间戳模式的精度取决于消息在 Kafka 中的存储时间,可能与业务事件的发生时间有微小偏差。
  • 总结说明如果您更关心数据产生的业务时间而非其在 Kafka 中的物理存储位置,start_timestampend_timestamp 是更友好的选择。它们让您可以按“小时”、“天”等业务维度来读取数据,尤其适合批处理和数据分析场景。

job.reader.start_date / job.reader.end_date

  • 参数概述
    • 参数名job.reader.start_date / job.reader.end_date
    • 适用范围:Source
    • 单位:无
    • 默认值:空字符串
    • 核心作用:作为 start_timestampend_timestamp 的一种易用性封装,允许用户使用日期时间字符串来指定起止范围,而不是手动计算时间戳。
  • 适用场景
    • 场景一:不想手动转换时间戳
      • 典型现象:明确知道需要从 2023-10-27 08:00:00 开始消费,但不希望自己去查找这个时间对应的秒级或毫秒级时间戳。
      • 处理方式:设置 job.reader.start_date: "20231027080000" 并配合 job.reader.date_format 使用。
  • 调参建议
    • 默认推荐:与 job.reader.date_format 参数配合使用。date_format 的默认值为 yyyyMMddHHmmss
    • 这是一个便利性参数,其最终效果等同于设置了 start_timestamp / end_timestamp
  • 参数联动
    • start_timestamp / end_timestamp 未设置时,系统会尝试使用 start_date / end_datedate_format 来解析成时间戳。如果 timestampdate 都设置了,timestamp 优先。
  • 注意事项
    • 日期字符串的解析使用的是系统时区。如果您的业务涉及跨时区,请注意可能带来的偏差。
    • date_format 必须与您提供的 start_date / end_date 格式严格匹配,否则会导致解析失败。
  • 总结说明start_dateend_datestart_timestampend_timestamp 的“人性化”版本,让您能用熟悉的日期格式(如 20231027080000)来设定数据读取的时间范围,省去了计算时间戳的麻烦。

job.reader.discovery_interval_seconds

  • 参数概述
    • 参数名job.reader.discovery_interval_seconds
    • 适用范围:Source
    • 单位:秒
    • 默认值:-1
    • 核心作用:控制系统检查 Kafka Topic 是否有新分区加入的周期。
  • 适用场景
    • 场景一:批处理任务
      • 典型现象:任务只运行一次,处理固定范围内的数据,不关心 Topic 后续是否增加了分区。
      • 处理方式:保持默认值 -1。系统只会在作业启动时发现一次分区。
    • 场景二:流处理任务,且 Topic 分区会动态增加
      • 典型现象:一个长期运行的流处理任务,其上游 Topic 会根据流量动态扩容(增加分区)。任务需要能自动消费这些新增分区的数据。
      • 处理方式:设置一个正整数,例如 300(表示每 5 分钟检查一次)。
  • 调参建议
    • 吞吐优先 / 批处理:设置为 -1。避免了周期性检查带来的额外开销。
    • 实时性优先 / 流处理:设置为一个合理正值,如 60300。值越小,发现新分区的延迟越低,但对 Kafka Broker 的请求也越频繁。请根据实际分区变更的频率来权衡。
  • 参数联动
    • 此参数与作业的运行模式(批/流)紧密相关。对于流式作业,启用周期性发现是保证不错过动态新增分区数据的关键。
  • FAQ(用户提问)
    • 问:我的流处理任务运行得好好的,但昨天运维给 Topic 扩容加了几个分区,为什么我的任务没有自动去读这些新分区的数据?
      • 答:很可能是因为您的 discovery_interval_seconds 参数保持了默认值 -1,导致任务只在启动时扫描了一次分区。对于需要适应分区动态变化的流式任务,应将此参数设置为一个大于等于 0 的值(如 300 秒),以开启周期性分区发现功能。
  • 注意事项
    • 只有在您的业务场景中,Topic 分区会发生变化时,设置此参数才有意义。如果 Topic 分区数是固定的,保持默认值 -1 即可。
  • 总结说明discovery_interval_seconds 决定了您的数据读取任务是否具备“动态视觉”。对于一次性运行的批处理任务,默认值 -1(仅在启动时看一眼分区情况)是最高效的。而对于需要长期在线、并适应上游 Topic 分区数量变化的流式任务,您需要给它设置一个正数(如 300),让它每隔一段时间就“抬头”看看有没有新的分区需要处理。

job.reader.max_partition_num_per_consumer

  • 参数概述
    • 参数名job.reader.max_partition_num_per_consumer
    • 适用范围:Source
    • 单位:无
    • 默认值:1
    • 核心作用:在用户未明确指定读取并发度(job.reader.reader_parallelism_num)时,此参数用于“建议”一个合理的并发数。计算逻辑为:建议并发度 = ceil(总分区数 / 每个并发实例能处理的最大分区数)
  • 适用场景
    • 场景一:希望系统自动推荐并发
      • 典型现象:不确定为一个拥有 50 个分区的 Topic 设置多少并发比较合适。
      • 处理方式:可以不设置 job.reader.reader_parallelism_num,而是调整 max_partition_num_per_consumer。例如,设置为 5,系统就会建议 ceil(50 / 5) = 10 的并发度。
  • 调参建议
    • 默认推荐:默认值为 1,意味着系统会建议与分区数相等的并发度,最大化利用并行处理能力。
    • 资源有限场景:如果您的计算资源有限,不希望启动过多并发实例,可以适当调大此参数。例如,设置为 10,则一个并发实例最多会处理 10 个分区的数据。
  • 参数联动
    • 此参数是 job.reader.reader_parallelism_num 的“候补”。如果 reader_parallelism_num 被显式设置,则 max_partition_num_per_consumer 的建议值不会生效。
  • 注意事项
    • 这只是一个“建议值”,最终生效的并发度还受限于集群总资源和平台侧的其他配置。
    • 并发度并非越高越好,它直接关系到资源消耗。过高的并发可能导致资源浪费或任务因资源不足而无法启动。
  • 总结说明当您不确定为 Kafka 读取任务设置多少并发数时,max_partition_num_per_consumer 提供了一种间接的控制方式。它通过设定“每个工人(并发实例)最多能干多少活(处理几个分区)”,来帮助系统估算出总共需要多少工人。默认情况下,系统建议一个工人只干一个分区的活,以实现最大化的并行处理。

job.reader.commit_in_checkpoint

  • 参数概述
    • 参数名job.reader.commit_in_checkpoint
    • 适用范围:Source
    • 单位:无
    • 默认值FALSE
    • 核心作用:决定向 Kafka Broker 提交消费位点(Offset)的时机。
  • 适用场景
    • 场景一:默认行为(推荐)
      • 典型现象:希望消费位点的提交能与数据处理循环解耦,以获得较低的延迟。
      • 处理方式:保持默认值 FALSE。在这种模式下,每次从 Kafka 拉取一批数据(poll)后,系统会立即同步提交(commitSync)位点。
    • 场景二:严格的端到端 Exactly-Once 语义(配合特定 Sink)
      • 典型现象:数据处理的下游 Sink 是一个支持事务的系统(如另一个 Kafka Topic),并且需要保证从 Source 读取到 Sink 写入的端到端“恰好一次”处理。
      • 处理方式:设置为 TRUE。此时,位点提交会绑定在整个任务的检查点(Checkpoint)机制中。只有当一个 Checkpoint 成功完成(意味着该 Checkpoint 周期内的数据都已被下游正确处理),相应的 Kafka Offset 才会被提交。
  • 调参建议
    • 实时性优先 / 通用场景:保持默认值 FALSE。这是一种高效且广泛适用的模式。
    • 可靠性优先 / Exactly-Once 场景:当且仅当您完全理解 Flink 的 Checkpoint 机制,并且下游 Sink 支持事务时,才考虑设置为 TRUE
  • 参数联动
    • 此参数的行为与 Flink 的 Checkpoint 机制强相关。设置为 TRUE 时,Offset 提交的频率和时机完全由 Checkpoint 的间隔和成功与否决定。
  • FAQ(用户提问)
    • 问:为什么开启 Checkpoint 后,在 Kafka 监控里看到的 Consumer Lag(消费延迟)时高时低,呈周期性下降?
      • 答:如果您将 commit_in_checkpoint 设置为 TRUE,就可能观察到此现象。因为 Offset 只在 Checkpoint 成功时才提交,所以在两个 Checkpoint 之间的时间里,从 Kafka 的视角看,消费位点没有更新,Lag 似乎在持续增长。一旦 Checkpoint 完成,位点一次性向前跳跃,Lag 随之骤降。这是此模式下的正常表现。
  • 注意事项
    • 将此参数设置为 TRUE 会增加数据处理的端到端延迟,因为数据必须等待整个 Checkpoint 完成后,其位点才被确认。
    • 错误地使用 TRUE 模式(例如,下游 Sink 不支持事务)可能并不能带来 Exactly-Once 的保证,反而会使系统行为变得复杂。
  • 总结说明job.reader.commit_in_checkpoint 控制着任务向 Kafka“汇报”工作进度的时机。默认 FALSE 是“随手汇报”模式,效率高,延迟低。设置为 TRUE 则是“等工作阶段性总结(Checkpoint)完成后再统一汇报”的模式,它能提供更强的数据一致性保证(Exactly-Once),但会牺牲一定的实时性,且需要与支持事务的下游存储配合使用。对于绝大多数场景,保持默认即可。

job.reader.metadata_columns

  • 参数概述
    • 参数名job.reader.metadata_columns
    • 适用范围:Source
    • 单位:无
    • 默认值:空字符串
    • 核心作用:允许用户将 Kafka 消息自身的一些元信息(如时间戳、Offset、Key 等)作为独立的列,附加到输出的数据记录中。
  • 适用场景
    • 场景一:需要基于消息的 Offset 或时间戳进行后续处理
      • 典型现象:下游系统需要知道每条数据在 Kafka 中的精确位置(Offset)或其进入 Kafka 的时间(timestamp),用于数据审计、去重或问题排查。
      • 处理方式:设置为 offset,timestamp。这样,输出的每条数据都会额外增加 offsettimestamp 两个字段。
    • 场景二:需要获取消息的 Key 或 Headers
      • 典型现象:业务逻辑依赖于消息的 Key 来进行分区或关联,或者需要读取 Headers 中的追踪信息。
      • 处理方式:设置为 key,headers
  • 调参建议
    • 默认推荐:按需添加。常用的元信息字段包括:timestamp, offset, key, value, topic, partition, headers
    • 字段名之间用逗号 , 分隔。
    • 字段名的匹配是大小写不敏感的。
  • 注意事项
    • 请确保您在下游 schema 中也定义了这些额外的元信息列,否则可能导致 schema 不匹配的错误。
    • 字段名匹配采用的是“包含且忽略大小写”的模式,例如,如果您配置了 timestamp,一个名为 event_timestamp 的元字段也可能会被错误地匹配。因此,建议使用精确、完整的元字段名列表。
  • 总结说明job.reader.metadata_columns 参数像一个“附加信息”开关。当您不仅关心 Kafka 消息的内容(Value),还关心它的“身份证信息”(如来自哪个 Topic、哪个分区、在什么时间、什么位置)时,可以通过这个参数将它们提取出来,作为数据记录的一部分向下游传递。

job.reader.properties

  • 参数概述
    • 参数名job.reader.properties(别名: extra_raw_properties
    • 适用范围:Source / Sink / Common
    • 单位:无
    • 默认值:无
    • 核心作用:提供一个“后门”,允许用户直接配置 Kafka Consumer 的原生参数。这里设置的键值对会直接透传给 Kafka 客户端。
  • 适用场景
    • 场景一:配置平台未直接暴露的 Kafka 参数
      • 典型现象:需要调整 fetch.min.bytes, max.poll.records, receive.buffer.bytes 等具体的消费者行为参数,以进行深度性能调优。
      • 处理方式:在 properties 中以 key:value 对的形式添加,例如 {"fetch.min.bytes": "1048576"}
    • 场景二:SSL/TLS 加密传输的证书配置
      • 典型现象:Kafka 集群启用了 SSL/TLS 加密,需要客户端提供信任证书。
      • 处理方式:通过此参数配置 ssl.truststore.certificates 等相关参数。如果证书内容是 Base64 编码的,系统会自动尝试解码。
  • 调参建议
    • 吞吐优先
      • 增大 fetch.min.bytes:指定 consumer.poll() 返回的最小数据量,能减少 Broker 和 Consumer 的交互次数,提高吞吐,但会增加延迟。
      • 增大 fetch.max.wait.ms:配合 fetch.min.bytes 使用,表示为了达到最小数据量愿意等待多久。
      • 增大 max.poll.records:单次 poll() 返回的最大记录数。
    • 实时性优先
      • 减小 fetch.min.bytesfetch.max.wait.ms,甚至设为 01,让 Consumer 不等待,有数据就立即返回。
    • 排障临时配置
      • 可以临时调整 request.timeout.ms 等参数来应对网络不稳定的情况。
  • 注意事项
    • 此处的参数 key 必须是 Kafka Consumer 配置中定义的标准名称,如 max.poll.interval.ms
    • 通过 properties 设置的参数,其优先级高于其他 job.reader.connector.* 等快捷参数。请注意配置的覆盖关系。
    • 不当的配置可能会严重影响消费者性能和稳定性,请在充分了解参数含义后再进行调整。
  • 总结说明job.reader.properties 是一个面向高级用户的“专家模式”入口。当标准参数无法满足您精细的调优需求时,您可以通过这里直接向 Kafka 客户端传递任何原生配置指令。这为您提供了最大的灵活性,但同时也要求您对 Kafka 的消费者参数有深入的了解。

job.reader.connector.auto_offset_reset

  • 参数概述
    • 参数名job.reader.connector.auto_offset_reset
    • 适用范围:Source
    • 单位:无
    • 默认值latest(对于流式作业);earliest(对于批处理作业)
    • 核心作用:定义在一个新的消费者组首次启动,或者已提交的 Offset 在 Broker 上已失效(如数据被清理)的情况下,消费者从何处开始读取数据。
  • 适用场景
    • 场景一:流处理任务,只关心新产生的数据
      • 典型现象:一个实时看板或监控任务,只需要处理从它启动这一刻起新到达的数据,历史数据不重要。
      • 处理方式:设置为 latest(默认)。
    • 场景二:批处理任务或数据迁移,需要读取所有历史数据
      • 典型现象:需要将一个 Topic 的全部存量数据完整地同步到另一个系统。
      • 处理方式:设置为 earliest
    • 场景三:不希望程序自动决定,找不到 Offset 就报错
      • 典型现象:对数据读取的起始位置有严格要求,任何不确定性都应让任务失败。
      • 处理方式:设置为 none
  • 调参建议
    • 实时性优先 / 流处理:使用 latest,避免任务启动时处理大量无关的历史数据。
    • 吞吐优先 / 批处理 / 数据完整性要求高:使用 earliest,确保不会遗漏任何数据。
    • 排障临时配置:当怀疑有数据丢失时,可临时改为 earliest 并使用新的 group.id 来重跑数据进行验证。
  • 参数联动
    • 此参数只在“没有有效 Offset”的特定情况下才会触发。如果一个 group.id 已经有提交过的 Offset,则该参数不会生效。
    • 在批处理(Bounded)模式下,即使不设置,系统也会强制默认使用 earliest,以保证批处理任务能够读取到指定范围内的数据。
  • FAQ(用户提问)
    • 问:我一个流处理任务,每次重启都从头消费所有数据,导致下游数据严重重复,是什么原因?
      • 答:您可能错误地将 auto.offset.reset 设置为了 earliest。对于流处理任务,这通常是不期望的行为。请检查该参数是否被设为 latest。另外,也请检查 group.id 是否每次都发生了变化,一个全新的 group.id 也会触发此策略。
  • 注意事项
    • 这是最容易引起“数据重复”或“数据丢失”误解的参数之一。请务必根据您的业务需求(关心存量还是增量)进行正确配置。
  • 总结说明auto_offset_reset 决定了当您的读取任务“迷路”时(比如第一次启动,或者之前的进度记录丢失了),它应该从哪里重新开始。latest 表示“从最新的地方开始,只看未来”,适合实时流任务。earliest 表示“从最古老的地方开始,从头来过”,适合需要完整历史数据的批处理任务。

job.reader.read_from_earliest_offset / job.reader.read_to_latest_offset

  • 参数概述
    • 参数名job.reader.read_from_earliest_offset / job.reader.read_to_latest_offset
    • 适用范围:Source
    • 单位:无
    • 默认值false / false
    • 核心作用:设计意图是作为一种快捷方式来定义读取范围,但当前版本下这些参数仅被解析,并未在核心逻辑中生效
  • 注意事项
    • 当前版本无效:在目前的实现中,设置这两个参数为 true 不会产生预期的效果(即不会自动将读取范围设定为 Topic 的最早到最新)。
    • 如需实现从最早消费到最新,请使用 job.reader.connector.auto_offset_reset: earliest 配合流式(Unbounded)模式,或通过 start_offset/timestampend_offset/timestamp 精确控制批处理范围。
  • 总结说明请注意,job.reader.read_from_earliest_offsetjob.reader.read_to_latest_offset 这两个参数在当前版本中是无效的。如果您需要控制读取的起始位置,请使用 start_offsetstart_timestampauto_offset_reset 等其他参数。

job.reader.wait_poll_empty_times / job.reader.wait_poll_empty_interval

  • 参数概述
    • 参数名job.reader.wait_poll_empty_times / job.reader.wait_poll_empty_interval
    • 适用范围:Source
    • 单位:无 / 毫秒
    • 默认值:-1 / 1000
    • 核心作用:设计意图是在批处理模式下,当消费到分区末尾后,进行一定次数和间隔的重试等待,以应对生产者延迟等场景。但当前版本仅被解析,并未在核心逻辑中生效
  • 注意事项
    • 当前版本无效:在目前的实现中,设置这两个参数不会让任务在读完数据后进行额外的等待和轮询。批处理任务读到 endOffset 后就会结束。
  • 总结说明请注意,job.reader.wait_poll_empty_timesjob.reader.wait_poll_empty_interval 这两个参数在当前版本中是无效的。批处理作业在读取到其定义的结束位置后会直接完成,不会进行额外的等待。

Sink 参数(写入侧)

本章节详细介绍 Kafka Sink 在写入数据时可供配置的高级参数。

job.writer.bootstrap_servers

  • 参数概述
    • 参数名job.writer.bootstrap_servers(别名: kafka_servers
    • 适用范围:Source / Sink / Common
    • 单位:无
    • 默认值:无,必须填写。
    • 核心作用:指定用于建立与 Kafka 集群初始连接的 Producer Broker 地址列表。格式为 host1:port1,host2:port2,...
  • 适用场景
    • 与 Source 侧的 job.reader.bootstrap_servers 作用相同,是连接 Kafka 集群的基础核心配置。
  • 调参建议
    • 默认推荐:建议提供 2-3 个 Broker 地址,以提高连接的可用性。
  • 注意事项
    • 请确保作业运行环境与 Kafka Broker 之间网络策略通畅。
  • 总结说明与读取侧类似,job.writer.bootstrap_servers 是数据写入任务连接 Kafka 集群的入口。您需要在此提供一到两个 Kafka Broker 的地址和端口,以确保数据能够被成功发送。

job.writer.topic_name

  • 参数概述
    • 参数名job.writer.topic_name
    • 适用范围:Sink
    • 单位:无
    • 默认值:无,必须填写。
    • 核心作用:指定数据需要被写入的目标 Topic 名称。
  • 适用场景
    • 场景一:写入单个目标 Topic
      • 典型现象:所有处理完的数据都需要发送到同一个 Kafka Topic。
      • 处理方式:直接填写目标 Topic 名称。
  • 调参建议
    • 默认推荐:根据您的数据落地需求,明确指定目标 Topic。
  • 注意事项
    • 请确保填写的 Topic 真实存在,并且作业所使用的账号有权限写入。
    • 不同于 Source,Sink 通常一次只向一个固定的 Topic 写入(除非采用动态 Topic 路由的复杂模式)。
  • 总结说明job.writer.topic_name 明确了数据处理结果的目的地。所有数据将被发送到您在此指定的 Kafka Topic 中。

job.writer.format_type

  • 参数概述
    • 参数名job.writer.format_type(别名: content_type
    • 适用范围:Sink
    • 单位:无
    • 默认值json
    • 核心作用:定义输出到 Kafka 消息体(Value)的序列化格式。系统会根据这个参数,选择正确的序列化器来将结构化的数据记录转换成字节流。
  • 适用场景
    • 场景一:向下游系统提供 JSON 格式数据
      • 典型现象:下游的消费者期望接收到的是标准 JSON 字符串。
      • 处理方式:设置为 json 或保持默认。
    • 场景二:与其他系统(如 Debezium)兼容的 CDC 格式
      • 典型现象:需要模拟 Debezium 的输出格式,供下游进行增量数据处理。
      • 处理方式:可能会选择 DEBEZIUM_JSON 等特定格式。
  • 调参建议
    • 默认推荐json 是最通用和广泛支持的格式,建议优先使用。
  • 注意事项
    • format_type 必须与下游消费者的期望格式一致,否则会导致下游解析失败。
  • 总结说明job.writer.format_type 决定了您的数据将以何种“语言”(格式)写入 Kafka。json 是最通用的选择,能确保最好的兼容性。

job.writer.key_fields

  • 参数概述
    • 参数名job.writer.key_fields
    • 适用范围:Sink
    • 单位:无
    • 默认值:无
    • 核心作用:从输出的数据记录中,提取一个或多个字段的值,拼接成 Kafka 消息的 Key。
  • 适用场景
    • 场景一:保证同一业务主键的数据有序
      • 典型现象:例如,同一个用户 user_id 的所有相关操作事件,必须按顺序被处理。
      • 处理方式:设置 key_fields: "user_id"。Kafka Producer 会确保相同 Key 的消息被发送到同一个分区,从而保证了分区内的消费顺序。
    • 场景二:利用 Key 进行数据分区
      • 典型现象:希望基于某个字段(如 city_id)将数据均匀地分布到不同的分区,以便下游按城市并行处理。
      • 处理方式:设置 key_fields: "city_id"
  • 调参建议
    • 默认推荐:如果业务逻辑对消息顺序有要求,或者需要按特定业务维度进行数据分区,强烈建议配置此参数。
    • 多个字段可以用逗号 , 分隔,它们的值会拼接在一起作为 Key。
  • 注意事项
    • 不设置 key_fields 会导致消息的 Key 为 null,此时 Producer 会以轮询(Round-Robin)的方式将消息随机发送到不同分区。这会打乱数据的顺序。
    • 一个设计良好的 Key 是保证数据有序性和均匀分布的关键。
  • 总结说明job.writer.key_fields 用于从您的数据中提取“业务主键”,并将其作为 Kafka 消息的 Key。这非常重要,因为它能确保例如“同一个订单的所有更新事件”会被发送到同一个分区并被顺序处理。如果您关心数据的处理顺序,就应该配置它。

job.writer.partition_field

  • 参数概述
    • 参数名job.writer.partition_field
    • 适用范围:Sink
    • 单位:无
    • 默认值:无
    • 核心作用:从数据记录中提取一个字段的值,通过计算哈希值来决定消息应被发送到哪个具体的分区。
  • 适用场景
    • 场景一:需要手动控制分区逻辑
      • 典型现象:相比于让 Producer 自动根据 Key 的哈希来分区,您希望基于数据中一个完全不同的字段来决定分区。
      • 处理方式:设置 partition_field。例如,partition_field: "hash_code_field",系统会用这个字段的值来计算分区号。
  • 调参建议
    • 不推荐常规使用:大多数场景下,使用 job.writer.key_fields 让 Producer 自动处理分区是更简单和推荐的做法。
    • 只有在您有非常特殊的自定义分区策略时,才考虑使用此参数。
  • 参数联动
    • 如果同时设置了 key_fieldspartition_field,分区逻辑以 partition_field 为准。
  • 注意事项
    • 这是一个底层且不常用的参数。错误的使用可能导致数据严重倾斜(即大量数据集中在少数几个分区)。
    • 如果 partition_field 对应的值分布不均匀,会导致下游分区负载不均衡。
  • 总结说明job.writer.partition_field 提供了一种手动指定分区逻辑的底层方法。对于绝大多数用户,我们推荐使用更上层的 key_fields 参数来自动处理数据分区和排序,而无需关心此参数。

job.writer.enable_headers

  • 参数概述
    • 参数名job.writer.enable_headers
    • 适用范围:Sink
    • 单位:无
    • 默认值TRUE
    • 核心作用:在特定格式(如 DEBEZIUM_JSON)下,此参数决定是否将数据记录中的元信息(metadata)提取出来,并作为 Kafka 消息的 Headers 发送。
  • 适用场景
    • 场景一:传递 CDC 元数据
      • 典型现象:使用 DEBEZIUM_JSON 格式进行数据同步时,希望将上游数据库的表名、操作类型(I/U/D)等元信息通过 Headers 传递给下游。
      • 处理方式:保持默认值 TRUE
  • 调参建议
    • 默认推荐:保持默认值 TRUE 即可。
  • 注意事项
    • 此参数仅在特定的序列化格式下有意义。对于普通的 json 格式,此开关可能不起作用。
  • 总结说明job.writer.enable_headers 主要用于 CDC(变更数据捕获)场景,它允许将数据的“附加信息”(如来源表、变更类型)放在消息的 Headers 中一并发送。通常保持默认即可。

job.writer.log_failures_only

  • 参数概述
    • 参数名job.writer.log_failures_only
    • 适用范围:Sink
    • 单位:无
    • 默认值FALSE
    • 核心作用:定义当 Kafka Producer 发送消息失败时的行为。
  • 适用场景
    • 场景一:高可靠性要求,任何写入失败都不能接受
      • 典型现象:数据同步任务的完整性至关重要,一条数据写入失败就意味着整个链路有问题,需要任务立即失败并告警。
      • 处理方式:保持默认值 FALSE。一旦 Producer 发送失败(在重试之后仍然失败),会向上抛出异常,导致整个任务失败。
    • 场景二:允许部分数据发送失败
      • 典型现象:一个非核心的日志或指标同步任务,偶尔丢失几条数据是可以接受的,不希望因为暂时的网络抖动等问题导致整个任务中断。
      • 处理方式:设置为 TRUE。此时,发送失败的消息只会被记录一条错误日志,任务会继续运行处理后续数据。
  • 调参建议
    • 可靠性优先:保持默认值 FALSE。这是最安全的选择,确保您能第一时间感知到写入问题。
    • 可用性优先 / 非核心业务:设置为 TRUE
  • 注意事项
    • 设置为 TRUE 意味着您接受了“数据可能丢失”的风险。请务必确保有相应的日志监控和告警机制,来发现那些被“跳过”的失败消息。
    • 此参数控制的是最终失败后的行为,它不影响 Producer 自身的重试机制(由 retries 参数控制)。
  • 总结说明job.writer.log_failures_only 是一个“容错开关”。默认 FALSE 表示“零容忍”,任何数据写入失败都会让整个任务停下来并告警。设置为 TRUE 则表示“有一定容忍度”,允许任务在记录一条错误日志后,跳过失败的数据继续前进。请根据您的业务对数据完整性的要求来选择。

job.writer.properties

  • 参数概述
    • 参数名job.writer.properties(别名: extra_raw_properties
    • 适用范围:Source / Sink / Common
    • 单位:无
    • 默认值:无
    • 核心作用:提供一个“后门”,允许用户直接配置 Kafka Producer 的原生参数。这里设置的键值对会直接透传给 Kafka 客户端。
  • 适用场景
    • 场景一:性能调优
      • 典型现象:需要调整 batch.size, linger.ms, compression.type, acks 等核心 Producer 参数以平衡吞吐量和延迟。
      • 处理方式:在 properties 中添加,例如 {"batch.size": "65536", "compression.type": "snappy"}
    • 场景二:配置平台未直接暴露的 Kafka 参数
      • 典型现象:需要配置 buffer.memory, max.in.flight.requests.per.connection 等参数。
      • 处理方式:在此处进行配置。
  • 调参建议
    • 吞吐优先
      • 增大 batch.size:Producer 会尝试将发往同一分区的多条消息打包成一个更大的批次,能显著提高压缩率和网络效率,但会增加延迟。
      • 增大 linger.ms:Producer 发送前愿意为收集更多消息而等待的时间。与 batch.size 配合,可以进一步提升批量效果。
      • 设置 compression.type:如 snappy, lz4, zstd。压缩可以大幅减少网络传输和磁盘占用,是提升吞吐的利器。
    • 实时性优先
      • 减小 linger.ms(例如设为 0):让 Producer 不等待,有消息就立刻发送。
      • 减小 batch.size:减少批量带来的延迟。
    • 可靠性优先
      • 设置 acks=all (或 -1):确保消息不仅被 Leader Broker 接收,还被同步到了所有 ISR(In-Sync Replicas)副本后,才算发送成功。这是最高的数据可靠性保证,但会牺牲性能。
  • 参数联动
    • 通过 properties 设置的参数,会覆盖 job.writer.retries, job.writer.retry_backoff_ms, job.writer.linger_ms 等快捷参数的默认值。
  • 注意事项
    • 此处的参数 key 必须是 Kafka Producer 配置中定义的标准名称,如 batch.size
    • 不当的配置可能会严重影响 Producer 性能和数据可靠性,请在充分了解参数含义后再进行调整。
  • 总结说明job.writer.properties 是写入侧的“专家模式”入口,赋予您完全控制 Kafka Producer 行为的能力。通过调整批量大小(batch.size)、等待时间(linger.ms)、压缩方式(compression.type)和确认机制(acks),您可以在吞吐量、实时性和可靠性之间做出最符合您业务需求的权衡。

job.writer.retries

  • 参数概述
    • 参数名job.writer.retries
    • 适用范围:Sink
    • 单位:无
    • 默认值:10
    • 核心作用:当 Producer 发送消息遇到可恢复的错误(如网络抖动、Leader 选举)时,自动进行重试的次数。
  • 适用场景
    • 场景一:提高发送成功率
      • 典型现象:在不稳定的网络环境中,或在 Kafka 集群进行运维操作(如 Broker 重启)期间,希望任务能自动克服暂时的发送失败。
      • 处理方式:保持或适当调大默认值。
  • 调参建议
    • 默认推荐10 是一个比较合理的默认值,能应对大多数临时性故障。
    • 可靠性优先:可以设置一个更大的值,甚至 Integer.MAX_VALUE,以尽可能地确保消息被送达。
  • 参数联动
    • 此参数作为 job.writer.propertiesretries 参数的默认值。如果在 properties 中显式设置了 retries,则此参数无效。
    • retry.backoff.ms 联动,后者定义了每次重试之间的等待间隔。
  • 注意事项
    • 在 Kafka 0.11.0.0 之后的版本,Producer 的重试不会引起消息乱序(即使 max.in.flight.requests.per.connection > 1)。因此,开启重试是安全的。
    • 重试仅对“可恢复”异常有效。对于配置错误等“不可恢复”异常,不会进行重试。
  • 总结说明job.writer.retries 是一个自动的“重试小助手”。当数据发送遇到网络抖动等临时问题时,它会自动尝试再次发送,最多重试 10 次。这大大提高了数据写入的成功率和任务的健壮性,通常您无需修改此参数。

job.writer.retry_backoff_ms

  • 参数概述
    • 参数名job.writer.retry_backoff_ms
    • 适用范围:Sink
    • 单位:毫秒
    • 默认值:1000
    • 核心作用:定义了 Producer 在每次重试发送之间需要等待的时间。
  • 适用场景
    • 场景一:避免在故障期间进行无效的密集重试
      • 典型现象:Kafka Broker 正在重启,需要几秒钟才能恢复服务。
      • 处理方式:设置一个合理的退避间隔(如默认的 1000 毫秒),可以让 Producer 在重试前“冷静”一下,而不是疯狂地冲击尚未恢复的 Broker。
  • 调参建议
    • 默认推荐1000 毫秒是一个适中的值。
    • 如果您的网络环境或集群恢复时间较长,可以适当增大此值,以减少重试带来的资源消耗。
  • 参数联动
    • 此参数作为 job.writer.propertiesretry.backoff.ms 参数的默认值。如果在 properties 中显式设置了该参数,则此参数无效。
  • 总结说明retry_backoff_ms 告诉“重试小助手”在每次失败后要“休息”多久再试。默认 1 秒的间隔,有助于避免在 Broker 繁忙或故障时进行无效的密集攻击,让重试更智能、更高效。

job.writer.linger_ms

  • 参数概述
    • 参数名job.writer.linger_ms
    • 适用范围:Sink
    • 单位:毫秒
    • 默认值:5000
    • 核心作用:这是提升 Producer 吞吐量最核心的参数之一。它定义了 Producer 在发送一个批次(Batch)前,愿意等待多长时间以收集更多的消息。
  • 适用场景
    • 场景一:高吞吐量的数据写入
      • 典型现象:数据写入量巨大,需要最大化网络和 Broker 的处理效率。对端到端延迟不敏感(例如,分钟级的延迟可以接受)。
      • 处理方式:保持或适当增大默认值 5000(5秒)。更长的等待时间意味着 Producer 有更多机会将发往同一分区的消息打包成一个大的 Batch,从而获得更好的压缩率和吞-吐量。
    • 场景二:低延迟的实时数据写入
      • 典型现象:每一条数据都需要尽快被下游消费,延迟是关键指标。
      • 处理方式:减小 linger.ms,例如设置为 1000 表示不等待,有消息就尝试立即发送。
  • 调参建议
    • 吞吐优先5000 或更高。
    • 实时性优先0100 之间。
  • 参数联动
    • 此参数作为 job.writer.propertieslinger.ms 参数的默认值。如果在 properties 中显式设置了该参数,则此参数无效。
    • batch.size 共同作用。Producer 会在“批次大小达到 batch.size”或“等待时间超过 linger.ms”这两个条件任意一个满足时,发送该批次。
  • FAQ(用户提问)
    • 问:为什么我的数据写入任务,数据总要等好几秒才会在下游出现?
      • 答:很可能是因为 linger.ms 设置得比较大(如默认的 5000ms)。Producer 为了凑齐一个更大的批次,会把消息在本地缓存中“扣留”最多 5 秒。如果您对实时性要求很高,可以尝试将此参数调小,甚至设为 0
  • 注意事项
    • linger.ms 是在吞吐量和延迟之间做权衡的关键。没有绝对的“最优值”,只有最适合您业务场景的值。
  • 总结说明linger.ms 决定了 Producer 的“耐心程度”。一个有耐心的 Producer(高 linger.ms)会多等一会儿,把更多消息打包在一起再出发,这样更省力(吞吐高),但到达目的地会稍晚(延迟高)。一个没耐心的 Producer(低 linger.ms)则会立即出发,到达快(延迟低),但频繁出发会更累(吞吐低)。

job.writer.compression_type

  • 参数概述
    • 参数名job.writer.compression_type
    • 适用范围:Sink
    • 单位:无
    • 默认值SNAPPY
    • 核心作用:设计意图是用于指定消息的压缩算法,但当前版本仅在配置中定义,并未显式写入 Producer 配置
  • 注意事项
    • 当前版本建议通过 properties 设置:虽然此参数存在,但为了确保压缩配置能稳定生效,强烈建议通过 job.writer.properties 来设置,例如 {"compression.type": "snappy"}
    • 可用的压缩算法通常包括 none, gzip, snappy, lz4, zstd
  • 总结说明请注意,为了确保消息压缩功能生效,建议您不要直接使用 job.writer.compression_type 参数,而是在 job.writer.properties 中通过 compression.type 键来指定您想要的压缩算法(如 snappylz4)。开启压缩是提升 Kafka 写入吞吐量、节省网络和磁盘资源的最有效手段之一。

job.writer.big_message_threshold

  • 参数概述
    • 参数名job.writer.big_message_threshold
    • 适用范围:Sink
    • 单位:字节
    • 默认值:10485760 (10 MB)
    • 核心作用:设计意图是用于监控大消息,但当前版本仅在配置中定义,并未在发送链路中被引用
  • 注意事项
    • 当前版本无效:在目前的实现中,此参数不会触发任何大消息监控或处理逻辑。
  • 总结说明请注意,job.writer.big_message_threshold 参数在当前版本中是无效的,不会提供大消息监控功能。
最近更新时间:2026.05.07 14:11:24
这个页面对您有帮助吗?
有用
有用
无用
无用