本文将为您介绍 DataSail Kafka Connector(Source/Sink)的高级参数配置,包括连接信息、消费策略、序列化格式等,可以帮助您提升数据同步的实时性、优化吞吐量,并解决日常运维中遇到的问题。
本章节详细介绍 Kafka Source 在读取数据时可供配置的高级参数。
job.reader.bootstrap_servers(别名: kafka_servers)host1:port1,host2:port2,...。Failed to connect to broker, Connection refused, No resolvable bootstrap urls given in bootstrap.serversjob.reader.bootstrap_servers 是连接 Kafka 集群的“敲门砖”,您需要在此提供至少一到两个 Kafka Broker 的地址和端口,以确保数据同步任务能够成功启动。为提高连接的可靠性,推荐配置 2-3 个地址。job.reader.topics, 分隔。topic_A,topic_B。job.reader.topics 参数,您可以精确指定任务需要读取的一个或多个数据源(Topic)。如果您的业务数据分散在不同的 Topic 中,可以使用逗号将它们隔开,实现一次性订阅。job.reader.group_idgroup.id。group.id,可以确保每个 Topic 分区只被一个实例消费,避免数据重复处理。group.id 不变。Kafka Broker 会为每个消费者组记录其消费进度(Offset),重启时消费者可以根据 group.id 找到之前的进度并继续。group.id。但请谨慎操作,确保您了解其影响。job.reader.group_id 与 Kafka 的 Offset 提交机制紧密相关。消费位点的记录是基于消费者组的。job.reader.connector.auto_offset_reset 参数联动,决定了当为一个新的 group.id 首次启动消费,或当已记录的 Offset 失效时,从何处开始读取数据。group_id 会怎样?job.reader.connector.auto_offset_reset 参数所指定的位置(如 earliest 或 latest)开始消费,而不是从上次的进度继续。这通常会导致数据重复处理或丢失。group.id 是实现消费负载均衡和故障恢复的关键。除非有明确的业务需求,否则不要轻易修改或在不同任务间共用。group.id 是保证数据不重不漏、断点续传的基础。job.reader.group_id 相当于您的数据读取任务在 Kafka 中的“身份标识”。它帮助 Kafka 记住您的任务读到了哪里,并在任务重启或扩容时,确保数据能够无缝衔接、不重不漏。通常,您无需关心此参数,使用系统默认值即可。job.reader.format_type(别名: content_type)jsonjson 或保持默认。csv。json。job.reader.use_multi_table_record=true)使用更高级的格式类型。format_type 与消息的实际格式不匹配,会导致数据解析失败,作业通常会报错退出。job.reader.format_type 参数告诉数据处理系统“如何读懂”从 Kafka 传来的消息。就像您需要用正确的钥匙开锁一样,这里需要根据您的数据源格式(如 JSON 或 CSV)进行正确设置,以确保数据能被顺利解析。job.reader.start_offset / job.reader.end_offsetstart_offset 定义从哪个 Offset 开始读,end_offset 定义读到哪个 Offset 为止(包含该 Offset)。这两个参数主要用于批处理或数据重跑场景。start_offset 和 end_offset 来框定需要重跑的数据区间。start_offset 和 end_offset。end_offset。start_offset 被设置(值不为 -1)时,会优先于 start_timestamp(时间戳模式)。start_offset 会被系统限制在分区的有效范围内(不会小于 earliest)。end_offset 同样会被限制在分区的有效范围内(不会大于 latest)。end_offset 的区间是闭合的,即包含 end_offset 本身。系统内部处理时会转换为右开区间 [start, end + 1)。end_offset 会被忽略。start_offset 和 end_offset 提供了对数据读取范围最精确的控制,如同给书本加上了起始和结束的书签。它们非常适合一次性的数据重跑或批处理任务,但不应用于需要长期运行的实时数据流。job.reader.start_timestamp / job.reader.end_timestampstart_timestamp 为该时间点对应的时间戳。2023-10-26 00:00:00 到 2023-10-26 23:59:59)的数据。start_timestamp 和 end_timestamp。start_offset 未设置时,start_timestamp 才会生效。start_timestamp 的行为可能会退化为从 latest 开始,导致看似没有消费到数据。end_timestamp 如果不合法(例如晚于当前时间,或早于等于 start_timestamp),作业会启动失败。start_timestamp 为一个小时前,为什么作业启动了但一条数据都没读到?start_timestamp 和 end_timestamp 是更友好的选择。它们让您可以按“小时”、“天”等业务维度来读取数据,尤其适合批处理和数据分析场景。job.reader.start_date / job.reader.end_datestart_timestamp 和 end_timestamp 的一种易用性封装,允许用户使用日期时间字符串来指定起止范围,而不是手动计算时间戳。2023-10-27 08:00:00 开始消费,但不希望自己去查找这个时间对应的秒级或毫秒级时间戳。job.reader.start_date: "20231027080000" 并配合 job.reader.date_format 使用。job.reader.date_format 参数配合使用。date_format 的默认值为 yyyyMMddHHmmss。start_timestamp / end_timestamp。start_timestamp / end_timestamp 未设置时,系统会尝试使用 start_date / end_date 和 date_format 来解析成时间戳。如果 timestamp 和 date 都设置了,timestamp 优先。date_format 必须与您提供的 start_date / end_date 格式严格匹配,否则会导致解析失败。start_date 和 end_date 是 start_timestamp 和 end_timestamp 的“人性化”版本,让您能用熟悉的日期格式(如 20231027080000)来设定数据读取的时间范围,省去了计算时间戳的麻烦。job.reader.discovery_interval_seconds-1。系统只会在作业启动时发现一次分区。300(表示每 5 分钟检查一次)。-1。避免了周期性检查带来的额外开销。60 或 300。值越小,发现新分区的延迟越低,但对 Kafka Broker 的请求也越频繁。请根据实际分区变更的频率来权衡。discovery_interval_seconds 参数保持了默认值 -1,导致任务只在启动时扫描了一次分区。对于需要适应分区动态变化的流式任务,应将此参数设置为一个大于等于 0 的值(如 300 秒),以开启周期性分区发现功能。-1 即可。discovery_interval_seconds 决定了您的数据读取任务是否具备“动态视觉”。对于一次性运行的批处理任务,默认值 -1(仅在启动时看一眼分区情况)是最高效的。而对于需要长期在线、并适应上游 Topic 分区数量变化的流式任务,您需要给它设置一个正数(如 300),让它每隔一段时间就“抬头”看看有没有新的分区需要处理。job.reader.max_partition_num_per_consumerjob.reader.reader_parallelism_num)时,此参数用于“建议”一个合理的并发数。计算逻辑为:建议并发度 = ceil(总分区数 / 每个并发实例能处理的最大分区数)。job.reader.reader_parallelism_num,而是调整 max_partition_num_per_consumer。例如,设置为 5,系统就会建议 ceil(50 / 5) = 10 的并发度。1,意味着系统会建议与分区数相等的并发度,最大化利用并行处理能力。10,则一个并发实例最多会处理 10 个分区的数据。job.reader.reader_parallelism_num 的“候补”。如果 reader_parallelism_num 被显式设置,则 max_partition_num_per_consumer 的建议值不会生效。max_partition_num_per_consumer 提供了一种间接的控制方式。它通过设定“每个工人(并发实例)最多能干多少活(处理几个分区)”,来帮助系统估算出总共需要多少工人。默认情况下,系统建议一个工人只干一个分区的活,以实现最大化的并行处理。job.reader.commit_in_checkpointFALSEFALSE。在这种模式下,每次从 Kafka 拉取一批数据(poll)后,系统会立即同步提交(commitSync)位点。TRUE。此时,位点提交会绑定在整个任务的检查点(Checkpoint)机制中。只有当一个 Checkpoint 成功完成(意味着该 Checkpoint 周期内的数据都已被下游正确处理),相应的 Kafka Offset 才会被提交。FALSE。这是一种高效且广泛适用的模式。TRUE。TRUE 时,Offset 提交的频率和时机完全由 Checkpoint 的间隔和成功与否决定。commit_in_checkpoint 设置为 TRUE,就可能观察到此现象。因为 Offset 只在 Checkpoint 成功时才提交,所以在两个 Checkpoint 之间的时间里,从 Kafka 的视角看,消费位点没有更新,Lag 似乎在持续增长。一旦 Checkpoint 完成,位点一次性向前跳跃,Lag 随之骤降。这是此模式下的正常表现。TRUE 会增加数据处理的端到端延迟,因为数据必须等待整个 Checkpoint 完成后,其位点才被确认。TRUE 模式(例如,下游 Sink 不支持事务)可能并不能带来 Exactly-Once 的保证,反而会使系统行为变得复杂。job.reader.commit_in_checkpoint 控制着任务向 Kafka“汇报”工作进度的时机。默认 FALSE 是“随手汇报”模式,效率高,延迟低。设置为 TRUE 则是“等工作阶段性总结(Checkpoint)完成后再统一汇报”的模式,它能提供更强的数据一致性保证(Exactly-Once),但会牺牲一定的实时性,且需要与支持事务的下游存储配合使用。对于绝大多数场景,保持默认即可。job.reader.metadata_columnstimestamp),用于数据审计、去重或问题排查。offset,timestamp。这样,输出的每条数据都会额外增加 offset 和 timestamp 两个字段。key,headers。timestamp, offset, key, value, topic, partition, headers。, 分隔。timestamp,一个名为 event_timestamp 的元字段也可能会被错误地匹配。因此,建议使用精确、完整的元字段名列表。job.reader.metadata_columns 参数像一个“附加信息”开关。当您不仅关心 Kafka 消息的内容(Value),还关心它的“身份证信息”(如来自哪个 Topic、哪个分区、在什么时间、什么位置)时,可以通过这个参数将它们提取出来,作为数据记录的一部分向下游传递。job.reader.properties(别名: extra_raw_properties)fetch.min.bytes, max.poll.records, receive.buffer.bytes 等具体的消费者行为参数,以进行深度性能调优。properties 中以 key:value 对的形式添加,例如 {"fetch.min.bytes": "1048576"}。ssl.truststore.certificates 等相关参数。如果证书内容是 Base64 编码的,系统会自动尝试解码。fetch.min.bytes:指定 consumer.poll() 返回的最小数据量,能减少 Broker 和 Consumer 的交互次数,提高吞吐,但会增加延迟。fetch.max.wait.ms:配合 fetch.min.bytes 使用,表示为了达到最小数据量愿意等待多久。max.poll.records:单次 poll() 返回的最大记录数。fetch.min.bytes 和 fetch.max.wait.ms,甚至设为 0 或 1,让 Consumer 不等待,有数据就立即返回。request.timeout.ms 等参数来应对网络不稳定的情况。max.poll.interval.ms。properties 设置的参数,其优先级高于其他 job.reader.connector.* 等快捷参数。请注意配置的覆盖关系。job.reader.properties 是一个面向高级用户的“专家模式”入口。当标准参数无法满足您精细的调优需求时,您可以通过这里直接向 Kafka 客户端传递任何原生配置指令。这为您提供了最大的灵活性,但同时也要求您对 Kafka 的消费者参数有深入的了解。job.reader.connector.auto_offset_resetlatest(对于流式作业);earliest(对于批处理作业)latest(默认)。earliest。none。latest,避免任务启动时处理大量无关的历史数据。earliest,确保不会遗漏任何数据。earliest 并使用新的 group.id 来重跑数据进行验证。group.id 已经有提交过的 Offset,则该参数不会生效。earliest,以保证批处理任务能够读取到指定范围内的数据。auto.offset.reset 设置为了 earliest。对于流处理任务,这通常是不期望的行为。请检查该参数是否被设为 latest。另外,也请检查 group.id 是否每次都发生了变化,一个全新的 group.id 也会触发此策略。auto_offset_reset 决定了当您的读取任务“迷路”时(比如第一次启动,或者之前的进度记录丢失了),它应该从哪里重新开始。latest 表示“从最新的地方开始,只看未来”,适合实时流任务。earliest 表示“从最古老的地方开始,从头来过”,适合需要完整历史数据的批处理任务。job.reader.read_from_earliest_offset / job.reader.read_to_latest_offsetfalse / falsetrue 不会产生预期的效果(即不会自动将读取范围设定为 Topic 的最早到最新)。job.reader.connector.auto_offset_reset: earliest 配合流式(Unbounded)模式,或通过 start_offset/timestamp 和 end_offset/timestamp 精确控制批处理范围。job.reader.read_from_earliest_offset 和 job.reader.read_to_latest_offset 这两个参数在当前版本中是无效的。如果您需要控制读取的起始位置,请使用 start_offset、start_timestamp 或 auto_offset_reset 等其他参数。job.reader.wait_poll_empty_times / job.reader.wait_poll_empty_intervalendOffset 后就会结束。job.reader.wait_poll_empty_times 和 job.reader.wait_poll_empty_interval 这两个参数在当前版本中是无效的。批处理作业在读取到其定义的结束位置后会直接完成,不会进行额外的等待。本章节详细介绍 Kafka Sink 在写入数据时可供配置的高级参数。
job.writer.bootstrap_servers(别名: kafka_servers)host1:port1,host2:port2,...。job.reader.bootstrap_servers 作用相同,是连接 Kafka 集群的基础核心配置。job.writer.bootstrap_servers 是数据写入任务连接 Kafka 集群的入口。您需要在此提供一到两个 Kafka Broker 的地址和端口,以确保数据能够被成功发送。job.writer.topic_namejob.writer.topic_name 明确了数据处理结果的目的地。所有数据将被发送到您在此指定的 Kafka Topic 中。job.writer.format_type(别名: content_type)jsonjson 或保持默认。DEBEZIUM_JSON 等特定格式。json 是最通用和广泛支持的格式,建议优先使用。format_type 必须与下游消费者的期望格式一致,否则会导致下游解析失败。job.writer.format_type 决定了您的数据将以何种“语言”(格式)写入 Kafka。json 是最通用的选择,能确保最好的兼容性。job.writer.key_fieldsuser_id 的所有相关操作事件,必须按顺序被处理。key_fields: "user_id"。Kafka Producer 会确保相同 Key 的消息被发送到同一个分区,从而保证了分区内的消费顺序。city_id)将数据均匀地分布到不同的分区,以便下游按城市并行处理。key_fields: "city_id"。, 分隔,它们的值会拼接在一起作为 Key。key_fields 会导致消息的 Key 为 null,此时 Producer 会以轮询(Round-Robin)的方式将消息随机发送到不同分区。这会打乱数据的顺序。job.writer.key_fields 用于从您的数据中提取“业务主键”,并将其作为 Kafka 消息的 Key。这非常重要,因为它能确保例如“同一个订单的所有更新事件”会被发送到同一个分区并被顺序处理。如果您关心数据的处理顺序,就应该配置它。job.writer.partition_fieldpartition_field。例如,partition_field: "hash_code_field",系统会用这个字段的值来计算分区号。job.writer.key_fields 让 Producer 自动处理分区是更简单和推荐的做法。key_fields 和 partition_field,分区逻辑以 partition_field 为准。partition_field 对应的值分布不均匀,会导致下游分区负载不均衡。job.writer.partition_field 提供了一种手动指定分区逻辑的底层方法。对于绝大多数用户,我们推荐使用更上层的 key_fields 参数来自动处理数据分区和排序,而无需关心此参数。job.writer.enable_headersTRUEDEBEZIUM_JSON)下,此参数决定是否将数据记录中的元信息(metadata)提取出来,并作为 Kafka 消息的 Headers 发送。DEBEZIUM_JSON 格式进行数据同步时,希望将上游数据库的表名、操作类型(I/U/D)等元信息通过 Headers 传递给下游。TRUE。TRUE 即可。json 格式,此开关可能不起作用。job.writer.enable_headers 主要用于 CDC(变更数据捕获)场景,它允许将数据的“附加信息”(如来源表、变更类型)放在消息的 Headers 中一并发送。通常保持默认即可。job.writer.log_failures_onlyFALSEFALSE。一旦 Producer 发送失败(在重试之后仍然失败),会向上抛出异常,导致整个任务失败。TRUE。此时,发送失败的消息只会被记录一条错误日志,任务会继续运行处理后续数据。FALSE。这是最安全的选择,确保您能第一时间感知到写入问题。TRUE。TRUE 意味着您接受了“数据可能丢失”的风险。请务必确保有相应的日志监控和告警机制,来发现那些被“跳过”的失败消息。retries 参数控制)。job.writer.log_failures_only 是一个“容错开关”。默认 FALSE 表示“零容忍”,任何数据写入失败都会让整个任务停下来并告警。设置为 TRUE 则表示“有一定容忍度”,允许任务在记录一条错误日志后,跳过失败的数据继续前进。请根据您的业务对数据完整性的要求来选择。job.writer.properties(别名: extra_raw_properties)batch.size, linger.ms, compression.type, acks 等核心 Producer 参数以平衡吞吐量和延迟。properties 中添加,例如 {"batch.size": "65536", "compression.type": "snappy"}。buffer.memory, max.in.flight.requests.per.connection 等参数。batch.size:Producer 会尝试将发往同一分区的多条消息打包成一个更大的批次,能显著提高压缩率和网络效率,但会增加延迟。linger.ms:Producer 发送前愿意为收集更多消息而等待的时间。与 batch.size 配合,可以进一步提升批量效果。compression.type:如 snappy, lz4, zstd。压缩可以大幅减少网络传输和磁盘占用,是提升吞吐的利器。linger.ms(例如设为 0):让 Producer 不等待,有消息就立刻发送。batch.size:减少批量带来的延迟。acks=all (或 -1):确保消息不仅被 Leader Broker 接收,还被同步到了所有 ISR(In-Sync Replicas)副本后,才算发送成功。这是最高的数据可靠性保证,但会牺牲性能。properties 设置的参数,会覆盖 job.writer.retries, job.writer.retry_backoff_ms, job.writer.linger_ms 等快捷参数的默认值。batch.size。job.writer.properties 是写入侧的“专家模式”入口,赋予您完全控制 Kafka Producer 行为的能力。通过调整批量大小(batch.size)、等待时间(linger.ms)、压缩方式(compression.type)和确认机制(acks),您可以在吞吐量、实时性和可靠性之间做出最符合您业务需求的权衡。job.writer.retries10 是一个比较合理的默认值,能应对大多数临时性故障。Integer.MAX_VALUE,以尽可能地确保消息被送达。job.writer.properties 中 retries 参数的默认值。如果在 properties 中显式设置了 retries,则此参数无效。retry.backoff.ms 联动,后者定义了每次重试之间的等待间隔。max.in.flight.requests.per.connection > 1)。因此,开启重试是安全的。job.writer.retries 是一个自动的“重试小助手”。当数据发送遇到网络抖动等临时问题时,它会自动尝试再次发送,最多重试 10 次。这大大提高了数据写入的成功率和任务的健壮性,通常您无需修改此参数。job.writer.retry_backoff_ms1000 毫秒是一个适中的值。job.writer.properties 中 retry.backoff.ms 参数的默认值。如果在 properties 中显式设置了该参数,则此参数无效。retry_backoff_ms 告诉“重试小助手”在每次失败后要“休息”多久再试。默认 1 秒的间隔,有助于避免在 Broker 繁忙或故障时进行无效的密集攻击,让重试更智能、更高效。job.writer.linger_ms5000(5秒)。更长的等待时间意味着 Producer 有更多机会将发往同一分区的消息打包成一个大的 Batch,从而获得更好的压缩率和吞-吐量。linger.ms,例如设置为 10 或 0。0 表示不等待,有消息就尝试立即发送。5000 或更高。0 到 100 之间。job.writer.properties 中 linger.ms 参数的默认值。如果在 properties 中显式设置了该参数,则此参数无效。batch.size 共同作用。Producer 会在“批次大小达到 batch.size”或“等待时间超过 linger.ms”这两个条件任意一个满足时,发送该批次。linger.ms 设置得比较大(如默认的 5000ms)。Producer 为了凑齐一个更大的批次,会把消息在本地缓存中“扣留”最多 5 秒。如果您对实时性要求很高,可以尝试将此参数调小,甚至设为 0。linger.ms 是在吞吐量和延迟之间做权衡的关键。没有绝对的“最优值”,只有最适合您业务场景的值。linger.ms 决定了 Producer 的“耐心程度”。一个有耐心的 Producer(高 linger.ms)会多等一会儿,把更多消息打包在一起再出发,这样更省力(吞吐高),但到达目的地会稍晚(延迟高)。一个没耐心的 Producer(低 linger.ms)则会立即出发,到达快(延迟低),但频繁出发会更累(吞吐低)。job.writer.compression_typeSNAPPYproperties 设置:虽然此参数存在,但为了确保压缩配置能稳定生效,强烈建议通过 job.writer.properties 来设置,例如 {"compression.type": "snappy"}。none, gzip, snappy, lz4, zstd。job.writer.compression_type 参数,而是在 job.writer.properties 中通过 compression.type 键来指定您想要的压缩算法(如 snappy 或 lz4)。开启压缩是提升 Kafka 写入吞吐量、节省网络和磁盘资源的最有效手段之一。job.writer.big_message_thresholdjob.writer.big_message_threshold 参数在当前版本中是无效的,不会提供大消息监控功能。