Parameter usage examples

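This page walks through typical scenarios to demonstrate design patterns for the parameter system. Each example includes the Pipeline YAML configuration and the script code, which you can use directly as a reference.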

Example 1: Parameter lookup precedence

The following example shows how the system resolves a parameter when the same name is defined at several levels.
Configuration:

# Project parameters
project:
  parameters:
    - name: env
      default: "production"
    - name: db_host
      default: "10.0.1.100"

# Pipeline definition
spec:
  parameters:
    - name: env
      default: "staging"
    - name: biz_date
      default: "${date}"

  activities:
    - name: my_task
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/query.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        env: "dev"

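How the references in the script resolve:

| Reference in script | Resolved value | Source |
| --- | --- | --- |
| {{env}} | "dev" | Activity parameterValues (highest priority) |
| {{biz_date}} | The concrete value that "${date}" resolves to | Pipeline parameters (not defined at the Activity level, so the lookup moves up) |
| {{db_host}} | "10.0.1.100" | Project parameters (not defined at the Pipeline level either, so the lookup continues up) |
| {{undefined_param}} | Error | Not found at any level |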
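The lookup order in the table amounts to a search from the narrowest scope to the widest. The sketch below is a hypothetical Python model for illustration only. `resolve` and `ParameterNotFoundError` are invented names, not part of the DataLeap API, and system variables such as `${date}` are left unexpanded.

```python
from typing import Mapping, Tuple


class ParameterNotFoundError(KeyError):
    """Raised when a name is not defined at any level."""


def resolve(name: str,
            activity: Mapping[str, str],
            pipeline: Mapping[str, str],
            project: Mapping[str, str]) -> Tuple[str, str]:
    """Return (value, level) from the narrowest level that defines `name`."""
    # Search Activity first, then Pipeline, then project (hypothetical model).
    for level, scope in (("activity", activity),
                         ("pipeline", pipeline),
                         ("project", project)):
        if name in scope:
            return scope[name], level
    raise ParameterNotFoundError(name)


# Values from the configuration above; "${date}" is kept as literal text here.
project = {"env": "production", "db_host": "10.0.1.100"}
pipeline = {"env": "staging", "biz_date": "${date}"}
activity = {"env": "dev"}

for ref in ["env", "biz_date", "db_host", "undefined_param"]:
    try:
        value, level = resolve(ref, activity, pipeline, project)
        print(f"{ref}: {value!r} (from {level})")
    except ParameterNotFoundError:
        print(f"{ref}: error, not found at any level")
```

Raising an error, rather than returning an empty string, mirrors the last row of the table: a misspelled name fails loudly instead of producing a silently wrong script.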

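Example 2: Multi-environment ETL pipeline

Requirement: The same ETL code must run in the development, test, and production environments. Each environment uses its own databases and storage paths.
Project parameter definitions (the defaults are for the development environment):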

# project.yml
project:
  parameters:
    - name: source_database
      default: "ods_dev"
    - name: target_database
      default: "dwd_dev"
    - name: storage_base_path
      default: "tos://bucket-dev/warehouse"
    - name: code_owner
      default: "zhang3"

Pipeline configuration:

# daily_etl.pipeline.yml
apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: daily_etl_orders
spec:
  displayName: "Daily order ETL"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  parameters:
    - name: biz_date
      default: "${date}"

  activities:
    - name: extract
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/{{project.parameters.code_owner}}/sql/extract.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        biz_date: "${date}"
        source_db: "{{project.parameters.source_database}}"
        target_db: "{{project.parameters.target_database}}"

    - name: transform
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/{{project.parameters.code_owner}}/sql/transform.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        biz_date: "${date}"
        source_db: "{{project.parameters.source_database}}"
        target_db: "{{project.parameters.target_database}}"
      dependsOn:
        activities:
          - name: extract

  trigger:
    type: scheduled
    cronExpression: "0 2 * * *"
    frequency: daily

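Script code (environment-agnostic):

-- extract.sql
-- The script references parameters by name only; it does not care where the values come from
INSERT OVERWRITE TABLE {{target_db}}.orders PARTITION (dt='{{biz_date}}')
SELECT *
FROM {{source_db}}.raw_orders
WHERE dt = '{{biz_date}}';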

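Key design points:

  • All environment differences are managed through project parameters, so the script code needs no changes.
  • Using {{project.parameters.code_owner}} in path makes the workspace path adapt automatically to each user.
  • Scripts only use {{parameter_name}}; parameterValues bridges those names to project parameters.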
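The script stays environment-agnostic because two steps happen before it runs. First, a parameterValues-style mapping ties script names to project parameters. Second, every `{{name}}` placeholder is substituted. The hypothetical Python sketch below mimics both steps. The `prod` values, the date string, and the `bridge`/`render` helpers are illustrative assumptions; the platform performs this substitution itself.

```python
import re
from typing import Dict, Mapping

# Placeholder syntax used by the scripts on this page: {{name}}
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")

# Project parameters per environment. The dev values come from project.yml;
# the prod values are hypothetical.
PROJECT_PARAMS = {
    "dev": {"source_database": "ods_dev", "target_database": "dwd_dev"},
    "prod": {"source_database": "ods_prod", "target_database": "dwd_prod"},
}

# A simplified extract.sql that takes its date from {{biz_date}}.
SCRIPT = (
    "INSERT OVERWRITE TABLE {{target_db}}.orders PARTITION (dt='{{biz_date}}')\n"
    "SELECT * FROM {{source_db}}.raw_orders WHERE dt = '{{biz_date}}';"
)


def bridge(project: Mapping[str, str], biz_date: str) -> Dict[str, str]:
    """Play the role of parameterValues: map script names to project parameters."""
    return {
        "biz_date": biz_date,
        "source_db": project["source_database"],
        "target_db": project["target_database"],
    }


def render(script: str, values: Mapping[str, str]) -> str:
    """Replace each {{name}}; a name with no value raises KeyError."""
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], script)


for env, params in PROJECT_PARAMS.items():
    print(f"-- {env}")
    print(render(SCRIPT, bridge(params, "2026-06-11")))  # illustrative date
```

Only the project parameter values differ between the two rendered scripts; the script text itself is identical.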

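Example 3: Passing parameters between parent and child Pipelines

Requirement: A main Pipeline calls two child Pipelines, one for data processing and one for quality checks. It must pass parameters to the child Pipelines and decide the next steps based on their results.
Main Pipeline configuration: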

# main_pipeline.pipeline.yml
apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: main_daily_pipeline
spec:
  displayName: "Main scheduling pipeline"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  parameters:
    - name: biz_date
      default: "${date}"
    - name: check_level
      default: "standard"

  variables:
    - name: quality_passed
      default: "false"

  activities:
    # Step 1: call the data processing child Pipeline
    - name: run_etl
      type: execute_pipeline
      pipelineName: data_etl_pipeline
      pipelineParameterValues:
        biz_date: "${date}"
        run_mode: "incremental"
      waitingSuccess: true
      bizDate: "${date}"

    # Step 2: call the data quality check child Pipeline
    - name: run_quality_check
      type: execute_pipeline
      pipelineName: quality_check_pipeline
      pipelineParameterValues:
        biz_date: "${date}"
        check_level: "{{pipeline.parameters.check_level}}"
        source_table: "{{activities.run_etl.output.output_table}}"
      waitingSuccess: true
      bizDate: "${date}"
      dependsOn:
        activities:
          - name: run_etl

    # Step 3: decide the next step based on the quality check result
    - name: check_result
      type: if_condition
      condition:
        left: "{{activities.run_quality_check.output.check_status}}"
        op: EQUAL_TO
        right: "passed"
      dependsOn:
        activities:
          - name: run_quality_check

    # Step 4a: quality check passed → publish the data
    - name: publish_data
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/publish.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        biz_date: "${date}"
      dependsOn:
        activities:
          - name: check_result
            outcome: true

    # Step 4b: quality check failed → send an alert
    - name: send_alert
      type: python
      source: WORKSPACE
      path: /Workspace/Users/zhang3/scripts/alert.py
      computingResourceGroupName: default_python_group
      parameterValues:
        biz_date: "${date}"
        error_detail: "{{activities.run_quality_check.output.error_message}}"
      dependsOn:
        activities:
          - name: check_result
            outcome: false

  trigger:
    type: scheduled
    cronExpression: "0 2 * * *"
    frequency: daily
    priority: d3

Child Pipeline (sets output values):

# quality_check_pipeline.pipeline.yml
apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: quality_check_pipeline
spec:
  displayName: "Data quality check"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  parameters:
    - name: biz_date
      default: "${date}"
    - name: check_level
      default: "standard"
    - name: source_table
      default: ""

  variables:
    - name: check_result
      default: "unknown"
    - name: error_msg
      default: ""

  activities:
    - name: run_checks
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/zhang3/sql/quality_checks.sql
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        biz_date: "${date}"
        table_name: "{{pipeline.parameters.source_table}}"

    # Expose the results as Pipeline outputs
    - name: set_output
      type: set_variable
      variableType: PIPELINE_OUTPUT
      outputVariables:
        - name: check_status
          default: "{{variables.check_result}}"
        - name: error_message
          default: "{{variables.error_msg}}"
      dependsOn:
        activities:
          - name: run_checks

  trigger:
    type: manual

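Key design points:

  • The child Pipeline's trigger.type is manual, so it runs only when the parent Pipeline triggers it through execute_pipeline.
  • The parent Pipeline passes parameters to the child Pipeline through pipelineParameterValues.
  • The child Pipeline sets its return values with set_variable (variableType: PIPELINE_OUTPUT).
  • The parent Pipeline reads those return values with {{activities.<execute_pipeline_name>.output.<field>}}.
  • waitingSuccess: true is a prerequisite for reading the child Pipeline's outputs.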
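The parent Pipeline's control flow can be sketched in Python as follows. This is a hypothetical model, not platform code. `execute_pipeline`, `RunResult`, and the stand-in child function are invented for illustration. The rule that outputs are unavailable without `waitingSuccess` is modelled as an exception.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Mapping


@dataclass
class RunResult:
    status: str  # run status of the child Pipeline itself
    output: Dict[str, str] = field(default_factory=dict)  # PIPELINE_OUTPUT values


def quality_check_pipeline(params: Mapping[str, str]) -> RunResult:
    """Stand-in child: sets check_status and error_message as outputs."""
    if not params.get("source_table"):
        return RunResult("SUCCESS", {"check_status": "failed",
                                     "error_message": "source_table is empty"})
    return RunResult("SUCCESS", {"check_status": "passed", "error_message": ""})


def execute_pipeline(child: Callable[[Mapping[str, str]], RunResult],
                     params: Mapping[str, str],
                     waiting_success: bool = True) -> Dict[str, str]:
    """Run a child Pipeline and return its outputs."""
    if not waiting_success:
        # Modelled assumption: without waiting, no outputs are available.
        raise RuntimeError("outputs require waitingSuccess: true")
    result = child(params)
    if result.status != "SUCCESS":
        raise RuntimeError("child Pipeline did not succeed")
    return result.output


def main_pipeline(source_table: str) -> str:
    """Return the branch the if_condition would take."""
    output = execute_pipeline(quality_check_pipeline, {
        "check_level": "standard",
        "source_table": source_table,
    })
    # condition: check_status EQUAL_TO "passed"
    if output["check_status"] == "passed":
        return "publish_data"
    return "send_alert: " + output["error_message"]


print(main_pipeline("dwd_dev.orders"))  # hypothetical table name
print(main_pipeline(""))
```

The child Pipeline can finish successfully even when the check itself fails. That is why the branch decision reads the check_status output rather than the child's run status.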

Example 4: Loops and dynamic parameters

Requirement: Fetch a list of region IDs from upstream and run ETL processing for each region separately.

apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: multi_region_etl
spec:
  displayName: "Multi-region ETL processing"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  activities:
    # 1. Query the list of regions to process
    - name: get_regions
      type: look_up
      dataSource: las_catalog
      catalog: default
      database: dim
      table: regions
      where: "is_active=1"
      limit: 100

    # 2. Run ETL for each region in parallel
    - name: process_regions
      type: for_each
      items: "{{activities.get_regions.output.rows}}"
      concurrency: 5
      activities:
        - name: region_etl
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/zhang3/sql/region_etl.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            region_id: "{{items.each}}"
            biz_date: "${date}"
      dependsOn:
        activities:
          - name: get_regions

  trigger:
    type: scheduled
    cronExpression: "0 3 * * *"
    frequency: daily

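Key design points:

  • The look_up result is passed to for_each through {{activities.get_regions.output.rows}}.
  • Inside for_each, {{items.each}} references the element of the current iteration.
  • concurrency: 5 limits processing to at most 5 regions in parallel.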
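The `concurrency` setting behaves like a bounded worker pool. The hypothetical Python sketch below models it with `concurrent.futures.ThreadPoolExecutor(max_workers=5)` and records how many tasks run at once. `region_etl` is a stand-in for the SQL Activity, and the region IDs are made up.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_lock = threading.Lock()
_running = 0
peak_running = 0  # highest number of tasks observed running at the same time


def region_etl(region_id: str) -> str:
    """Stand-in for the region_etl SQL Activity."""
    global _running, peak_running
    with _lock:
        _running += 1
        peak_running = max(peak_running, _running)
    try:
        time.sleep(0.01)  # pretend to do work
        return f"done:{region_id}"
    finally:
        with _lock:
            _running -= 1


def for_each(items: Sequence[T], task: Callable[[T], R], concurrency: int) -> List[R]:
    """Run task once per item with at most `concurrency` tasks in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # map() returns results in input order even though runs overlap.
        return list(pool.map(task, items))


rows = [f"region_{i}" for i in range(12)]  # hypothetical look_up output
results = for_each(rows, region_etl, concurrency=5)
print(results[:3], "peak:", peak_running)
```

With 12 items and a limit of 5, the remaining items wait in the pool's queue until a worker frees up. The total run time therefore grows with the number of items divided by the concurrency.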

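Example 5: Variable accumulation and retry loops

Requirement: Run a data sync task, retry automatically on failure, and finally output the processing results to downstream Pipelines.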

apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: sync_with_retry
spec:
  displayName: "Data sync with retry"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  variables:
    - name: retry_count
      default: "0"
    - name: sync_success
      default: "false"
    - name: total_records
      default: "0"

  activities:
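    # 1. Retry the sync task in a loop.
    #    The loop exits only once sync_success is "true", so the sync step must
    #    set that variable when it succeeds; this example does not show how.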
    - name: retry_loop
      type: do_while
      condition:
        left: "{{variables.sync_success}}"
        op: EQUAL_TO
        right: "false"
      activities:
        - name: attempt_sync
          type: python
          source: WORKSPACE
          path: /Workspace/Users/zhang3/scripts/sync.py
          computingResourceGroupName: default_python_group

        - name: increment_retry
          type: set_variable
          variableType: PIPELINE_INNER
          variableName: retry_count
          value:
            calculateValue:
              left: "{{variables.retry_count}}"
              op: PLUS
              right: "1"
          dependsOn:
            activities:
              - name: attempt_sync

    # 2. After the sync succeeds, look up the processed record count
    - name: count_records
      type: look_up
      dataSource: las_catalog
      catalog: default
      database: dwd
      table: orders
      where: "dt=${date}"
      limit: 1
      dependsOn:
        activities:
          - name: retry_loop

    # 3. Store the lookup result in a variable
    - name: update_total
      type: set_variable
      variableType: PIPELINE_INNER
      variableName: total_records
      value:
        rawValue: "{{activities.count_records.output.row_count}}"
      dependsOn:
        activities:
          - name: count_records

    # 4. Set Pipeline outputs for downstream reference
    - name: set_output
      type: set_variable
      variableType: PIPELINE_OUTPUT
      outputVariables:
        - name: total_processed
          description: "Total number of records processed in this run"
          default: "{{variables.total_records}}"
        - name: retry_times
          description: "Number of sync attempts (the counter includes the first attempt)"
          default: "{{variables.retry_count}}"
      dependsOn:
        activities:
          - name: update_total

  trigger:
    type: scheduled
    cronExpression: "0 2 * * *"
    frequency: daily

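Key design points:

  • do_while uses the variable sync_success as its exit condition.
  • The PLUS operator of calculateValue increments the counter.
  • The look_up result is stored in a variable with set_variable and then passed downstream through PIPELINE_OUTPUT.
  • Together, the steps cover the full chain: modify a variable → reference the variable → pass it on as an output.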
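The loop semantics can be sketched in Python. As in `do_while`, the body runs before the condition is checked. Variable values stay strings to match the YAML defaults, and treating `PLUS` as integer addition on those strings is an assumption. The `max_attempts` cap is a hypothetical safeguard that the YAML above does not have.

```python
from typing import Callable, Dict


def run_retry_loop(attempt_sync: Callable[[int], bool],
                   max_attempts: int = 5) -> Dict[str, str]:
    """Model the retry_loop do_while; returns the final variable values."""
    # Pipeline variables, kept as strings like the YAML defaults.
    variables = {"retry_count": "0", "sync_success": "false"}
    while True:
        # attempt_sync: something must flip sync_success on success.
        if attempt_sync(int(variables["retry_count"])):
            variables["sync_success"] = "true"
        # increment_retry: calculateValue with op PLUS (assumed integer addition).
        variables["retry_count"] = str(int(variables["retry_count"]) + 1)
        # Loop condition: keep going while sync_success EQUAL_TO "false".
        if variables["sync_success"] != "false":
            break
        # Hypothetical cap, not present in the YAML example.
        if int(variables["retry_count"]) >= max_attempts:
            break
    return variables


# A sync that fails twice and succeeds on the third attempt.
print(run_retry_loop(lambda attempt: attempt >= 2))
# {'retry_count': '3', 'sync_success': 'true'}
```

Without a cap, a sync that never succeeds would loop forever. An upper bound on attempts is therefore worth considering in any real retry loop.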

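Example 6: Explicit references across configuration fields

The following example shows how explicit reference syntax is used in different configuration fields: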

apiVersion: newide.studio.dataleap.volc/v1
kind: Pipeline
metadata:
  name: reference_demo
spec:
  displayName: "Reference syntax overview"
  runAs: User/100001
  schedulingResourceGroupName: default_scheduler

  parameters:
    - name: biz_date
      default: "${date}"                                          # System variable
    - name: upstream_result
      default: "{{pipeline(upstream_etl).output.status}}"         # Upstream Pipeline output

  variables:
    - name: processed_count
      default: "0"

  activities:
    # sql: reference project parameters and Pipeline parameters in parameterValues
    - name: extract
      type: sql
      source: WORKSPACE
      path: /Workspace/Users/{{project.parameters.code_owner}}/sql/extract.sql  # Project parameter in path
      engineType: emr_serverless_spark
      engineQueue: default
      parameterValues:
        source_db: "{{project.parameters.source_database}}"       # Project parameter
        biz_date: "{{pipeline.parameters.biz_date}}"              # Pipeline parameter

    # python: reference a Pipeline parameter in arguments
    - name: transform
      type: python
      source: WORKSPACE
      path: /Workspace/Users/zhang3/scripts/transform.py
      computingResourceGroupName: default_python_group
      arguments:
        - "--biz_date"
        - "{{pipeline.parameters.biz_date}}"                      # Reference in arguments
      parameterValues:
        upstream_status: "{{pipeline.parameters.upstream_result}}" # Reference in parameterValues
      dependsOn:
        activities:
          - name: extract

    # set_variable: calculateValue references a variable and an Activity output
    - name: update_count
      type: set_variable
      variableType: PIPELINE_INNER
      variableName: processed_count
      value:
        calculateValue:
          left: "{{variables.processed_count}}"                   # Variable reference
          op: PLUS
          right: "{{activities.extract.output.row_count}}"        # Activity output reference
      dependsOn:
        activities:
          - name: transform

    # if_condition: reference a variable in condition
    # (runs after update_count, so processed_count already holds the extracted row count)
    - name: check_count
      type: if_condition
      condition:
        left: "{{variables.processed_count}}"                     # Variable reference
        op: GREATER_THAN
        right: "0"
      dependsOn:
        activities:
          - name: update_count

    # for_each: reference an upstream Activity output in items
    - name: process_items
      type: for_each
      items: "{{activities.extract.output.rows}}"                 # Activity output reference
      concurrency: 3
      activities:
        - name: item_task
          type: sql
          source: WORKSPACE
          path: /Workspace/Users/zhang3/sql/item.sql
          engineType: emr_serverless_spark
          engineQueue: default
          parameterValues:
            item_id: "{{items.each}}"                             # Current iteration element
      dependsOn:
        activities:
          - name: check_count
            outcome: true

    # execute_pipeline: references in pipelineParameterValues
    - name: call_sub
      type: execute_pipeline
      pipelineName: sub_pipeline
      pipelineParameterValues:
        biz_date: "${date}"                                       # System variable
        source_table: "{{project.parameters.source_database}}"    # Project parameter
      waitingSuccess: true
      bizDate: "${date}"
      dependsOn:
        activities:
          - name: check_count
            outcome: true

    # set_variable: reference a variable in outputVariables
    - name: set_output
      type: set_variable
      variableType: PIPELINE_OUTPUT
      outputVariables:
        - name: total
          default: "{{variables.processed_count}}"                # Variable reference
      dependsOn:
        activities:
          - name: update_count

  dependsOn:
    pipelines:
      - name: upstream_etl
        offsetsType: set
        offsets:
          - "0"

  trigger:
    type: scheduled
    cronExpression: "0 2 * * *"
    frequency: daily
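The reference forms in this example differ only in their prefix, so a program can tell them apart. The hypothetical Python sketch below classifies the `{{...}}` and `${...}` forms used above by scope. Its regular expressions cover only the forms shown on this page, not a complete grammar of the expression language.

```python
import re
from typing import List, Tuple

# Matches {{ ... }} expressions and ${name} system variables.
REFERENCE = re.compile(r"\{\{([^{}]+)\}\}|\$\{(\w+)\}")

# Checked in order; the first full match wins.
SCOPES = [
    ("project parameter", re.compile(r"project\.parameters\.\w+")),
    ("pipeline parameter", re.compile(r"pipeline\.parameters\.\w+")),
    ("upstream pipeline output", re.compile(r"pipeline\(\w+\)\.output\.\w+")),
    ("variable", re.compile(r"variables\.\w+")),
    ("activity output", re.compile(r"activities\.\w+\.output\.\w+")),
    ("iteration element", re.compile(r"items\.each")),
    ("name lookup", re.compile(r"\w+")),  # bare {{name}}, resolved by precedence
]


def classify(text: str) -> List[Tuple[str, str]]:
    """Return (kind, reference) for every reference found in a field value."""
    found = []
    for m in REFERENCE.finditer(text):
        if m.group(2) is not None:
            found.append(("system variable", m.group(0)))
            continue
        expr = m.group(1).strip()
        kind = next((k for k, p in SCOPES if p.fullmatch(expr)), "unknown")
        found.append((kind, m.group(0)))
    return found


for value in [
    "/Workspace/Users/{{project.parameters.code_owner}}/sql/extract.sql",
    "{{pipeline(upstream_etl).output.status}}",
    "{{activities.extract.output.row_count}}",
    "{{items.each}}",
    "${date}",
]:
    print(classify(value))
```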

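Parameter design best practices

Naming conventions

| Convention | Description | Good example | Bad example |
| --- | --- | --- | --- |
| Use snake_case | Lowercase letters and underscores | biz_date | bizDate |
| Clear semantics | The name should convey what the parameter means | source_database | src_db |
| Add a prefix to indicate the origin | Avoids confusing different kinds of parameters | target_region | region |
| Consistent date naming | Use the same name throughout the project | biz_date | dt / date / day |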
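Only the first convention can be checked automatically. The Python check below assumes an exact rule: a leading lowercase letter, then lowercase ASCII letters or digits, with words separated by single underscores. That rule is an assumption, since the table only says "lowercase letters and underscores".

```python
import re

# Assumed rule: starts with a lowercase letter; lowercase letters/digits;
# words separated by single underscores.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")


def is_snake_case(name: str) -> bool:
    """True if the whole name matches the assumed snake_case rule."""
    return SNAKE_CASE.fullmatch(name) is not None


for name in ["biz_date", "source_database", "bizDate", "Biz_date", "biz__date"]:
    print(name, is_snake_case(name))
```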

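Layering principles

| Principle | Description |
| --- | --- |
| Minimize scope | Pass parameters used only inside a script at the Activity level; do not promote them to the Pipeline level. |
| Lift environment configuration to the project | Define environment-specific values (databases, paths) as project parameters. |
| Keep business parameters at the Pipeline level | Define business parameters supplied at run time (dates, modes) as Pipeline parameters. |
| Use variables for state | Use Pipeline variables to pass dynamic state between Activities. |
| Use outputs across Pipelines | Use PIPELINE_OUTPUT for values that downstream Pipelines need. |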

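Default value design

| Practice | Description |
| --- | --- |
| Give every parameter a default value | Manual runs can then start directly without extra input. |
| Use system variables for date parameters | default: "${date}" automatically takes the current business date. |
| Reference project parameters for environment parameters | default: "{{project.parameters.xxx}}" adapts automatically to each environment. |
| Avoid hardcoding | Values that may change should not be written directly into scripts or configuration. |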
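The first rule works well as a lint check before a Pipeline is published. The sketch below assumes the Pipeline YAML has already been loaded into a Python dict (for example with a YAML parser, not shown). It treats an empty string as a valid default, as quality_check_pipeline does for source_table. `params_missing_defaults` is a hypothetical helper.

```python
from typing import Any, List, Mapping


def params_missing_defaults(pipeline: Mapping[str, Any]) -> List[str]:
    """Return the names of spec.parameters entries that have no default.

    An empty string counts as a default; a missing key or a null value does not.
    """
    parameters = (pipeline.get("spec") or {}).get("parameters") or []
    return [p["name"] for p in parameters if p.get("default") is None]


# Hypothetical Pipeline, already parsed from YAML into a dict.
pipeline = {
    "spec": {
        "parameters": [
            {"name": "biz_date", "default": "${date}"},
            {"name": "source_table", "default": ""},
            {"name": "run_mode"},  # no default: a manual run would need input
        ]
    }
}

print(params_missing_defaults(pipeline))
```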

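Security practices

| Practice | Description |
| --- | --- |
| Use the Secret type for sensitive information | Passwords, tokens, and keys must be stored as secret-type project parameters. |
| Never write passwords in plaintext in scripts | Pass them in as parameters so the code can be safely committed to Git. |
| Least privilege | Only the Admin role can create and modify Secret parameters. |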

Last updated: 2026.06.12 11:44:15