You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
Activity (任务节点)类型
For Each
复制全文
下载 pdf
For Each

本文介绍 For Each 类型的 Activity 配置,用于在 Pipeline 中实现集合遍历。

概述

For Each Activity 对集合中的每个元素执行一组内部 Activity,适用于需要批量处理多个同类对象的场景。
适用场景:

  • 按地区/租户/品类分别执行相同的 ETL 逻辑。
  • 遍历文件列表逐个处理。
  • 动态数量的并行任务。

配置示例

- name: process_regions
  type: for_each
  items: '["cn-beijing", "cn-shanghai", "cn-guangzhou", "cn-shenzhen"]'
  concurrency: 3
  activities:
    - name: etl_per_region
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/region_etl.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        region_id: "{{items.each}}"
      position:
        x: "100"
        y: "50"
  position:
    x: "200"
    y: "100"

字段说明

字段

类型

必填

说明

type

String

固定为 for_each

items

String

遍历对象,JSON 数组或动态值引用。

activities

Array

每次迭代执行的 Activity 列表。

concurrency

Integer

迭代并发数,默认为 1(串行)。

items 定义

静态数组

直接以 JSON 数组格式写入遍历元素:

items: '["cn-beijing", "cn-shanghai", "cn-guangzhou"]'

动态数组

引用前序 Activity 的输出作为遍历集合:

items: "{{activities.get_regions.output.rows}}"

前序 Activity 的输出需要是 JSON 数组格式。

参数引用

引用 Pipeline 参数或变量:

items: "{{pipeline.parameters.region_list}}"

内部 Activity

activities 数组定义每次迭代要执行的 Activity 列表。内部 Activity 的配置方式与普通 Activity 相同,但额外支持 {{items.each}} 引用当前迭代元素。

引用当前元素

activities:
  - name: etl_per_region
    type: sql
    source: WORKSPACE
    path: /Workspace/Users/zhang3/sql/region_etl.sql
    engineType: emr_serverless_spark
    engineQueue: default
    parameterValues:
      region_id: "{{items.each}}"          # 当前迭代元素

多步骤迭代

每次迭代可包含多个按 DAG 编排的 Activity:

- name: process_each_region
  type: for_each
  items: '["cn-beijing", "cn-shanghai"]'
  concurrency: 2
  activities:
    - name: extract
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/extract_by_region.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        region: "{{items.each}}"
      position:
        x: "100"
        y: "50"

    - name: transform
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/transform_by_region.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        region: "{{items.each}}"
      dependsOn:
        activities:
          - extract
      position:
        x: "300"
        y: "50"

并发控制

通过 concurrency 控制同时运行的迭代数量:

concurrency

行为

1(默认)

串行执行,逐个遍历。

N (N > 1)

最多 N 个迭代同时运行。

# 最多 3 个地区并行处理
concurrency: 3

说明

当 items 数量大于 concurrency 时,多余的迭代排队等待。

执行流程

For Each 节点开始
  → 解析 items 得到元素列表 [e1, e2, e3, e4]
    → 按 concurrency 控制并行度
      → 迭代 1 (e1): 执行 activities → 完成
      → 迭代 2 (e2): 执行 activities → 完成
      → 迭代 3 (e3): 执行 activities → 完成
      → 迭代 4 (e4): 执行 activities → 完成
  → 所有迭代完成 → For Each 节点 SUCCEEDED

失败行为:

  • 某次迭代中的 Activity 执行失败时,该迭代标记为失败。
  • 已启动的其他迭代继续执行(不会提前终止)。
  • 所有迭代完成后,如果有任意迭代失败,For Each 节点整体标记为 FAILED。

使用建议

建议

说明

控制 items 规模

元素数量过大时(如 1000+),考虑在 SQL 中用 UNION ALL 替代 For Each。

合理设置并发

并发数不宜过大,需考虑计算资源和目标系统的承载能力。

内部 Activity 幂等

每个迭代应可独立重跑,确保幂等性。

动态 items 验证

使用动态数组时,确保前序 Activity 输出的格式为合法 JSON 数组。

最近更新时间:2026.06.12 11:44:17
这个页面对您有帮助吗?
有用
有用
无用
无用