本文介绍如何通过 Checker 节点配置跨 Pipeline 依赖,实现多条 Pipeline 之间的协同调度。
在数据开发中,一条 Pipeline 的输入常常依赖另一条 Pipeline 的输出。NewIDE 提供两种跨 Pipeline 依赖方式:
方式 | 配置位置 | 说明 |
|---|---|---|
Pipeline 级别依赖 |
| 在 Activity 的 dependsOn 中声明对上游 Pipeline 的依赖。 |
Checker 节点 | 独立 Activity( | 作为 DAG 中的一个检查节点,等待上游满足条件后放行。 |
对比维度 | Pipeline 级别依赖 | Checker 节点 |
|---|---|---|
配置方式 |
| 独立的 Activity 节点 |
粒度 | 整个 Pipeline 成功 | 可检查 Pipeline 或指定 Activity。 |
业务日期偏移 | 支持 offsets | 支持 offsets |
跨项目 | 支持 | 支持 |
可视化 | 依赖关系在 DAG 图中不可见。 | 在 DAG 中显示为独立节点,直观清晰。 |
灵活性 | 较低,仅检查整体成功。 | 较高,支持混合检查多个上游。 |
说明
跨 Pipeline 依赖建议优先使用 Checker 节点。Checker 在 DAG 图中可见,状态可追踪,便于运维排查。
activities: - name: wait_for_upstream type: checker checkerConfig: checks: - type: pipeline # 检查类型 projectName: my_project # 目标项目 pipelineName: upstream_etl # 目标 Pipeline activityName: "" # 为空表示检查整个 Pipeline offsetsType: set # 偏移类型 offsets: - "0" # 偏移值 timeout: 7200 # 超时时间(秒) checkInterval: 60 # 检查间隔(秒)
字段 | 必填 | 说明 |
|---|---|---|
| 是 | 检查类型,固定为 |
| 是 | 目标 Pipeline 所在项目。 |
| 是 | 目标 Pipeline 名称。 |
| 否 | 目标 Activity 名称。为空检查整个 Pipeline;指定值则检查具体 Activity。 |
| 是 | 偏移类型: |
| 是 | 偏移值列表 |
| 否 | 等待超时时间(秒),默认 7200(2 小时)。 |
| 否 | 轮询检查间隔(秒),默认 60。 |
等待上游 Pipeline 实例整体执行成功:
checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: upstream_etl activityName: "" # 为空 → 检查整个 Pipeline offsetsType: set offsets: ["0"]
仅等待上游 Pipeline 中特定 Activity 完成,不需要等待整个 Pipeline 执行完毕。适用于上游 Pipeline 较长但当前 Pipeline 只依赖其中某个节点输出的场景:
checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: upstream_etl activityName: transform_orders # 仅检查此 Activity offsetsType: set offsets: ["0"]
偏移用于指定依赖上游的哪个业务日期的实例。偏移值基于当前实例的业务日期计算。
指定一组固定的偏移值,Checker 等待所有指定偏移的上游实例完成:
offsetsType: set offsets: - "0" # 当天实例 - "-1" # 前一天实例 - "-2" # 前两天实例
"0" 表示与当前实例业务日期相同。"-1" 表示当前业务日期前一天。示例:当前实例 bizDate = 2026-06-05
偏移值 | 等待的上游 bizDate |
|---|---|
| 2026-06-05 |
| 2026-06-04 |
| 2026-06-03 |
指定偏移的起止范围,Checker 等待范围内所有上游实例完成:
offsetsType: interval offsets: - "-6" # 起始偏移 - "0" # 结束偏移
上例表示等待当前业务日期前 6 天到当天共 7 个实例全部完成。适用于周级汇总依赖日级数据的场景。
一个 Checker 节点可同时检查多个上游 Pipeline 或 Activity,所有检查项均满足后才放行:
activities: - name: wait_for_all_upstream type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: order_etl activityName: "" offsetsType: set offsets: ["0"] - type: pipeline projectName: my_project pipelineName: user_etl activityName: "" offsetsType: set offsets: ["0"] - type: pipeline projectName: analytics_project pipelineName: traffic_etl activityName: aggregate_traffic offsetsType: set offsets: ["0"] timeout: 10800 # 3 小时超时 checkInterval: 120
上例中 Checker 同时等待 3 个上游条件:
order_etl 整体成功(同项目)。user_etl 整体成功(同项目)。traffic_etl 的 aggregate_traffic 节点完成(跨项目)。Checker 支持跨项目依赖,通过 projectName 指定目标 Pipeline 所在的项目:
checkerConfig: checks: - type: pipeline projectName: data_warehouse_project # 其他项目 pipelineName: dim_table_etl activityName: "" offsetsType: set offsets: ["0"]
跨项目依赖要求:
要求 | 说明 |
|---|---|
项目可见性 | 目标项目需对当前项目开放跨项目依赖权限。 |
权限 | 当前用户需拥有目标项目的 Viewer 及以上角色。 |
频率匹配 | 当前 Pipeline 与上游 Pipeline 的调度频率需兼容。 |
Checker 节点开始执行 → 按 checkInterval 间隔轮询 → 查询所有 checks 中上游实例的状态 → 全部成功 → Checker 标记为 SUCCEEDED → 下游继续执行 → 部分未完成 → 继续等待 → 上游失败 → Checker 标记为 FAILED → 超过 timeout → Checker 标记为 TIMEOUT
状态 | 说明 |
|---|---|
WAITING_DEPENDENCY | 初始状态,等待自身依赖 |
RUNNING | 正在轮询检查上游 |
SUCCEEDED | 所有上游检查通过 |
FAILED | 上游实例失败 |
TIMEOUT | 等待超时 |
先通过 Checker 等待所有上游就绪,再执行下游处理逻辑:
activities: - name: wait_for_upstream type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: order_etl activityName: "" offsetsType: set offsets: ["0"] - name: generate_report type: sql engineType: emr_serverless_spark sqlScript: generate_report.sql dependsOn: activities: - wait_for_upstream # 等 Checker 通过后执行
多个 Checker 分别等待不同上游,全部就绪后汇聚到同一下游节点:
activities: - name: wait_order type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: order_etl activityName: "" offsetsType: set offsets: ["0"] - name: wait_user type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: user_etl activityName: "" offsetsType: set offsets: ["0"] - name: join_and_analyze type: sql engineType: emr_serverless_spark sqlScript: join_and_analyze.sql dependsOn: activities: - wait_order - wait_user # 两个 Checker 都通过后执行
部分处理与 Checker 并行,最终汇聚:
activities: - name: prepare_config type: notebook notebookPath: prepare_config.ipynb - name: wait_for_upstream type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: upstream_etl activityName: "" offsetsType: set offsets: ["0"] - name: process_data type: sql engineType: emr_serverless_spark sqlScript: process_data.sql dependsOn: activities: - prepare_config # 配置准备完成 - wait_for_upstream # 上游数据就绪
实践 | 说明 |
|---|---|
设置合理超时 | 根据上游 Pipeline 历史耗时设置 timeout,避免无限等待。 |
调整检查间隔 | 高频任务减小 checkInterval(如 30 秒),低频任务可增大(如 300 秒)。 |
检查具体 Activity | 如果只依赖上游某个节点的输出,指定 activityName 可提前开始。 |
关注频率匹配 | 确保上下游 Pipeline 的调度频率兼容,避免找不到对应的上游实例。 |
统一项目间约定 | 跨项目依赖需提前与上游团队沟通,确认 Pipeline 名称和调度时间稳定。 |
Checker 命名规范 | 使用 |