本文介绍 Until / Do While 类型的 Activity 配置,用于在 Pipeline 中实现循环执行逻辑。
Until / Do While Activity(YAML 类型标识为 do_while)循环执行一组内部 Activity,每轮执行完毕后检查条件,条件满足则继续下一轮,条件不满足则退出循环。
- name: retry_until_success type: do_while condition: left: "{{variables.retry_flag}}" op: EQUAL_TO right: "1" activities: - name: attempt_sync type: notebook source: WORKSPACE path: /Workspace/Users/zhang3/notebooks/sync_data.notebook generalComputingResourceGroupName: default_py_group parameterValues: attempt: "{{variables.attempt_count}}" position: x: "100" y: "50" - name: update_counter type: set_variable variableType: PIPELINE_INNER variableName: attempt_count value: calculateValue: left: "{{variables.attempt_count}}" op: PLUS right: "1" dependsOn: activities: - name: attempt_sync position: x: "300" y: "50" position: x: "200" y: "100"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 固定为 |
| Object | 是 | 循环条件(结构同 If/Else 的 condition) |
| String | 是 | 左值(支持参数/变量引用) |
| Enum | 是 | 比较运算符 |
| String | 是 | 右值 |
| Array | 是 | 每次循环执行的 Activity 列表 |
与 If/Else 共享同一套运算符:
op 值 | 含义 |
|---|---|
| 等于 |
| 不等于 |
| 大于 |
| 大于等于 |
| 小于 |
| 小于等于 |
Do While 节点开始 → 第 1 轮:执行 activities → 检查 condition → 条件满足 → 第 2 轮:执行 activities → 检查 condition → 条件满足 → 第 3 轮... → 条件不满足 → 退出循环 → SUCCEEDED → 条件不满足 → 退出循环 → SUCCEEDED
关键特性:
循环体内必须有更新条件变量的逻辑,否则可能陷入死循环。推荐使用 set_variable 节点更新变量。
# Pipeline 变量定义 variables: - name: attempt_count type: Integer defaultValue: "0" # 循环体 - name: loop_with_counter type: do_while condition: left: "{{variables.attempt_count}}" op: LESS_THAN right: "5" # 最多执行 5 轮 activities: - name: do_work type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/work.sql engineType: emr_serverless_spark engineQueue: default position: x: "100" y: "50" - name: increment type: set_variable variableType: PIPELINE_INNER variableName: attempt_count value: calculateValue: left: "{{variables.attempt_count}}" op: PLUS right: "1" dependsOn: activities: - name: do_work position: x: "300" y: "50"
# Pipeline 变量定义 variables: - name: should_continue type: String defaultValue: "1" # 内部 Notebook 根据业务逻辑设置 should_continue 为 "0" 退出循环 - name: loop_until_done type: do_while condition: left: "{{variables.should_continue}}" op: EQUAL_TO right: "1" activities: - name: check_and_process type: notebook source: WORKSPACE path: /Workspace/Users/zhang3/notebooks/iterative_process.notebook generalComputingResourceGroupName: default_py_group position: x: "100" y: "50"
场景 | 行为 |
|---|---|
某轮内部 Activity 执行失败 | 该轮停止,Do While 节点标记为 FAILED。 |
内部 Activity 配置了 retryPolicy | 内部 Activity 先尝试重试,重试失败后才算该轮失败。 |
死循环 | 如果未配置超时且条件始终为 true,循环将无限执行。 |
建议 | 说明 |
|---|---|
必须设置退出条件 | 循环体内务必更新条件变量,避免死循环。 |
配合计数器使用 | 设置最大循环次数作为安全兜底。 |
配置 retryPolicy.timeoutSeconds | 在外层或内层设置超时,防止循环无限执行。 |
简化循环体 | 每轮循环的 Activity 数量不宜过多,保持逻辑清晰。 |
日志记录 | 在循环体中记录每轮的执行状态,便于排查。 |