本文介绍 Checker 类型的 Activity 配置,用于在 Pipeline 中感知外部数据或上游 Pipeline 的就绪状态。
Checker Activity(YAML 类型标识为 checker)通过轮询检测外部条件是否满足,满足后放行下游 Activity 继续执行。Checker 是实现跨 Pipeline 依赖的核心节点类型。
适用场景:
说明
关于 Checker 在跨 Pipeline 依赖中的详细编排方式,请参见跨 Pipeline Checker 依赖。
- name: wait_for_order_etl type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: daily_order_etl activityName: "" offsetsType: set offsets: - "0" timeout: 7200 checkInterval: 60 position: x: "200" y: "100"
- name: wait_for_transform type: checker checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: daily_order_etl activityName: transform_orders # 仅检查此 Activity offsetsType: set offsets: - "0" timeout: 7200 checkInterval: 60 position: x: "200" y: "100"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 固定为 |
| Object | 是 | 检查配置 |
| Array | 是 | 检查项列表(可配置多个) |
| Integer | 否 | 等待超时时间(秒),默认 7200 |
| Integer | 否 | 轮询检查间隔(秒),默认 60 |
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 检查类型,固定为 |
| String | 是 | 目标 Pipeline 所在项目 |
| String | 是 | 目标 Pipeline 名称 |
| String | 否 | 目标 Activity 名称。为空检查整个 Pipeline,指定值则检查具体 Activity。 |
| Enum | 是 | 偏移类型: |
| Array | 是 | 偏移值列表 |
偏移用于指定依赖上游的哪个业务日期的实例。
等待指定偏移的所有上游实例完成:
offsetsType: set offsets: - "0" # 同一业务日期 - "-1" # 前一天
偏移值 | 含义(以 bizDate=2026-06-05 为例) |
|---|---|
| 等待 2026-06-05 的上游实例 |
| 等待 2026-06-04 的上游实例 |
| 等待 2026-06-03 的上游实例 |
等待偏移范围内所有上游实例完成:
offsetsType: interval offsets: - "-6" # 起始偏移 - "0" # 结束偏移
上例表示等待当前业务日期前 6 天到当天共 7 个实例全部成功。
一个 Checker 可同时检查多个上游,所有检查项均通过后才放行:
checkerConfig: checks: - type: pipeline projectName: my_project pipelineName: order_etl activityName: "" offsetsType: set offsets: ["0"] - type: pipeline projectName: my_project pipelineName: user_etl activityName: "" offsetsType: set offsets: ["0"] - type: pipeline projectName: analytics_project # 跨项目 pipelineName: traffic_etl activityName: aggregate_traffic offsetsType: set offsets: ["0"] timeout: 10800 checkInterval: 120
除了检查上游 Pipeline 状态外,NewIDE 还支持通过 Sensor 类型节点感知外部数据就绪状态。
检测 LAS Catalog 中的表分区是否就绪:
- name: wait_partition_ready type: sensor sensorType: las_catalog dataSource: las_catalog catalog: default database: ods table: orders partitions: - key: dt value: "${date}" checkIntervalInSeconds: 300 timeoutSeconds: 7200 position: x: "200" y: "100"
检测 TOS(对象存储)中的文件或路径是否存在:
- name: wait_file_ready type: sensor sensorType: tos bucket: data-bucket path: "/data/input/${date}/_SUCCESS" checkIntervalInSeconds: 300 timeoutSeconds: 7200 position: x: "200" y: "100"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 固定为 |
| Enum | 是 | 感知类型: |
| String | 条件必填 | 数据源标识( |
| String | 否 | 数据目录 |
| String | 条件必填 | 数据库名( |
| String | 条件必填 | 表名( |
| Array | 否 | 分区条件列表(key-value 格式) |
| String | 条件必填 | TOS 桶名称( |
| String | 条件必填 | TOS 文件路径( |
| Integer | 否 | 轮询间隔(秒),默认 300 |
| Integer | 否 | 超时时间(秒) |
Checker / Sensor 节点开始 → 按 checkInterval 间隔轮询 → 检查所有条件是否满足 → 全部满足 → SUCCEEDED → 下游继续执行 → 部分未满足 → 继续等待 → 上游失败 → FAILED → 超过 timeout → TIMEOUT
建议 | 说明 |
|---|---|
设置合理超时 | 根据上游历史耗时设置 timeout,避免无限等待。 |
调整检查间隔 | 高频任务缩短间隔(30s),低频任务加大间隔(300s)。 |
检查具体 Activity | 如果只依赖上游某个节点输出,指定 activityName 可提前开始。 |
命名规范 | 使用 |
Sensor 替代轮询 | 用 Sensor 感知数据就绪,比固定延迟等待更精确高效。 |