You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
调度与依赖
跨 Pipeline Checker 依赖
复制全文
下载 pdf
跨 Pipeline Checker 依赖

本文介绍如何通过 Checker 节点配置跨 Pipeline 依赖,实现多条 Pipeline 之间的协同调度。

跨 Pipeline 依赖概述

在数据开发中,一条 Pipeline 的输入常常依赖另一条 Pipeline 的输出。NewIDE 提供两种跨 Pipeline 依赖方式:

方式

配置位置

说明

Pipeline 级别依赖

dependsOn.pipelines

在 Activity 的 dependsOn 中声明对上游 Pipeline 的依赖。

Checker 节点

独立 Activity(type: checker

作为 DAG 中的一个检查节点,等待上游满足条件后放行。

两种方式的对比

对比维度

Pipeline 级别依赖

Checker 节点

配置方式

dependsOn.pipelines 字段

独立的 Activity 节点

粒度

整个 Pipeline 成功

可检查 Pipeline 或指定 Activity。

业务日期偏移

支持 offsets

支持 offsets

跨项目

支持

支持

可视化

依赖关系在 DAG 图中不可见。

在 DAG 中显示为独立节点,直观清晰。

灵活性

较低,仅检查整体成功。

较高,支持混合检查多个上游。

说明

跨 Pipeline 依赖建议优先使用 Checker 节点。Checker 在 DAG 图中可见,状态可追踪,便于运维排查。

Checker 节点配置

基本结构

activities:
  - name: wait_for_upstream
    type: checker
    checkerConfig:
      checks:
        - type: pipeline            # 检查类型
          projectName: my_project   # 目标项目
          pipelineName: upstream_etl   # 目标 Pipeline
          activityName: ""          # 为空表示检查整个 Pipeline
          offsetsType: set          # 偏移类型
          offsets:
            - "0"                   # 偏移值
      timeout: 7200                 # 超时时间(秒)
      checkInterval: 60            # 检查间隔(秒)

配置字段说明

字段

必填

说明

type

检查类型,固定为 pipeline

projectName

目标 Pipeline 所在项目。

pipelineName

目标 Pipeline 名称。

activityName

目标 Activity 名称。为空检查整个 Pipeline;指定值则检查具体 Activity。

offsetsType

偏移类型:set(固定偏移列表)或 interval(区间偏移)。

offsets

偏移值列表

timeout

等待超时时间(秒),默认 7200(2 小时)。

checkInterval

轮询检查间隔(秒),默认 60。

检查粒度

检查整个 Pipeline

等待上游 Pipeline 实例整体执行成功:

checkerConfig:
  checks:
    - type: pipeline
      projectName: my_project
      pipelineName: upstream_etl
      activityName: ""             # 为空 → 检查整个 Pipeline
      offsetsType: set
      offsets: ["0"]

检查指定 Activity

仅等待上游 Pipeline 中特定 Activity 完成,不需要等待整个 Pipeline 执行完毕。适用于上游 Pipeline 较长但当前 Pipeline 只依赖其中某个节点输出的场景:

checkerConfig:
  checks:
    - type: pipeline
      projectName: my_project
      pipelineName: upstream_etl
      activityName: transform_orders   # 仅检查此 Activity
      offsetsType: set
      offsets: ["0"]

偏移配置 (offsets)

偏移用于指定依赖上游的哪个业务日期的实例。偏移值基于当前实例的业务日期计算。

固定偏移 (set)

指定一组固定的偏移值,Checker 等待所有指定偏移的上游实例完成:

offsetsType: set
offsets:
  - "0"        # 当天实例
  - "-1"       # 前一天实例
  - "-2"       # 前两天实例
  • 偏移 "0" 表示与当前实例业务日期相同。
  • 偏移 "-1" 表示当前业务日期前一天。
  • 多个偏移值时,所有指定实例均需成功才放行。

示例:当前实例 bizDate = 2026-06-05

偏移值

等待的上游 bizDate

"0"

2026-06-05

"-1"

2026-06-04

"-2"

2026-06-03

区间偏移 (interval)

指定偏移的起止范围,Checker 等待范围内所有上游实例完成:

offsetsType: interval
offsets:
  - "-6"       # 起始偏移
  - "0"        # 结束偏移

上例表示等待当前业务日期前 6 天到当天共 7 个实例全部完成。适用于周级汇总依赖日级数据的场景。

混合检查

一个 Checker 节点可同时检查多个上游 Pipeline 或 Activity,所有检查项均满足后才放行:

activities:
  - name: wait_for_all_upstream
    type: checker
    checkerConfig:
      checks:
        - type: pipeline
          projectName: my_project
          pipelineName: order_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]
        - type: pipeline
          projectName: my_project
          pipelineName: user_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]
        - type: pipeline
          projectName: analytics_project
          pipelineName: traffic_etl
          activityName: aggregate_traffic
          offsetsType: set
          offsets: ["0"]
      timeout: 10800               # 3 小时超时
      checkInterval: 120

