本文为您介绍如何在 Pipeline 中配置、使用并发控制、重试机制以及数据回溯操作。
当同一条 Pipeline 在同一时间有多个实例需要运行时(如调度堆积、手动触发与自动调度并行、数据回溯批量生成),需要通过并发控制管理实例的同时执行数量。
在 Pipeline 配置中通过 concurrency 字段设置:
spec: trigger: type: scheduled cronExpression: expression: "0 * * * *" # 每小时 concurrency: 3 # 最多 3 个实例同时运行
配置项 | 说明 |
|---|---|
| 允许同时运行的最大实例数(默认为 1) |
新实例生成 → 检查当前运行中的实例数 → < concurrency → 立即执行 → ≥ concurrency → 进入等待队列 → 有实例完成后,队列中按优先级+业务日期顺序出队执行
当多个实例在队列中等待时,按以下顺序决定执行先后:
场景 | 推荐并发数 | 说明 |
|---|---|---|
核心 SLA 链路 | 1 | 严格串行,避免资源竞争和数据冲突。 |
一般日常 ETL | 1–3 | 适度并行,平衡效率与资源占用。 |
数据回溯场景 | 3–5 | 提高回溯速度,按实际资源调整。 |
无状态计算 | 5–10 | 无数据依赖冲突时可放大并行。 |
自依赖(dependsOn.self)和 concurrency: 1 都能实现串行执行,但机制不同:
对比维度 | 自依赖 (dependsOn.self) | 并发控制 (concurrency: 1) |
|---|---|---|
控制层级 | 依赖层(DAG 调度前) | 执行层(实例排队) |
上游失败影响 | 当天实例因依赖未满足而阻塞 | 上游失败后当天实例正常出队执行 |
适用场景 | 数据有严格先后依赖(增量处理) | 仅控制资源并发,数据无先后关系 |
队列管理 | 无队列,实例在依赖阶段等待 | 有显式等待队列 |
当 Activity 执行失败时,系统可自动重试。重试在 Activity 级别配置,通过 retryPolicy 字段定义:
activities: - name: extract_orders type: sql engineType: emr_serverless_spark sqlScript: extract_orders.sql retryPolicy: maxRetries: 3 # 最大重试次数 minRetryIntervalMillis: 60000 # 重试间隔(毫秒) timeoutSeconds: 3600 # 单次执行超时(秒) retryOnTimeout: true # 超时是否触发重试
字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| Integer | 否 | 0 | 最大重试次数。0 表示不重试 |
| Integer | 否 | 60000 | 两次重试之间的最小间隔(毫秒) |
| Integer | 否 | 0 | 单次执行超时时间(秒)。0 表示不限制 |
| Boolean | 否 | false | 超时后是否重试。为 true 时超时也触发重试 |
Activity 执行 → 执行成功 → SUCCEEDED → 执行失败(或超时) → 已重试次数 < maxRetries? → 是 → 等待 minRetryIntervalMillis → 重新执行(状态标记为 RETRYING) → 否 → 标记为 FAILED → 根据 dependencyConditions 决定下游行为
失败类型 | 推荐策略 | 说明 |
|---|---|---|
网络超时 / 资源不足 | 重试 2–3 次,间隔 60 秒 | 偶发问题通常重试可恢复 |
SQL 语法错误 | 不重试 (maxRetries: 0) | 重试不会修复代码错误 |
上游数据延迟 | 重试 3–5 次,间隔 300 秒 | 等待上游数据就绪 |
外部 API 限流 | 重试 3 次,间隔 120 秒 | 等待限流窗口过去 |
通过 retryPolicy.timeoutSeconds 设置单次执行的超时时间:
retryPolicy: timeoutSeconds: 3600 # 1 小时超时 retryOnTimeout: true # 超时后重试 maxRetries: 2
超时行为:
| 超时后行为 |
|---|---|
| 终止当前执行 → 触发重试(受 maxRetries 限制) |
| 终止当前执行 → 直接标记为 TIMEOUT |
注意
retryPolicy.timeoutSeconds 是强制终止,与告警中的"执行超时"不同。告警的执行超时仅发送通知而不终止任务。
Pipeline 级别没有单独的超时配置。Pipeline 的实际执行时间取决于所有 Activity 的执行时间之和(串行部分)和最长并行路径。
如需控制 Pipeline 整体完成时间,推荐使用告警中的"未按时完成"指标(参见第 11 章),在预期时间前发出预警。
数据回溯(Backfill)是指为历史业务日期重新生成 Pipeline 实例并执行,用于修复历史数据或补充处理遗漏的数据。
常见回溯场景:
通过 earliestBackfillTime 限制允许回溯的最早时间:
trigger: type: scheduled cronExpression: expression: "0 2 * * *" earliestBackfillTime: "2026-01-01T00:00:00+08:00" # 最早可回溯到 2026-01-01
字段 | 说明 |
|---|---|
| 允许回溯的最早业务日期。早于此日期的实例不可生成 |
如果未设置
earliestBackfillTime,默认为 Pipeline 的effectiveTime。
回溯实例同样遵循依赖规则:
依赖类型 | 回溯行为 |
|---|---|
Activity 依赖 | 同实例内 Activity 按 DAG 顺序执行 |
Pipeline 依赖 / Checker | 检查上游对应 bizDate 的实例是否成功 |
自依赖 | 回溯实例按业务日期从早到晚串行执行 |
关键场景——自依赖回溯:
当 Pipeline 配置了自依赖 dependsOn.self 时,回溯实例会严格按业务日期顺序串行执行:
回溯范围:06-01 ~ 06-05 06-01 实例执行 → 成功 → 06-02 实例执行 → 成功 → 06-03 实例执行 → 失败(阻塞后续) → 06-04 等待... → 06-05 等待...
此时需手动处理 06-03 的失败实例(修复后重跑或标记成功),后续实例才能继续。
回溯的并行度受以下因素共同影响:
因素 | 说明 |
|---|---|
回溯并行度设置 | 回溯操作时指定的并行度 |
Pipeline | Pipeline 的最大并发数 |
自依赖 | 有自依赖时强制串行,回溯并行度无效 |
实际并行度 = min(回溯并行度, Pipeline concurrency)
earliestBackfillTime 为 30 天前。以下示例展示了并发控制、重试和回溯相关配置的完整用法:
apiVersion: newide.studio.dataleap.volc/v1 kind: Pipeline metadata: name: daily_order_etl displayName: 每日订单 ETL runAs: data_engineer spec: trigger: type: scheduled cronExpression: expression: "0 2 * * *" frequency: daily priority: d2 effectiveTime: "2026-01-01T00:00:00+08:00" earliestBackfillTime: "2026-01-01T00:00:00+08:00" # 最早可回溯至上线日 concurrency: 1 # 同一时间仅运行 1 个实例 activities: - name: wait_for_upstream type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: raw_data_ingestion activityName: "" offsetsType: set offsets: ["0"] timeout: 7200 checkInterval: 60 - name: extract_orders type: sql engineType: emr_serverless_spark sqlScript: extract_orders.sql retryPolicy: maxRetries: 3 # 最多重试 3 次 minRetryIntervalMillis: 60000 timeoutSeconds: 1800 # 30 分钟超时 retryOnTimeout: true dependsOn: activities: - wait_for_upstream - name: transform_orders type: sql engineType: emr_serverless_spark sqlScript: transform_orders.sql retryPolicy: maxRetries: 2 minRetryIntervalMillis: 120000 timeoutSeconds: 3600 # 1 小时超时 retryOnTimeout: false dependsOn: activities: - extract_orders - name: load_to_warehouse type: sql engineType: emr_serverless_spark sqlScript: load_to_warehouse.sql retryPolicy: maxRetries: 2 minRetryIntervalMillis: 60000 dependsOn: activities: - transform_orders self: # 自依赖:等待前一天实例的此节点完成 offsetsType: set offsets: ["-1"]
实践 | 说明 |
|---|---|
核心链路设 concurrency: 1 | 避免多实例并行写入导致数据冲突。 |
区分可重试与不可重试错误 | 仅对偶发错误(网络、资源)配置重试,逻辑错误无需重试。 |
重试间隔逐步增大 | 多次重试时适当增大 minRetryIntervalMillis,避免短时间内反复失败。 |
回溯前确认上游 | 回溯前确认所有上游依赖的历史实例均已成功。 |
注意自依赖对回溯的影响 | 有自依赖的 Pipeline 回溯只能串行,需预估总耗时。 |
设置 earliestBackfillTime | 限制可回溯的最早时间,防止误操作产生大量历史实例。 |
监控回溯进度 | 大批量回溯时关注失败实例,及时修复以免阻塞后续实例 |