You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
Activity (任务节点)类型
Until / Do While
复制全文
下载 pdf
Until / Do While

本文介绍 Until / Do While 类型的 Activity 配置,用于在 Pipeline 中实现循环执行逻辑。

概述

Until / Do While Activity(YAML 类型标识为 do_while)循环执行一组内部 Activity,每轮执行完毕后检查条件,条件满足则继续下一轮,条件不满足则退出循环。

  • 执行语义:Do → Check → Loop(至少执行一轮)
  • 适用场景:
    • 轮询等待某个条件满足后退出。
    • 重复尝试某个操作直到成功。
    • 迭代计算直到收敛。

配置示例

- name: retry_until_success
  type: do_while
  condition:
    left: "{{variables.retry_flag}}"
    op: EQUAL_TO
    right: "1"
  activities:
    - name: attempt_sync
      type: notebook
      source: WORKSPACE
      path: /Workspace/Users/zhang3/notebooks/sync_data.notebook
      generalComputingResourceGroupName: default_py_group
      parameterValues:
        attempt: "{{variables.attempt_count}}"
      position:
        x: "100"
        y: "50"

    - name: update_counter
      type: set_variable
      variableType: PIPELINE_INNER
      variableName: attempt_count
      value:
        calculateValue:
          left: "{{variables.attempt_count}}"
          op: PLUS
          right: "1"
      dependsOn:
        activities:
          - name: attempt_sync
      position:
        x: "300"
        y: "50"
  position:
    x: "200"
    y: "100"

字段说明

字段

类型

必填

说明

type

String

固定为 do_while

condition

Object

循环条件(结构同 If/Else 的 condition)

condition.left

String

左值(支持参数/变量引用)

condition.op

Enum

比较运算符

condition.right

String

右值

activities

Array

每次循环执行的 Activity 列表

条件运算符

与 If/Else 共享同一套运算符:

op 值

含义

EQUAL_TO

等于

NOT_EQUAL

不等于

GREATER_THAN

大于

GREATER_THAN_OR_EQUAL

大于等于

LESS_THAN

小于

LESS_THAN_OR_EQUAL

小于等于

执行流程

Do While 节点开始
  → 第 1 轮:执行 activities
    → 检查 condition
      → 条件满足 → 第 2 轮:执行 activities
        → 检查 condition
          → 条件满足 → 第 3 轮...
          → 条件不满足 → 退出循环 → SUCCEEDED
      → 条件不满足 → 退出循环 → SUCCEEDED

关键特性:

  • 至少执行一轮(Do 语义)。
  • 每轮执行完毕后才检查条件。
  • 条件不满足时退出,节点标记为 SUCCEEDED。

条件变量更新

循环体内必须有更新条件变量的逻辑,否则可能陷入死循环。推荐使用 set_variable 节点更新变量。

示例:计数器退出

# Pipeline 变量定义
variables:
  - name: attempt_count
    type: Integer
    defaultValue: "0"

# 循环体
- name: loop_with_counter
  type: do_while
  condition:
    left: "{{variables.attempt_count}}"
    op: LESS_THAN
    right: "5"                             # 最多执行 5 轮
  activities:
    - name: do_work
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/work.sql
      engineType: emr_serverless_spark
      engineQueue: default
      position:
        x: "100"
        y: "50"

    - name: increment
      type: set_variable
      variableType: PIPELINE_INNER
      variableName: attempt_count
      value:
        calculateValue:
          left: "{{variables.attempt_count}}"
          op: PLUS
          right: "1"
      dependsOn:
        activities:
          - name: do_work
      position:
        x: "300"
        y: "50"

示例:标志位退出

# Pipeline 变量定义
variables:
  - name: should_continue
    type: String
    defaultValue: "1"

# 内部 Notebook 根据业务逻辑设置 should_continue 为 "0" 退出循环
- name: loop_until_done
  type: do_while
  condition:
    left: "{{variables.should_continue}}"
    op: EQUAL_TO
    right: "1"
  activities:
    - name: check_and_process
      type: notebook
      source: WORKSPACE
      path: /Workspace/Users/zhang3/notebooks/iterative_process.notebook
      generalComputingResourceGroupName: default_py_group
      position:
        x: "100"
        y: "50"

失败行为

场景

行为

某轮内部 Activity 执行失败

该轮停止,Do While 节点标记为 FAILED。

内部 Activity 配置了 retryPolicy

内部 Activity 先尝试重试,重试失败后才算该轮失败。

死循环

如果未配置超时且条件始终为 true,循环将无限执行。

使用建议

建议

说明

必须设置退出条件

循环体内务必更新条件变量,避免死循环。

配合计数器使用

设置最大循环次数作为安全兜底。

配置 retryPolicy.timeoutSeconds

在外层或内层设置超时,防止循环无限执行。

简化循环体

每轮循环的 Activity 数量不宜过多,保持逻辑清晰。

日志记录

在循环体中记录每轮的执行状态,便于排查。

最近更新时间:2026.06.12 11:44:17
这个页面对您有帮助吗?
有用
有用
无用
无用