本文将为您介绍 DataSail 数据集成任务中 Doris Writer(Sink)侧的高级参数配置,包括各参数的适用场景、默认值、调优建议等,以帮助您在数据同步的实时性、吞吐量和稳定性之间实现最佳平衡。
job.writer.sink_flush_interval_ms job.writer.sink_flush_interval_ms典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
数据写入延迟较高,实时性不满足要求:数据在源端产生后,需要等待较长时间才能在 Doris 中查询到。 | 是 | 调小 | |
Doris 端出现大量小文件或小批次导入:监控显示 Stream Load 的频率过高,但每次导入的数据量很小,增加了 Doris 的合并(Compaction)压力。 |
| 是 | 调大 |
任务在低流量时频繁刷写空数据或极少量数据,造成资源浪费。 | 是 | 调大 |
stream load errorStreamLoad errordoris sink error, retry timesFailed to load data into dorisCommit failed1000 -> 500。10000 -> 30000。sink_flush_interval_ms 与控制批次大小的参数 sink_record_size 和 sink_record_count 共同决定了数据刷写的时机。刷写操作会在 任意一个 阈值达到时触发。
sink_flush_interval_ms (时间): 无论数据量多少,时间一到就刷写。sink_record_count (行数): 无论耗时多久,达到指定行数就刷写。sink_record_size (字节数): 无论耗时多久,达到指定字节数就刷写。联动建议:
sink_record_count 或 sink_record_size 主导刷写,此时 sink_flush_interval_ms 主要作为“兜底”机制,防止数据因流量降低而长时间滞留在缓存中。sink_flush_interval_ms 成为主要触发条件。此时应适当调小该值,同时确保 sink_record_count 和 sink_record_size 不会设置得过大,以免时间到了但数据量一直达不到阈值。sink_flush_interval_ms 调到 1000ms,但数据延迟还是有 5-6 秒?sink_record_count (默认10万行) 或 sink_record_size (默认20MB) 的阈值,导致提前刷写。此外,Doris 端的数据导入和生效本身也需要时间 (通常是秒级),这也是延迟的一部分。job.writer.sink_flush_interval_ms 控制数据写入 Doris 的时间间隔,默认 5000 毫秒。若您需要更高的实时性,可适当调小此值(如 1000ms),但这会增加 Doris 的写入压力;若追求更高吞吐,可适当调大(如 10000ms),以合并更大批次的数据。请根据业务需求在延迟和吞吐之间进行权衡。
job.writer.sink_max_retries job.writer.sink_max_retries典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
网络偶发抖动导致任务失败:任务日志中出现连接超时、读取超时或 |
| 是 | 适当调大 |
Doris 集群因临时高负载(如 Compaction 或大查询)导致写入请求被拒绝或超时。 |
| 是 | 适当调大 |
任务因 2PC 提交阶段的瞬时失败而中断,例如 Pre-commit 或 Commit 阶段与 FE 通信失败。 |
| 是 | 适当调大 |
排查问题时,希望任务快速失败,而不是长时间等待重试。 | 是 | 调小 |
doris sink error, retry timesStreamLoad errorstream load error, reason=Commit failedcommit transaction failedsink_max_retries 通常与控制超时的参数 request_connect_timeouts, request_read_timeouts 以及 request_retries 协同工作。
sink_max_retries: 控制的是整个写入批次(一个 Stream Load 周期)的重试逻辑。request_retries: 控制的是在发起单次 HTTP 请求(如获取 BE 节点、提交 2PC 事务)失败后的内部重试。联动关系:一个批次的写入失败(由 sink_max_retries 控制),其内部可能已经执行了多次 HTTP 请求的重试(由 request_retries 控制)。如果网络非常不稳定,同时调大 sink_max_retries 和 request_retries 能够提供更强的容错性。但请注意,总的等待时间会是两者与超时时间的乘积,可能导致任务卡顿时间很长。
label already exists 错误,增加重试次数有用吗?sink_label_prefix 有关。它表示上一个批次的 Commit 请求可能成功了,但系统没有收到确认,导致重试时使用了相同的 Label。Doris 的幂等性保证了数据不会重复,但这个错误本身可能会导致重试失败。增加重试次数对此类特定逻辑错误帮助有限,更应关注 sink_label_prefix 的配置和网络稳定性。schema error)等确定性的、无法通过重试解决的问题,系统会直接判定为失败而不会触发重试。重试主要针对网络问题、Doris 临时不可用等瞬时性、非确定性故障。job.writer.sink_max_retries 控制写入 Doris 失败后的最大重试次数,默认为 3 次,用于保障数据写入的稳定性。如果您的网络环境不稳定或 Doris 集群偶有抖动,可适当调大此值(如 5 次)以增强容错性;在排查问题时可调小至 0 或 1 以实现快速失败。
job.writer.sink_record_size job.writer.sink_record_size典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
Doris 端出现大量小文件或小批次导入,增加了合并压力。通常伴随着高频的 Stream Load。 |
| 是 | 调大 |
任务吞吐量未达到预期,尤其是在高流量场景下,网络和 Doris 均有空闲资源。 | 是 | 调大 | |
任务内存占用过高,导致 OOM (Out of Memory) 风险。 |
| 是 | 调小 |
单行数据非常大(例如包含大文本字段),导致单行就超过默认阈值,批次效果差。 | 是 | 调大 |
stream load errorStreamLoad errordoris sink error, retry timesFailed to load data into dorisFailed parse responsesink_flush_interval_ms 配合,更快地触发刷写,略微提升实时性。sink_record_size 与 sink_record_count (行数) 和 sink_flush_interval_ms (时间) 共同决定刷写时机。刷写操作在三者中任意一个达到阈值时触发。
联动建议:
sink_record_size 和 sink_record_count 是最直接的批次大小控制器。通常,您只需要关心并调整这两个参数中的一个,另一个可以保持默认或设置为一个非常大的值,以避免干扰。sink_record_count 更直观。sink_record_size 能更好地控制内存使用和批次大小的稳定性。sink_buffer_size、sink_buffer_count 一起调整。批次大小 (sink_record_size) 不应远大于总缓冲区大小 (sink_buffer_size * sink_buffer_count),否则可能导致内存压力。job.writer.sink_record_count job.writer.sink_record_count典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
Doris 端出现大量小文件或小批次导入,增加了合并压力。 |
| 是 | 调大 |
任务吞吐量未达到预期,尤其是在数据行本身较小的高流量场景。 | 是 | 调大 | |
任务内存占用过高,尤其是当 |
| 是 | 调小 |
stream load errorStreamLoad errordoris sink error, retry timesFailed to load data into dorisCommit failedsink_record_size & sink_record_count)sink_record_size 和 sink_record_count 我应该同时调整吗?sink_record_count 更直观。如果行大小不定,调 sink_record_size 更能稳定控制内存和批次。可以将另一个参数设置得非常大,使其基本不起作用。sink_flush_interval_ms 设置得过小,可能导致频繁的时间触发,而批次大小实际未达到;2) 批次大小超过了 Doris Stream Load 的推荐上限(通常建议单次导入在 100MB-1GB 之间),导致 Doris 处理效率下降;3) 增大了内存压力,可能引发 GC,影响任务性能。stream_load_max_mb 等参数,过大的批次可能被 Doris 拒绝。job.writer.sink_record_size (默认20MB) 和 job.writer.sink_record_count (默认10万行) 共同决定了向 Doris 写入数据时每个批次的大小。为了提升吞吐量和降低 Doris 压力,您可以适当调大这两个值中的一个;为了控制内存或在低流量下提升一点实时性,可以适当调小。通常建议您只调整其中一个,并将另一个设置得很大以避免干扰。
job.writer.sink_buffer_size job.writer.sink_buffer_size典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
任务内存占用过高(OOM),尤其是在高并发或 |
| 是 | 调小 |
单行数据记录本身非常大,超过了 1MB,导致一条记录都无法装入一个缓冲区。 |
| 是 | 调大 |
系统吞吐量达到瓶颈,且 CPU 和网络资源有富余,可能是缓冲区设置不当导致的数据流转不畅。 | 可能相关 | 适当调大 |
failed to stream load datastream load errorStreamLoad errorOutOfMemoryError4194304) 或 8MB (8388608),以容纳更多的记录,减少缓冲区交换的频率。524288) 或 256KB (262144)。job.writer.sink_buffer_count job.writer.sink_buffer_count典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
任务因数据处理(序列化)与网络发送速度不匹配而出现卡顿。例如,网络发送慢,导致处理线程无可用缓冲区可写。 | 是 | 调大 | |
任务内存占用过高(OOM),尤其是在 |
| 是 | 调小 |
希望在不增加单个缓冲区大小的情况下,提升系统的缓冲能力。 | 是 | 调大 |
failed to stream load datastream load errorStreamLoad errorOutOfMemoryErrorsink_buffer_size & sink_buffer_count)这两个参数直接决定了写入端的总缓冲能力:总缓冲内存 ≈ sink_buffer_size × sink_buffer_count × 并发度(parallelism)
sink_record_size 的关系:一个批次的数据(由 sink_record_size 或 sink_record_count 决定)在发送前会被写入这些 Buffer。理论上,总缓冲内存应至少能容纳一个完整的批次,并有一定余量。如果 sink_record_size 远大于 sink_buffer_size,意味着一个批次的数据需要跨越多个 Buffer,这会增加管理的复杂性。sink_buffer_size,同时调大 sink_buffer_count。sink_buffer_count 不变的情况下,主要调小 sink_buffer_size。sink_record_size 有什么区别?sink_record_size 定义了“包裹”的大小(我要寄多大一箱货),而 sink_buffer_size 和 sink_buffer_count 定义了“仓库”的容量和隔间(我有多少个、多大的临时货架来放这些包裹)。“包裹”打包好后,会放到“货架”上,等待卡车(网络线程)来拉走。sink_buffer_size × sink_buffer_count × 4。此外,JVM 的内存管理机制也可能导致您在监控中看到的内存变化有一定延迟。jmap 或其他 JVM 工具了解当前任务的内存分配情况。调整后,密切关注 GC 日志和内存监控,确保没有引入新的稳定性问题。sink_buffer_size (单个缓冲区大小) 和 sink_buffer_count (缓冲区数量) 共同决定了数据写入 Doris 前的内存缓冲能力,其总大小约为 size * count * 并发数。若任务因内存溢出(OOM)而失败,应适当调小它们;若网络延迟高或处理/发送速度不均,可适当增加 sink_buffer_count 以平滑数据流。请在调整时密切关注任务的内存使用情况。
job.writer.sink_label_prefix job.writer.sink_label_prefix"" (空字符串)典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
任务因网络或超时问题重试时,出现 |
| 是 | 建议配置 |
需要从外部系统排查或关联 DTS 写入的特定批次。例如,希望在 Doris 的导入历史中,能快速识别出由某个特定任务生成的导入作业。 | 是 | 建议配置 | |
多个不同的任务可能在极短时间内向同一个 Doris 表写入数据,存在极低概率的 Label 冲突风险(当默认的随机生成策略恰好产生相同 Label 时)。 | 是 | 建议配置 |
Label already existLabel already exist and load job finished, change you label prefix or restore from latest savepoint!has already been used, relate to txnis already COMMITTED, not pre-committed.try abort committed transaction, do you recover from old savepoint?my_project_user_table_sync_[sink_label_prefix]_[checkpoint_id]_[subtask_id] 这样的结构。当任务发生 Checkpoint 级别的重试(Failover)时,它会使用相同的 checkpoint_id 和前缀来重新生成 Label。如果上一次的导入已经成功,Doris 会直接返回成功,任务可以继续,有效避免了 Label already used 导致的失败。SHOW LOAD 结果或 fe.log 中,可以根据这个固定的前缀快速筛选和定位到特定任务的导入记录,极大地方便了问题排查。sink_label_prefix 与 sink_enable_2PC (两阶段提交) 紧密相关。
sink_enable_2PC 启用时,配置一个固定的 sink_label_prefix 是强烈推荐的。这确保了从 Pre-commit 到 Commit 再到重试的整个过程中,事务的幂等性标识是稳定和可预测的。sink_label_prefix,虽然 2PC 机制本身能保证事务的最终一致性,但在某些异常恢复场景下,日志中可能会出现与 Label 相关的告警或错误,增加排查难度。[项目名]_[业务]_[表名] 的组合,并确保它在您的所有 Doris 写入任务中是唯一的。这样既能避免冲突,也方便后续运维。sink_label_prefix 后,是不是就不会再有 Label already used 的错误了?sink_label_prefix)并同时运行,它们可能会因为生成完全相同的 Label 而产生冲突。因此,确保前缀的唯一性非常重要。Label already used 错误而中断,尽管数据在 Doris 端实际已经成功。配置前缀能让任务在这种情况下更平滑地恢复。sink_label_prefix 的核心价值在于其在多个任务之间的唯一性。请建立命名规范,确保不会在不同的并发任务中使用了相同的前缀写入同一个表。job.writer.sink_label_prefix 用于为 Doris 导入任务设置一个固定的、唯一的 Label 前缀,以增强写入操作的幂等性(防止数据重复)。强烈建议在所有生产任务中,将此参数配置为任务的唯一标识(如任务ID或作业名),这能有效避免因网络重试导致的 Label already used 错误,并极大地方便问题排查。
request_*) 这组参数控制着写入任务与 Doris FE(Frontend)节点进行 REST API 通信时的网络行为,包括建连超时、读取超时和内部重试。它们是应对网络不稳、FE 繁忙等问题的关键调节旋钮。
job.writer.request_connect_timeouts: 建立到 Doris FE HTTP 服务的连接时,允许等待的最长时间。job.writer.request_read_timeouts: 连接建立后,等待 FE 返回响应数据的最长时间。job.writer.request_retries: 当上述请求发生可重试的失败(如超时)时,内部执行的最大重试次数。job.writer.request_connect_timeoutsjob.writer.request_read_timeoutsjob.writer.request_retries*_timeouts: 毫秒 (ms)*_retries: 次request_connect_timeouts: 30,000 (30秒)request_read_timeouts: 30,000 (30秒)request_retries: 3典型现象 | 常见报错关键字 | 是否相关 | 建议调大/调小 |
|---|---|---|---|
跨机房/公网等高延迟网络环境下,任务因连接超时失败。 |
| 是 | 调大 |
Doris FE 节点因元数据加载、大查询规划或 GC 暂停,导致 API 响应缓慢,任务读取超时。 |
| 是 | 调大 |
网络偶尔丢包或抖动,导致与 FE 的通信瞬间失败。 |
| 是 | 调大 |
希望在 FE 彻底无响应时能快速失败,不等太久。 | 是 | 调小全部三个参数 |
Failed to connected dorisFailed to get response from Doris FEFailed to get response from DorisDoris FE nodeNo Doris FE is available, please check configurationFailed to get backendFe nodes not availablerequest_connect_timeouts: 建议调大至 60000 (60秒)。request_read_timeouts: 建议调大至 60000 (60秒) 或更高。request_retries: 建议调大至 5 次。request_read_timeouts。如果 FE 经常因为负载问题导致响应慢,可以大胆地将其设置为 120000 (2分钟) 甚至更长。*_timeouts: 调小至 5000 (5秒)。*_retries: 调小至 0 或 1。这三个参数紧密耦合,共同决定了单次 API 交互的最大容忍时间。
request_connect_timeouts + request_read_timeouts) × (request_retries + 1)job.writer.sink_max_retries 的关系:
request_retries 是内部 HTTP 请求级别的重试。sink_max_retries 是整个数据批次写入流程(Stream Load)级别的重试。request_* 系列参数来应对网络和 FE 的瞬时问题。sink_max_retries 作为更上一层的保障。connect_timeouts 和 read_timeouts 有什么区别?connect_timeouts 是指从发起连接请求到成功建立 TCP 连接的最长等待时间,主要受网络延迟影响。read_timeouts 是指连接成功后,发送数据到开始接收到第一个字节响应的最长等待时间,主要受对端服务器(这里是 Doris FE)处理请求的速度影响。request_retries,任务还是因为超时失败了?request_retries 只在发生可重试的异常(如超时)后生效。如果您的超时时间设置得过短,每次重试都可能因为同样的原因再次超时,最终耗尽重试次数。在这种情况下,您应该首先考虑延长超时时间 (*_timeouts)。stream_load_properties 中的 timeout 参数控制。request_connect_timeouts (连接超时), request_read_timeouts (读取超时) 和 request_retries (重试次数) 控制着与 Doris FE 节点的网络通信行为。当您的网络环境延迟高或 FE 节点负载重时,可适当调大这些值(如超时时间至 60 秒,重试次数至 5 次)以增强任务稳定性;反之,在排障时可调小以实现快速失败。
job.writer.stream_load_properties job.writer.stream_load_properties{} (空 Map)这个参数非常灵活,其适用场景取决于您想使用哪个具体的 Stream Load 原生参数。以下是一些常见的例子:
典型现象 / 需求 | 相关的 Stream Load 参数 | 是否相关 | 建议配置 |
|---|---|---|---|
数据传输本身耗时很长,导致写入因超时失败,尤其是在写入大批量数据或跨公网时。 |
| 是 | 增加 |
源数据中存在少量脏数据(如格式错误),不希望因此导致整个批次失败。 |
| 是 | 设置合理的 |
需要对写入的数据进行一些过滤或转换,例如使用 |
| 是 | 配置 |
希望关闭部分列的严格模式或进行时区转换等。 |
| 是 | 按需配置 |
需要指定写入特定的存储序列(Sequence Column),用于 UNIQUE KEY 模型的行序控制。 |
| 是 | 指定 |
stream load error:StreamLoad errorFailed to load data into dorisETIMEDOUTtimeoutmax_filter_ratio配置格式为 JSON Map 形式的字符串,例如: {"timeout":"600", "max_filter_ratio":"0.1"}
timeout (单位: 秒)sink_record_size 估算传输和处理时间,并设置一个更长的超时。
{"timeout": "600"} (10分钟)ETIMEDOUT 或 gateway timeout 错误。max_filter_ratio (0.0 到 1.0 之间的小数){"max_filter_ratio": "0.05"} (允许 5% 的数据被过滤)whereWHERE 子句中的表达式,用于在数据写入前进行过滤。
{"where": "age > 18 and city = 'Beijing'"}where 条件的数据才会被最终写入 Doris,实现了在 Sink 端的行级过滤。stream_load_properties 中的参数是直接透传给 Doris 的,因此它们的行为和联动关系遵循 Doris Stream Load 的官方文档。
timeout vs. request_read_timeouts:
request_read_timeouts 控制的是与 FE 的元数据交互超时。stream_load_properties 中的 timeout 控制的是向 BE 传输和处理数据的整个 Stream Load 过程的超时。max_filter_ratio 之后,在哪里能看到哪些数据被过滤了?fe.log 中,或通过 SHOW LOAD 命令查看特定 Label 的导入结果,其中会有一个 URL。访问该 URL 可以获取到详细的错误信息和被过滤的脏数据行。sink_max_retries)是 DTS 引擎层面的封装。stream_load_properties 则是一个“逃生舱”或“高级通道”,它允许您使用那些 DTS 尚未直接封装的、更底层的 Doris 原生功能。如果同一个功能既有 DTS 参数,又可以在这里配置,通常 DTS 参数的优先级更高或两者会合并,具体行为可能因版本而异,建议避免重复配置。"true" 或 "false",数值也是字符串形式)。curl 命令手动构造 Stream Load 请求进行验证,确保参数行为符合预期。job.writer.stream_load_properties 是一个高级配置项,允许您直接向 Doris 写入任务透传其原生的导入参数(如 timeout, max_filter_ratio 等)。当您需要容忍少量脏数据、延长数据导入超时时间或使用其他 DTS 未直接提供的 Doris 高级功能时,可以通过这个参数灵活实现。使用前请参考您 Doris 版本的官方文档以确保参数的有效性。