上例中 Checker 同时等待 3 个上游条件:

  1. order_etl 整体成功(同项目)。
  2. user_etl 整体成功(同项目)。
  3. traffic_etlaggregate_traffic 节点完成(跨项目)。

跨项目依赖

Checker 支持跨项目依赖,通过 projectName 指定目标 Pipeline 所在的项目:

checkerConfig:
  checks:
    - type: pipeline
      projectName: data_warehouse_project    # 其他项目
      pipelineName: dim_table_etl
      activityName: ""
      offsetsType: set
      offsets: ["0"]

跨项目依赖要求:

要求

说明

项目可见性

目标项目需对当前项目开放跨项目依赖权限。

权限

当前用户需拥有目标项目的 Viewer 及以上角色。

频率匹配

当前 Pipeline 与上游 Pipeline 的调度频率需兼容。

Checker 运行机制

执行流程

Checker 节点开始执行
  → 按 checkInterval 间隔轮询
    → 查询所有 checks 中上游实例的状态
      → 全部成功 → Checker 标记为 SUCCEEDED → 下游继续执行
      → 部分未完成 → 继续等待
      → 上游失败 → Checker 标记为 FAILED
      → 超过 timeout → Checker 标记为 TIMEOUT

Checker 状态

状态

说明

WAITING_DEPENDENCY

初始状态,等待自身依赖

RUNNING

正在轮询检查上游

SUCCEEDED

所有上游检查通过

FAILED

上游实例失败

TIMEOUT

等待超时

编排模式

模式一:先等后做

先通过 Checker 等待所有上游就绪,再执行下游处理逻辑:

activities:
  - name: wait_for_upstream
    type: checker
    checkerConfig:
      checks:
        - type: pipeline
          projectName: my_project
          pipelineName: order_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]

  - name: generate_report
    type: sql
    engineType: emr_serverless_spark
    sqlScript: generate_report.sql
    dependsOn:
      activities:
        - wait_for_upstream        # 等 Checker 通过后执行

模式二:多路汇聚

多个 Checker 分别等待不同上游,全部就绪后汇聚到同一下游节点:

activities:
  - name: wait_order
    type: checker
    checkerConfig:
      checks:
        - type: pipeline
          projectName: my_project
          pipelineName: order_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]

  - name: wait_user
    type: checker
    checkerConfig:
      checks:
        - type: pipeline
          projectName: my_project
          pipelineName: user_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]

  - name: join_and_analyze
    type: sql
    engineType: emr_serverless_spark
    sqlScript: join_and_analyze.sql
    dependsOn:
      activities:
        - wait_order
        - wait_user               # 两个 Checker 都通过后执行

模式三:并行处理 + 等待

部分处理与 Checker 并行,最终汇聚:

activities:
  - name: prepare_config
    type: notebook
    notebookPath: prepare_config.ipynb

  - name: wait_for_upstream
    type: checker
    checkerConfig:
      checks:
        - type: pipeline
          projectName: my_project
          pipelineName: upstream_etl
          activityName: ""
          offsetsType: set
          offsets: ["0"]

  - name: process_data
    type: sql
    engineType: emr_serverless_spark
    sqlScript: process_data.sql
    dependsOn:
      activities:
        - prepare_config           # 配置准备完成
        - wait_for_upstream        # 上游数据就绪

最佳实践

实践

说明

设置合理超时

根据上游 Pipeline 历史耗时设置 timeout,避免无限等待。

调整检查间隔

高频任务减小 checkInterval(如 30 秒),低频任务可增大(如 300 秒)。

检查具体 Activity

如果只依赖上游某个节点的输出,指定 activityName 可提前开始。

关注频率匹配

确保上下游 Pipeline 的调度频率兼容,避免找不到对应的上游实例。

统一项目间约定

跨项目依赖需提前与上游团队沟通,确认 Pipeline 名称和调度时间稳定。

Checker 命名规范

使用 wait_for_<上游名称> 格式,便于 DAG 阅读和运维排查。

最近更新时间:2026.06.12 11:44:16
这个页面对您有帮助吗?
有用
有用
无用
无用