You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
调度与依赖
并发控制、重试、回溯
复制全文
下载 pdf
并发控制、重试、回溯

本文为您介绍如何在 Pipeline 中配置、使用并发控制、重试机制以及数据回溯操作。

并发控制

并发的含义

当同一条 Pipeline 在同一时间有多个实例需要运行时(如调度堆积、手动触发与自动调度并行、数据回溯批量生成),需要通过并发控制管理实例的同时执行数量。

并发配置

在 Pipeline 配置中通过 concurrency 字段设置:

spec:
  trigger:
    type: scheduled
    cronExpression:
      expression: "0 * * * *"     # 每小时
  concurrency: 3                   # 最多 3 个实例同时运行

配置项

说明

concurrency

允许同时运行的最大实例数(默认为 1)

并发运行机制

新实例生成
  → 检查当前运行中的实例数
    → < concurrency → 立即执行
    → ≥ concurrency → 进入等待队列
      → 有实例完成后,队列中按优先级+业务日期顺序出队执行

等待队列排序

当多个实例在队列中等待时,按以下顺序决定执行先后:

  1. 优先级:高优先级实例优先执行。
  2. 业务日期:同优先级下,较早的业务日期优先。

并发配置建议

场景

推荐并发数

说明

核心 SLA 链路

1

严格串行,避免资源竞争和数据冲突。

一般日常 ETL

1–3

适度并行,平衡效率与资源占用。

数据回溯场景

3–5

提高回溯速度,按实际资源调整。

无状态计算

5–10

无数据依赖冲突时可放大并行。

自依赖 vs 并发控制

自依赖(dependsOn.self)和 concurrency: 1 都能实现串行执行,但机制不同:

对比维度

自依赖 (dependsOn.self)

并发控制 (concurrency: 1)

控制层级

依赖层(DAG 调度前)

执行层(实例排队)

上游失败影响

当天实例因依赖未满足而阻塞

上游失败后当天实例正常出队执行

适用场景

数据有严格先后依赖(增量处理)

仅控制资源并发,数据无先后关系

队列管理

无队列,实例在依赖阶段等待

有显式等待队列

重试机制

Activity 级别重试

当 Activity 执行失败时,系统可自动重试。重试在 Activity 级别配置,通过 retryPolicy 字段定义:

activities:
  - name: extract_orders
    type: sql
    engineType: emr_serverless_spark
    sqlScript: extract_orders.sql
    retryPolicy:
      maxRetries: 3                        # 最大重试次数
      minRetryIntervalMillis: 60000        # 重试间隔(毫秒)
      timeoutSeconds: 3600                 # 单次执行超时(秒)
      retryOnTimeout: true                 # 超时是否触发重试

retryPolicy 字段说明

字段

类型

必填

默认值

说明

maxRetries

Integer

0

最大重试次数。0 表示不重试

minRetryIntervalMillis

Integer

60000

两次重试之间的最小间隔(毫秒)

timeoutSeconds

Integer

0

单次执行超时时间(秒)。0 表示不限制

retryOnTimeout

Boolean

false

超时后是否重试。为 true 时超时也触发重试

重试流程

Activity 执行
  → 执行成功 → SUCCEEDED
  → 执行失败(或超时)
    → 已重试次数 < maxRetries?
      → 是 → 等待 minRetryIntervalMillis → 重新执行(状态标记为 RETRYING)
      → 否 → 标记为 FAILED → 根据 dependencyConditions 决定下游行为

重试策略建议

失败类型

推荐策略

说明

网络超时 / 资源不足

重试 2–3 次,间隔 60 秒

偶发问题通常重试可恢复

SQL 语法错误

不重试 (maxRetries: 0)

重试不会修复代码错误

上游数据延迟

重试 3–5 次,间隔 300 秒

等待上游数据就绪

外部 API 限流

重试 3 次,间隔 120 秒

等待限流窗口过去

超时控制

Activity 执行超时

通过 retryPolicy.timeoutSeconds 设置单次执行的超时时间:

retryPolicy:
  timeoutSeconds: 3600         # 1 小时超时
  retryOnTimeout: true         # 超时后重试
  maxRetries: 2

超时行为:

retryOnTimeout

超时后行为

true

终止当前执行 → 触发重试(受 maxRetries 限制)

false

终止当前执行 → 直接标记为 TIMEOUT

注意

retryPolicy.timeoutSeconds强制终止,与告警中的"执行超时"不同。告警的执行超时仅发送通知而不终止任务。

Pipeline 超时

Pipeline 级别没有单独的超时配置。Pipeline 的实际执行时间取决于所有 Activity 的执行时间之和(串行部分)和最长并行路径。
如需控制 Pipeline 整体完成时间,推荐使用告警中的"未按时完成"指标(参见第 11 章),在预期时间前发出预警。

数据回溯

什么是数据回溯

数据回溯(Backfill)是指为历史业务日期重新生成 Pipeline 实例并执行,用于修复历史数据或补充处理遗漏的数据。
常见回溯场景:

  • 发现历史数据错误,修正 ETL 逻辑后重新处理历史数据。
  • 新上线的 Pipeline 需要回填历史周期的数据。
  • 上游数据延迟导致某些日期的任务失败,需要批量补跑。

回溯时间范围

通过 earliestBackfillTime 限制允许回溯的最早时间:

trigger:
  type: scheduled
  cronExpression:
    expression: "0 2 * * *"
  earliestBackfillTime: "2026-01-01T00:00:00+08:00"   # 最早可回溯到 2026-01-01

