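This article demonstrates design patterns for the parameter system through typical scenarios. Each example includes the complete Pipeline YAML configuration and script code and can be used directly as a reference.

The following example shows how the system resolves a parameter by priority when the same name is defined at more than one level.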
Configuration:

    # Project parameters
    project:
      parameters:
        - name: env
          default: "production"
        - name: db_host
          default: "10.0.1.100"

    # Pipeline definition
    spec:
      parameters:
        - name: env
          default: "staging"
        - name: biz_date
          default: "${date}"
      activities:
        - name: my_task
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/zhang3/sql/query.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            env: "dev"
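Resolution results for the references in the script:

| Reference in script | Resolved value | Source |
|---|---|---|
| `{{env}}` | `dev` | Activity |
| `{{biz_date}}` | `${date}` | Pipeline |
| `{{db_host}}` | `10.0.1.100` | Project parameter (not defined in the Pipeline either, so the lookup continues upward) |
| A name defined at no level | Error | Not found at any level |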
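The lookup order in the table can be modeled in a few lines of Python. This is only a sketch of the precedence rule (Activity first, then Pipeline, then project), not the system's actual resolver; the function name `resolve` and the error message are assumptions made for illustration.

```python
from collections import ChainMap

# Values copied from the configuration above.
project_parameters = {"env": "production", "db_host": "10.0.1.100"}
pipeline_parameters = {"env": "staging", "biz_date": "${date}"}
activity_parameter_values = {"env": "dev"}

# ChainMap searches its mappings from left to right, so the Activity level
# is consulted first, then the Pipeline level, then the project level.
scope = ChainMap(activity_parameter_values, pipeline_parameters, project_parameters)


def resolve(name):
    """Return the first value found for name, or raise if no level defines it."""
    if name not in scope:
        raise KeyError(f"parameter {name!r} is not defined at any level")
    return scope[name]


for name in ("env", "biz_date", "db_host"):
    print(name, "=", resolve(name))
```

Calling `resolve` with a name that no level defines raises `KeyError`, which corresponds to the last row of the table.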
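Requirement: The same ETL code must run in the development, test, and production environments, each with its own databases and storage paths.

Project parameter definitions (the defaults are the development environment values):

    # project.yml
    project:
      parameters:
        - name: source_database
          default: "ods_dev"
        - name: target_database
          default: "dwd_dev"
        - name: storage_base_path
          default: "tos://bucket-dev/warehouse"
        - name: code_owner
          default: "zhang3"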
Pipeline configuration:

    # daily_etl.pipeline.yml
    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: daily_etl_orders
    spec:
      displayName: "Daily orders ETL"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      parameters:
        - name: biz_date
          default: "${date}"
      activities:
        - name: extract
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/{{project.parameters.code_owner}}/sql/extract.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            biz_date: "${date}"
            source_db: "{{project.parameters.source_database}}"
        - name: transform
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/{{project.parameters.code_owner}}/sql/transform.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            biz_date: "${date}"
            source_db: "{{project.parameters.source_database}}"
            target_db: "{{project.parameters.target_database}}"
          dependsOn:
            activities:
              - name: extract
      trigger:
        type: scheduled
        cronExpression: "0 2 * * *"
        frequency: daily
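Script code (kept environment-independent):

    -- transform.sql
    -- The script references values by name only and does not care where they come from.
    -- Both {{source_db}} and {{target_db}} are supplied by the transform activity's parameterValues.
    INSERT OVERWRITE TABLE {{target_db}}.orders PARTITION (dt='${date}')
    SELECT *
    FROM {{source_db}}.raw_orders
    WHERE dt = '${date}';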
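To make the bridging step concrete, the following sketch renders a script in two passes: it first resolves the `{{project.parameters.*}}` references inside `parameterValues`, then substitutes the short names the script uses. It is an illustration under assumptions, not the platform's renderer: the helper names `render` and `render_script` are invented here, and the production values `ods_prod` and `dwd_prod` are hypothetical because the source lists only the development defaults.

```python
import re

# Matches {{name}} and dotted names such as {{project.parameters.source_database}}.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render(template, values):
    """Replace every {{name}} in template with values[name].

    An unknown name raises KeyError. ${...} system variables are left untouched.
    """
    return PLACEHOLDER.sub(lambda match: values[match.group(1)], template)


def render_script(script, parameter_values, project_parameters):
    # Pass 1: resolve the exact references inside parameterValues.
    exact = {f"project.parameters.{k}": v for k, v in project_parameters.items()}
    resolved = {name: render(value, exact) for name, value in parameter_values.items()}
    # Pass 2: substitute the short names used by the script.
    return render(script, resolved)


# parameterValues of the transform activity in the Pipeline configuration.
parameter_values = {
    "source_db": "{{project.parameters.source_database}}",
    "target_db": "{{project.parameters.target_database}}",
}
script = "INSERT OVERWRITE TABLE {{target_db}}.orders SELECT * FROM {{source_db}}.raw_orders"

dev_sql = render_script(
    script, parameter_values,
    {"source_database": "ods_dev", "target_database": "dwd_dev"},
)
# Hypothetical production values, used only to show the switch.
prod_sql = render_script(
    script, parameter_values,
    {"source_database": "ods_prod", "target_database": "dwd_prod"},
)
print(dev_sql)
print(prod_sql)
```

The script text is identical in both calls; only the project parameter values differ, which is the point of keeping the script environment-independent.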
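Key design points:

- `path` uses `{{project.parameters.code_owner}}`, so the workspace path adapts automatically to each user.
- Scripts reference values only as `{{parameter_name}}`; `parameterValues` bridges those names to project parameters.

Requirement: The main Pipeline calls two child Pipelines, one for data processing and one for quality checks. It must pass parameters to the child Pipelines and choose the next step based on their results.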
Main Pipeline configuration:

    # main_pipeline.pipeline.yml
    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: main_daily_pipeline
    spec:
      displayName: "Main scheduling pipeline"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      parameters:
        - name: biz_date
          default: "${date}"
        - name: check_level
          default: "standard"
      variables:
        - name: quality_passed
          default: "false"
      activities:
        # Step 1: call the data-processing child Pipeline
        - name: run_etl
          type: execute_pipeline
          pipelineName: data_etl_pipeline
          pipelineParameterValues:
            biz_date: "${date}"
            run_mode: "incremental"
          waitingSuccess: true
          bizDate: "${date}"
        # Step 2: call the data quality check child Pipeline
        - name: run_quality_check
          type: execute_pipeline
          pipelineName: quality_check_pipeline
          pipelineParameterValues:
            biz_date: "${date}"
            check_level: "{{pipeline.parameters.check_level}}"
            source_table: "{{activities.run_etl.output.output_table}}"
          waitingSuccess: true
          bizDate: "${date}"
          dependsOn:
            activities:
              - name: run_etl
        # Step 3: choose the next step based on the quality check result
        - name: check_result
          type: if_condition
          condition:
            left: "{{activities.run_quality_check.output.check_status}}"
            op: EQUAL_TO
            right: "passed"
          dependsOn:
            activities:
              - name: run_quality_check
        # Step 4a: if the quality check passes, publish the data
        - name: publish_data
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/zhang3/sql/publish.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            biz_date: "${date}"
          dependsOn:
            activities:
              - name: check_result
                outcome: true
        # Step 4b: if the quality check fails, send an alert
        - name: send_alert
          type: python
          source: WORKSPACE
          path: /Workspace/Users/zhang3/scripts/alert.py
          computingResourceGroupName: default_python_group
          parameterValues:
            biz_date: "${date}"
            error_detail: "{{activities.run_quality_check.output.error_message}}"
          dependsOn:
            activities:
              - name: check_result
                outcome: false
      trigger:
        type: scheduled
        cronExpression: "0 2 * * *"
        frequency: daily
        priority: d3
Child Pipeline (sets the output values):

    # quality_check_pipeline.pipeline.yml
    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: quality_check_pipeline
    spec:
      displayName: "Data quality check"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      parameters:
        - name: biz_date
          default: "${date}"
        - name: check_level
          default: "standard"
        - name: source_table
          default: ""
      variables:
        - name: check_result
          default: "unknown"
        - name: error_msg
          default: ""
      activities:
        - name: run_checks
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/zhang3/sql/quality_checks.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            biz_date: "${date}"
            table_name: "{{pipeline.parameters.source_table}}"
        # Expose the results as Pipeline output
        - name: set_output
          type: set_variable
          variableType: PIPELINE_OUTPUT
          outputVariables:
            - name: check_status
              default: "{{variables.check_result}}"
            - name: error_message
              default: "{{variables.error_msg}}"
          dependsOn:
            activities:
              - name: run_checks
      trigger:
        type: manual
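The way the parent reads the child's output and branches on it can be sketched as follows. This is a simplified model, not the scheduler's implementation: the `outputs` dictionary stands in for whatever store holds Activity outputs, and the helper names `read_outputs` and `next_activity` are assumptions.

```python
import re

# Matches references of the form {{activities.<name>.output.<field>}}.
ACTIVITY_OUTPUT = re.compile(r"\{\{activities\.(\w+)\.output\.(\w+)\}\}")


def read_outputs(text, outputs):
    """Replace each activity-output reference with the recorded value."""
    def substitute(match):
        activity, field = match.group(1), match.group(2)
        return outputs[activity][field]
    return ACTIVITY_OUTPUT.sub(substitute, text)


def next_activity(outputs):
    """Evaluate the check_result condition (EQUAL_TO "passed") and pick a branch."""
    left = read_outputs("{{activities.run_quality_check.output.check_status}}", outputs)
    return "publish_data" if left == "passed" else "send_alert"


# Output that the child Pipeline's set_output activity is assumed to have produced.
passed = {"run_quality_check": {"check_status": "passed", "error_message": ""}}
failed = {"run_quality_check": {"check_status": "failed", "error_message": "null rate too high"}}

print(next_activity(passed))
print(next_activity(failed))
print(read_outputs("{{activities.run_quality_check.output.error_message}}", failed))
```

In this model a reference to an Activity that has not recorded any output raises `KeyError`, which is one way to picture why the parent has to wait for the child to finish before reading its output.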
Key design points:

- The child Pipeline sets `trigger.type` to `manual`, so it runs only when a parent Pipeline's `execute_pipeline` activity triggers it.
- The parent passes parameters to the child Pipeline through `pipelineParameterValues`.
- The child sets its return values with `set_variable` (`PIPELINE_OUTPUT`).
- The parent reads the return values with `{{activities.<execute_pipeline_name>.output.<field>}}`.
- `waitingSuccess: true` is a prerequisite for reading the child Pipeline's output.

Requirement: Fetch a set of region IDs from upstream and run the ETL for each region.

    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: multi_region_etl
    spec:
      displayName: "Multi-region ETL"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      activities:
        # 1. Query the list of regions to process
        - name: get_regions
          type: look_up
          dataSource: las_catalog
          catalog: default
          database: dim
          table: regions
          where: "is_active=1"
          limit: 100
        # 2. Run the ETL for each region in parallel
        - name: process_regions
          type: for_each
          items: "{{activities.get_regions.output.rows}}"
          concurrency: 5
          activities:
            - name: region_etl
              type: sql
              source: WORKSPACE
              path: /Workspace/Users/zhang3/sql/region_etl.sql
              engineType: emr_serverless_spark
              engineQueue: default
              parameterValues:
                region_id: "{{items.each}}"
                biz_date: "${date}"
          dependsOn:
            activities:
              - name: get_regions
      trigger:
        type: scheduled
        cronExpression: "0 3 * * *"
        frequency: daily
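As a rough analogy for `for_each` with `concurrency: 5`, the sketch below fans a list of items out to a bounded thread pool. It assumes that `output.rows` is a plain list of region IDs, which the source does not specify, and `region_etl` is a hypothetical stand-in for running `region_etl.sql` once.

```python
from concurrent.futures import ThreadPoolExecutor


def region_etl(region_id, biz_date):
    """Hypothetical stand-in for one run of region_etl.sql."""
    return f"region={region_id} dt={biz_date}"


def for_each(items, concurrency, biz_date):
    """Run region_etl once per item, with at most `concurrency` runs in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # pool.map returns results in the order of the input items.
        return list(pool.map(lambda item: region_etl(item, biz_date), items))


# Assumed shape of {{activities.get_regions.output.rows}}: a list of region IDs.
rows = ["r1", "r2", "r3"]
results = for_each(rows, concurrency=5, biz_date="2024-01-01")
print(results)
```

Each element of `rows` plays the role of `{{items.each}}` for one iteration, and `max_workers` plays the role of `concurrency`.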
Key design points:

- The `look_up` result is passed to `for_each` through `{{activities.get_regions.output.rows}}`.
- Inside `for_each`, `{{items.each}}` refers to the current iteration element.
- `concurrency: 5` allows at most 5 regions to be processed in parallel.

Requirement: Run a data sync task, retry automatically on failure, and finally output the processing result for downstream Pipelines.

    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: sync_with_retry
    spec:
      displayName: "Data sync with retry"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      variables:
        - name: retry_count
          default: "0"
        - name: sync_success
          default: "false"
        - name: total_records
          default: "0"
      activities:
        # 1. Retry the sync task in a loop
        - name: retry_loop
          type: do_while
          condition:
            left: "{{variables.sync_success}}"
            op: EQUAL_TO
            right: "false"
          activities:
            - name: attempt_sync
              type: python
              source: WORKSPACE
              path: /Workspace/Users/zhang3/scripts/sync.py
              computingResourceGroupName: default_python_group
            - name: increment_retry
              type: set_variable
              variableType: PIPELINE_INNER
              variableName: retry_count
              value:
                calculateValue:
                  left: "{{variables.retry_count}}"
                  op: PLUS
                  right: "1"
              dependsOn:
                activities:
                  - name: attempt_sync
        # 2. After the sync succeeds, query the number of processed records
        - name: count_records
          type: look_up
          dataSource: las_catalog
          catalog: default
          database: dwd
          table: orders
          where: "dt=${date}"
          limit: 1
          dependsOn:
            activities:
              - name: retry_loop
        # 3. Store the query result in a variable
        - name: update_total
          type: set_variable
          variableType: PIPELINE_INNER
          variableName: total_records
          value:
            rawValue: "{{activities.count_records.output.row_count}}"
          dependsOn:
            activities:
              - name: count_records
        # 4. Set the Pipeline output for downstream references
        - name: set_output
          type: set_variable
          variableType: PIPELINE_OUTPUT
          outputVariables:
            - name: total_processed
              description: "Total records processed in this run"
              default: "{{variables.total_records}}"
            - name: retry_times
              description: "Actual number of retries"
              default: "{{variables.retry_count}}"
          dependsOn:
            activities:
              - name: update_total
      trigger:
        type: scheduled
        cronExpression: "0 2 * * *"
        frequency: daily
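The control flow of `retry_loop` can be pictured with a do-while style loop in Python. Two things here are assumptions rather than facts from the configuration: that the sync step is what flips `sync_success` to `"true"` (the YAML does not show where this happens), and the `max_attempts` cap, which is added so the sketch cannot loop forever.

```python
def run_with_retry(attempt_sync, max_attempts=3):
    """Model of retry_loop: the body runs once, then repeats while sync_success is "false"."""
    # Pipeline variables are kept as strings, matching the YAML defaults.
    variables = {"retry_count": "0", "sync_success": "false"}
    while True:
        # attempt_sync: assumed to report success, which sets sync_success.
        if attempt_sync():
            variables["sync_success"] = "true"
        # increment_retry: calculateValue with op PLUS and right "1".
        variables["retry_count"] = str(int(variables["retry_count"]) + 1)
        # do_while checks its condition after the body has run.
        keep_looping = variables["sync_success"] == "false"
        # max_attempts is a safety cap added for this sketch only.
        if not keep_looping or int(variables["retry_count"]) >= max_attempts:
            return variables


# A sync that fails twice and then succeeds.
outcomes = iter([False, False, True])
result = run_with_retry(lambda: next(outcomes), max_attempts=5)
print(result)
```

Because `increment_retry` runs after every attempt, including the successful one, `retry_count` in this model ends up equal to the number of attempts made.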
Key design points:

- `do_while` repeats the loop while the variable `sync_success` equals `"false"`.
- The `PLUS` operator of `calculateValue` increments the counter.
- The `look_up` result is stored in a variable with `set_variable` and then passed downstream through `PIPELINE_OUTPUT`.

The following example summarizes how the exact reference syntax is used in different configuration fields:

    apiVersion: newide.studio.dataleap.volc/v1
    kind: Pipeline
    metadata:
      name: reference_demo
    spec:
      displayName: "Reference syntax overview"
      runAs: User/100001
      schedulingResourceGroupName: default_scheduler
      parameters:
        - name: biz_date
          default: "${date}"  # system variable
        - name: upstream_result
          default: "{{pipeline(upstream_etl).output.status}}"  # upstream Pipeline output
      variables:
        - name: processed_count
          default: "0"
      activities:
        # sql: parameterValues references project parameters and Pipeline parameters
        - name: extract
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/{{project.parameters.code_owner}}/sql/extract.sql  # project parameter in path
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            source_db: "{{project.parameters.source_database}}"  # project parameter
            biz_date: "{{pipeline.parameters.biz_date}}"  # Pipeline parameter
        # python: arguments references a Pipeline parameter
        - name: transform
          type: python
          source: WORKSPACE
          path: /Workspace/Users/zhang3/scripts/transform.py
          computingResourceGroupName: default_python_group
          arguments:
            - "--biz_date"
            - "{{pipeline.parameters.biz_date}}"  # reference in arguments
          parameterValues:
            upstream_status: "{{pipeline.parameters.upstream_result}}"  # reference in parameterValues
          dependsOn:
            activities:
              - name: extract
        # if_condition: condition references a variable
        - name: check_count
          type: if_condition
          condition:
            left: "{{variables.processed_count}}"  # variable reference
            op: GREATER_THAN
            right: "0"
          dependsOn:
            activities:
              - name: transform
        # for_each: items references an upstream Activity output
        - name: process_items
          type: for_each
          items: "{{activities.extract.output.rows}}"  # Activity output reference
          concurrency: 3
          activities:
            - name: item_task
              type: sql
              source: WORKSPACE
              path: /Workspace/Users/zhang3/sql/item.sql
              engineType: emr_serverless_spark
              engineQueue: default
              parameterValues:
                item_id: "{{items.each}}"  # iteration element reference
          dependsOn:
            activities:
              - name: check_count
                outcome: true
        # execute_pipeline: references in pipelineParameterValues
        - name: call_sub
          type: execute_pipeline
          pipelineName: sub_pipeline
          pipelineParameterValues:
            biz_date: "${date}"  # system variable
            source_table: "{{project.parameters.source_database}}"  # project parameter
          waitingSuccess: true
          bizDate: "${date}"
          dependsOn:
            activities:
              - name: check_count
                outcome: true
        # set_variable: references in rawValue / calculateValue / outputVariables
        - name: update_count
          type: set_variable
          variableType: PIPELINE_INNER
          variableName: processed_count
          value:
            calculateValue:
              left: "{{variables.processed_count}}"  # variable reference
              op: PLUS
              right: "{{activities.extract.output.row_count}}"  # Activity output reference
          dependsOn:
            activities:
              - name: process_items
        - name: set_output
          type: set_variable
          variableType: PIPELINE_OUTPUT
          outputVariables:
            - name: total
              default: "{{variables.processed_count}}"  # variable reference
          dependsOn:
            activities:
              - name: update_count
      dependsOn:
        pipelines:
          - name: upstream_etl
            offsetsType: set
            offsets:
              - "0"
      trigger:
        type: scheduled
        cronExpression: "0 2 * * *"
        frequency: daily
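The reference forms used in this configuration can be told apart by shape alone. The classifier below is a sketch built from the forms that appear in the example; the category labels and the function name `classify` are assumptions, and the patterns are not a statement of the platform's full grammar.

```python
import re

# One pattern per reference form that appears in the example above.
PATTERNS = [
    ("system variable", re.compile(r"^\$\{(\w+)\}$")),
    ("project parameter", re.compile(r"^\{\{project\.parameters\.(\w+)\}\}$")),
    ("pipeline parameter", re.compile(r"^\{\{pipeline\.parameters\.(\w+)\}\}$")),
    ("upstream pipeline output", re.compile(r"^\{\{pipeline\((\w+)\)\.output\.(\w+)\}\}$")),
    ("variable", re.compile(r"^\{\{variables\.(\w+)\}\}$")),
    ("activity output", re.compile(r"^\{\{activities\.(\w+)\.output\.(\w+)\}\}$")),
    ("iteration element", re.compile(r"^\{\{items\.each\}\}$")),
    ("name reference", re.compile(r"^\{\{(\w+)\}\}$")),
]


def classify(expression):
    """Return (kind, captured names) for a reference, or ("literal", ()) otherwise."""
    for kind, pattern in PATTERNS:
        match = pattern.match(expression)
        if match:
            return kind, match.groups()
    return "literal", ()


for expression in (
    "${date}",
    "{{project.parameters.source_database}}",
    "{{pipeline(upstream_etl).output.status}}",
    "{{activities.extract.output.rows}}",
    "{{items.each}}",
    "0",
):
    print(expression, "->", classify(expression))
```

A check like this could run before a Pipeline is submitted to catch misspelled prefixes such as `{{variable.x}}`, which this sketch reports as a literal.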
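| Convention | Description | Good example | Bad example |
|---|---|---|---|
| Use snake_case | Lowercase letters and underscores | | |
| Be semantically clear | The name should convey what the parameter means | | |
| Add a prefix to indicate the source | Avoids confusing different types of parameters | | |
| Name dates consistently | Keep one naming scheme within a project | | |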
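The snake_case convention is easy to check mechanically. The sketch below is one possible check, assuming that digits are allowed after the first letter and that leading, trailing, or doubled underscores are not; the source states only "lowercase letters and underscores". The rejected names are hypothetical counter-examples, not taken from the source.

```python
import re

# Lowercase words separated by single underscores; digits allowed after the first letter.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(_[a-z0-9]+)*")


def is_snake_case(name):
    """Return True if name follows the assumed snake_case rule."""
    return SNAKE_CASE.fullmatch(name) is not None


# Names used in the examples of this article.
for name in ("biz_date", "source_database", "storage_base_path"):
    print(name, is_snake_case(name))

# Hypothetical counter-examples.
for name in ("bizDate", "Source_Database", "biz__date"):
    print(name, is_snake_case(name))
```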
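| Principle | Description |
|---|---|
| Minimize scope | Pass parameters that only a script uses at the Activity level; do not promote them to the Pipeline level. |
| Lift environment configuration up | Define environment-specific values (databases, paths) as project parameters. |
| Keep business parameters in the middle layer | Define business parameters supplied at run time (date, mode) as Pipeline parameters. |
| Use variables to pass state | Pass dynamic state between Activities with Pipeline variables. |
| Use outputs across Pipelines | Use `PIPELINE_OUTPUT` for values that must reach a downstream Pipeline. |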
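| Practice | Description |
|---|---|
| Give every parameter a default value | Ensures that a manually triggered run works without extra input. |
| Use system variables for date parameters | Default date parameters to a system variable, as in `default: "${date}"`. |
| Reference project parameters for environment parameters | Take environment values from project parameters, as in `"{{project.parameters.source_database}}"`. |
| Avoid hard-coding | Values that can change should not be written directly into scripts or configuration. |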
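On the script side, the same practices translate into reading values from arguments instead of embedding them. The sketch below assumes a Python Activity that receives `--biz_date` through `arguments`, as in the reference syntax example; the `--source_db` option and its default are hypothetical additions for illustration.

```python
import argparse


def parse_args(argv=None):
    """Read run-time values from the command line instead of hard-coding them."""
    parser = argparse.ArgumentParser(description="Sketch of a parameterized task script")
    # Supplied by the Pipeline, e.g. "{{pipeline.parameters.biz_date}}".
    parser.add_argument("--biz_date", required=True)
    # Hypothetical option with a default, so a manual run works without extra input.
    parser.add_argument("--source_db", default="ods_dev")
    return parser.parse_args(argv)


args = parse_args(["--biz_date", "2024-01-01"])
print(args.biz_date, args.source_db)
```

Passing `argv` explicitly keeps the function easy to test; when `argv` is `None`, `argparse` reads the real command line.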
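| Practice | Description |
|---|---|
| Use the Secret type for sensitive information | Passwords, tokens, and keys must use the Secret type. |
| Do not write passwords in plain text in scripts | Pass them in as parameters so the code can be committed to Git safely. |
| Apply least privilege | Only the Admin role can create and modify Secret parameters. |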