本文介绍 For Each 类型的 Activity 配置,用于在 Pipeline 中实现集合遍历。
For Each Activity 对集合中的每个元素执行一组内部 Activity,适用于需要批量处理多个同类对象的场景。
适用场景:
- name: process_regions type: for_each items: '["cn-beijing", "cn-shanghai", "cn-guangzhou", "cn-shenzhen"]' concurrency: 3 activities: - name: etl_per_region type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/region_etl.sql engineType: emr_serverless_spark engineQueue: default parameterValues: region_id: "{{items.each}}" position: x: "100" y: "50" position: x: "200" y: "100"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 固定为 |
| String | 是 | 遍历对象,JSON 数组或动态值引用。 |
| Array | 是 | 每次迭代执行的 Activity 列表。 |
| Integer | 否 | 迭代并发数,默认为 1(串行)。 |
直接以 JSON 数组格式写入遍历元素:
items: '["cn-beijing", "cn-shanghai", "cn-guangzhou"]'
引用前序 Activity 的输出作为遍历集合:
items: "{{activities.get_regions.output.rows}}"
前序 Activity 的输出需要是 JSON 数组格式。
引用 Pipeline 参数或变量:
items: "{{pipeline.parameters.region_list}}"
activities 数组定义每次迭代要执行的 Activity 列表。内部 Activity 的配置方式与普通 Activity 相同,但额外支持 {{items.each}} 引用当前迭代元素。
activities: - name: etl_per_region type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/region_etl.sql engineType: emr_serverless_spark engineQueue: default parameterValues: region_id: "{{items.each}}" # 当前迭代元素
每次迭代可包含多个按 DAG 编排的 Activity:
- name: process_each_region type: for_each items: '["cn-beijing", "cn-shanghai"]' concurrency: 2 activities: - name: extract type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/extract_by_region.sql engineType: emr_serverless_spark engineQueue: default parameterValues: region: "{{items.each}}" position: x: "100" y: "50" - name: transform type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/transform_by_region.sql engineType: emr_serverless_spark engineQueue: default parameterValues: region: "{{items.each}}" dependsOn: activities: - extract position: x: "300" y: "50"
通过 concurrency 控制同时运行的迭代数量:
concurrency | 行为 |
|---|---|
1(默认) | 串行执行,逐个遍历。 |
N (N > 1) | 最多 N 个迭代同时运行。 |
# 最多 3 个地区并行处理 concurrency: 3
说明
当 items 数量大于 concurrency 时,多余的迭代排队等待。
For Each 节点开始 → 解析 items 得到元素列表 [e1, e2, e3, e4] → 按 concurrency 控制并行度 → 迭代 1 (e1): 执行 activities → 完成 → 迭代 2 (e2): 执行 activities → 完成 → 迭代 3 (e3): 执行 activities → 完成 → 迭代 4 (e4): 执行 activities → 完成 → 所有迭代完成 → For Each 节点 SUCCEEDED
失败行为:
建议 | 说明 |
|---|---|
控制 items 规模 | 元素数量过大时(如 1000+),考虑在 SQL 中用 UNION ALL 替代 For Each。 |
合理设置并发 | 并发数不宜过大,需考虑计算资源和目标系统的承载能力。 |
内部 Activity 幂等 | 每个迭代应可独立重跑,确保幂等性。 |
动态 items 验证 | 使用动态数组时,确保前序 Activity 输出的格式为合法 JSON 数组。 |