字段

说明

earliestBackfillTime

允许回溯的最早业务日期。早于此日期的实例不可生成

如果未设置 earliestBackfillTime,默认为 Pipeline 的 effectiveTime

回溯操作流程

  1. 创建一个 Pipeline,使用 Excute Pipeline 串联要回溯的 Pipeline。
  2. 通过执行参数传入要回溯的数据范围。
  3. 执行回溯。

回溯与依赖

回溯实例同样遵循依赖规则:

依赖类型

回溯行为

Activity 依赖

同实例内 Activity 按 DAG 顺序执行

Pipeline 依赖 / Checker

检查上游对应 bizDate 的实例是否成功

自依赖

回溯实例按业务日期从早到晚串行执行

关键场景——自依赖回溯:
当 Pipeline 配置了自依赖 dependsOn.self 时,回溯实例会严格按业务日期顺序串行执行:

回溯范围:06-01 ~ 06-05

06-01 实例执行 → 成功
  → 06-02 实例执行 → 成功
    → 06-03 实例执行 → 失败(阻塞后续)
      → 06-04 等待...
      → 06-05 等待...

此时需手动处理 06-03 的失败实例(修复后重跑或标记成功),后续实例才能继续。

回溯与并发

回溯的并行度受以下因素共同影响:

因素

说明

回溯并行度设置

回溯操作时指定的并行度

Pipeline concurrency

Pipeline 的最大并发数

自依赖

有自依赖时强制串行,回溯并行度无效

实际并行度 = min(回溯并行度, Pipeline concurrency)

典型回溯场景

  • 场景一:修复历史数据
    发现 ETL 逻辑错误,修复后需重新处理最近 7 天的数据:
    1. 修改 SQL 脚本并发布新版本。
    2. 发起回溯:2026-05-30 ~ 2026-06-05
    3. 回溯实例使用最新版本的 Pipeline 配置执行。
  • 场景二:新 Pipeline 补历史数据
    新上线的 Pipeline 需要生成历史 30 天的数据:
    1. 设置 earliestBackfillTime 为 30 天前。
    2. 发起回溯:整个历史范围。
    3. 设置较高的回溯并行度(如 5)加速执行。
    4. 如有自依赖,只能串行回溯。
  • 场景三:上游数据就绪后批量补跑
    上游 Pipeline 某天延迟导致下游多天失败:
    1. 确认上游数据已全部就绪。
    2. 对下游 Pipeline 发起回溯,覆盖所有失败日期。
    3. 回溯实例的 Checker 节点会检测到上游已成功,直接放行。

综合配置示例

以下示例展示了并发控制、重试和回溯相关配置的完整用法:

apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: daily_order_etl
  displayName: 每日订单 ETL
  runAs: data_engineer
spec:
  trigger:
    type: scheduled
    cronExpression:
      expression: "0 2 * * *"
    frequency: daily
    priority: d2
    effectiveTime: "2026-01-01T00:00:00+08:00"
    earliestBackfillTime: "2026-01-01T00:00:00+08:00"  # 最早可回溯至上线日

  concurrency: 1                   # 同一时间仅运行 1 个实例

  activities:
    - name: wait_for_upstream
      type: checker
      checkerConfig:
        checks:
          - type: pipeline
            projectName: my_project
            pipelineName: raw_data_ingestion
            activityName: ""
            offsetsType: set
            offsets: ["0"]
        timeout: 7200
        checkInterval: 60

    - name: extract_orders
      type: sql
      engineType: emr_serverless_spark
      sqlScript: extract_orders.sql
      retryPolicy:
        maxRetries: 3              # 最多重试 3 次
        minRetryIntervalMillis: 60000
        timeoutSeconds: 1800       # 30 分钟超时
        retryOnTimeout: true
      dependsOn:
        activities:
          - wait_for_upstream

    - name: transform_orders
      type: sql
      engineType: emr_serverless_spark
      sqlScript: transform_orders.sql
      retryPolicy:
        maxRetries: 2
        minRetryIntervalMillis: 120000
        timeoutSeconds: 3600       # 1 小时超时
        retryOnTimeout: false
      dependsOn:
        activities:
          - extract_orders

    - name: load_to_warehouse
      type: sql
      engineType: emr_serverless_spark
      sqlScript: load_to_warehouse.sql
      retryPolicy:
        maxRetries: 2
        minRetryIntervalMillis: 60000
      dependsOn:
        activities:
          - transform_orders
        self:                      # 自依赖:等待前一天实例的此节点完成
          offsetsType: set
          offsets: ["-1"]

最佳实践

实践

说明

核心链路设 concurrency: 1

避免多实例并行写入导致数据冲突。

区分可重试与不可重试错误

仅对偶发错误(网络、资源)配置重试,逻辑错误无需重试。

重试间隔逐步增大

多次重试时适当增大 minRetryIntervalMillis,避免短时间内反复失败。

回溯前确认上游

回溯前确认所有上游依赖的历史实例均已成功。

注意自依赖对回溯的影响

有自依赖的 Pipeline 回溯只能串行,需预估总耗时。

设置 earliestBackfillTime

限制可回溯的最早时间,防止误操作产生大量历史实例。

监控回溯进度

大批量回溯时关注失败实例,及时修复以免阻塞后续实例

最近更新时间:2026.06.12 11:44:14
这个页面对您有帮助吗?
有用
有用
无用
无用