本文为您介绍 Dataleap NewIDE Pipeline 参数体系的层级结构、定义方式、引用语法、同名优先级规则以及在各类任务代码中的使用方式,帮助您正确配置和引用参数。
DataLeap NewIDE Pipeline 参数系统提供多层级的参数体系,覆盖从全局配置到单个 Activity 内部的所有传值需求。不同层级的参数具有不同的作用域、可变性和使用场景。
参数类型 | 作用域 | 读写权限 | 生命周期 | 配置入口 |
|---|---|---|---|---|
系统变量 | System | 只读 | 运行时 | 平台内置,无需配置。 |
项目参数 | Project | 只读 | 长期 | 项目设置 → 参数管理 |
Pipeline 参数 | Pipeline | 只读(触发时可覆盖) | 单次运行 | Pipeline 定义页 |
Pipeline 变量 | Pipeline | 可变 | 单次运行 | Pipeline 定义页 |
Pipeline 输出 | Pipeline | 只读 | 单次运行 | Set Variables Activity |
Activity 输出 | Activity | 只读 | 单次运行 | 平台自动捕获 |
Activity 入参 | Activity | 只读 | 单次运行 | Activity 配置面板 |
Notebook 自定义参数 | Notebook | 可变 | 单次运行 | Notebook 代码 |
所有参数均通过双大括号或 ${} 语法引用:
{{参数名}} # 名称引用(脚本代码中使用) {{命名空间.参数名}} # 精确引用(Pipeline 配置中使用) ${系统变量名} # 系统变量引用
注意
双大括号内的内容作为字面量处理,不支持表达式运算。当多个类型的参数具有相同名称时,平台按照优先级规则决定最终生效的值。
系统变量由平台在运行时自动注入,无需定义,直接引用即可。
变量名 | 格式 | 说明 |
|---|---|---|
| yyyyMMdd | 当前业务日期(调度实例创建时确定) |
| HH | 当前业务小时 |
引用语法:
${date} ${hour}
典型用途:
default: "${date}")。项目参数在项目级别统一配置,所有 Pipeline 均可引用。适合存放与环境相关的公共配置,支持为不同环境(如 dev、staging、prod)设置不同的值。
配置入口:项目设置 → 参数管理,或通过 project.yml 文件定义。
引用语法:
{{project.参数名}}
注意
路径类型的输入框(Path 字段)仅支持引用项目参数,不能引用 Pipeline 参数或变量——路径引用在发布/调度阶段即需要解析,此时 Pipeline 参数可能尚未赋值。
定义示例:
project: name: dmp-data-warehouse parameters: - name: source_database type: string default: "ods_production" description: "源数据库名称" - name: storage_base_path type: string default: "tos://dmp-bucket/warehouse" description: "数据存储根路径" - name: db_password type: secret description: "数据库密码(加密存储)"
参数类型:
类型 | 说明 |
|---|---|
| 字符串(默认类型) |
| 整数 |
| 布尔值 |
| 敏感信息,加密存储,运行时才解密注入。不会以明文出现在 Git 仓库中,仅 Admin 角色可创建和修改 |
环境差异化(Bundle):
项目参数支持通过 Bundle 按环境覆盖,实现同一套代码在不同环境下运行:
targets: dev: variables: source_database: "ods_dev" storage_base_path: "tos://dmp-dev/warehouse" prod: variables: source_database: "ods_production" storage_base_path: "tos://dmp-prod/warehouse"
典型用途:
{{project.prod_db}}{{project.hdfs_root}}{{project.jdbc_url}}Pipeline 参数在 Pipeline 定义页设置默认值,每次触发时可以传入覆盖值。运行期间参数值不可修改。
引用语法:
{{pipeline.params.参数名}} # 简写(当无同名冲突时可使用) {{参数名}}
YAML 定义示例:
parameters: - name: biz_date type: String defaultValue: "${date}" description: "业务日期,默认取调度日期" - name: target_region type: String defaultValue: "cn-north" description: "目标区域" - name: run_mode type: String defaultValue: "incremental" description: "运行模式:full 全量 / incremental 增量"
触发时传入参数:
通过 API 触发时,在请求体中指定参数值:
{ "pipeline_id": "your_pipeline_id", "params": { "biz_date": "20260601", "region": "cn-north" } }
通过 Execute Pipeline Activity 调用子 Pipeline 时,使用 pipelineParameterValues 传入参数:
- name: run_sub_pipeline type: execute_pipeline pipelineName: daily_order_summary pipelineParameterValues: biz_date: "${date}" region: "{{pipeline.parameters.target_region}}" waitingSuccess: true bizDate: "${date}"
传值优先级(从高到低):
典型用途:
biz_date、ptstart_date、end_dateregion、biz_typePipeline 变量在 Pipeline 定义页声明初始值,运行过程中可通过 Set Variables Activity 动态修改。与 Pipeline 参数不同,变量的值在 Pipeline 执行期间是可变的。
引用语法:
{{pipeline.vars.变量名}} # 简写(当无同名冲突时可使用) {{变量名}}
YAML 定义示例:
variables: - name: check_status type: String defaultValue: "unknown" - name: retry_count type: String defaultValue: "0" - name: record_count type: String defaultValue: "0"
Pipeline 参数 vs 变量对比
对比维度 | Pipeline 参数 | Pipeline 变量 |
|---|---|---|
可变性 | 运行时只读 | 运行时可修改 |
定义位置 |
|
|
值来源 | 外部传入或默认值 | 初始值 + 运行时修改 |
修改变量值
Pipeline 变量只能通过 set_variable Activity 修改:
直接赋值:rawValue
计算赋值:calculateValue(支持 PLUS / MINUS / CONCAT)
引用上游输出:{{activities.xxx.output}}
修改变量的 YAML 示例:
- name: update_status type: set_variable variableType: PIPELINE_INNER variableName: check_status value: rawValue: "{{activities.check_result.output}}"
- name: increment_counter type: set_variable variableType: PIPELINE_INNER variableName: retry_count value: calculateValue: left: "{{variables.retry_count}}" op: PLUS right: "1"
支持的运算符:
运算符 | 含义 | 说明 |
|---|---|---|
| 加法 | 变量值需可转为数值类型。 |
| 减法 | 变量值需可转为数值类型。 |
| 字符串连接 | 将左右值拼接为新字符串。 |
典型用途:
Pipeline 输出通过 Set Variables Activity 定义,将 Pipeline 内部的变量或 Activity 出参映射为可对外暴露的输出值。当该 Pipeline 被其他 Pipeline 作为子 Pipeline 调用时,父 Pipeline 可通过引用语法获取其输出。
引用语法(在父 Pipeline 中引用子 Pipeline 的输出):
{{pipeline(子Pipeline名称).output.参数名}}
设置方法:
- name: set_output type: set_variable variableType: PIPELINE_OUTPUT outputVariables: - name: total_records description: "本次处理的总记录数" default: "{{variables.record_count}}" - name: process_status description: "处理状态" default: "{{variables.check_status}}"
典型用途:
Activity 执行完成后,平台从执行日志中捕获输出值。同一 Pipeline 内的下游节点可以引用这些值。
引用语法:
# 引用 Python Activity 输出 {{activity(Activity名称).output}} # 引用 Notebook 输出(取第一个 cell 的全部执行输出) {{activity(Activity名称).output.cells["0"]}} # Notebook cell 输出是 Python/Shell 时,支持 JSON JPath 解析 {{activity(Activity名称).output.cells["0"].print_json_key}} # 简写格式(Pipeline YAML 配置中) {{activities.Activity名称.output}} {{activities.Activity名称.output.字段名}}
注意
Activity 输出仅在该节点成功执行后才可被下游引用。并联(并发执行)的节点之间不能互相引用输出。
典型用途:
Activity 入参在 Activity 配置面板中声明,将「写代码」和「Pipeline 配置」两个过程解耦。代码内通过占位符引用参数名,具体的值在 Activity 配置中绑定来源。
引用语法(在 Activity 代码内部):
{{参数名}}
YAML 配置示例:
activities: - name: transform_orders type: notebook source: WORKSPACE path: /Workspace/Users/zhang3/notebooks/transform.notebook generalComputingResourceGroupName: default_py_group parameterValues: biz_date: "${date}" source_db: "{{project.parameters.source_database}}" threshold: "{{pipeline.parameters.run_mode}}" upstream_count: "{{activities.extract_data.output.row_count}}"
工作原理:
{{参数名}} 占位符。Notebook 自定义参数在 Notebook 代码中定义,主要用于 Notebook 之间互相传参,以及 Notebook 向下游 Activity 输出结果。
接收参数:
# 通过 dbutils.widgets.get() 接收 Pipeline 传入的参数 biz_date = dbutils.widgets.get("biz_date") print(f"Processing date: {biz_date}")
输出结果:
# 通过 dbutils.notebook.exit() 输出返回值 # 供下游 Activity 通过 {{activities.<name>.output}} 引用 dbutils.notebook.exit("success")
引用语法(在父 Notebook 或 Pipeline 中):
{{activity(Notebook名称).output.[0]}}
当多个类型的参数在 {{}} 中使用相同的名称时,平台视为同名参数,按以下优先级决定最终生效的值。优先级数字越小,越优先生效。例如,若项目参数和 Pipeline 参数均命名为 db_name,引用 {{db_name}} 时,Pipeline 参数(优先级 3)的值将覆盖项目参数(优先级 6)的值。
优先级 | 参数类型 | 作用域 |
|---|---|---|
0(最高) | Notebook 自定义参数 | Notebook |
1 | Activity 入参 | Activity |
2 | Pipeline 变量 | Pipeline |
3 | Pipeline 参数 | Pipeline |
4 | Activity 输出 | Activity |
5 | Pipeline 输出 | Pipeline |
6 | 项目参数 | Project |
7(最低) | 系统变量 | System |
说明
避免在不同参数类型中使用相同的名称,以减少依赖优先级规则推断带来的歧义,提升 Pipeline 可读性和可维护性。当必须区分时,使用完整命名空间写法(如 {{pipeline.params.biz_date}})。
Pipeline YAML 配置中使用精确引用格式,明确指定参数/变量/输出值的来源层级:
引用语法 | 说明 |
|---|---|
| 引用项目参数 |
| 引用当前 Pipeline 的参数 |
| 引用当前 Pipeline 的变量 |
| 引用上游 Activity 的输出 |
| 引用上游依赖 Pipeline 的输出 |
| for_each 中引用当前迭代元素 |
脚本代码(SQL / Python / Shell)中使用名称引用格式,只需给出参数名,系统按照 Activity → Pipeline → Project 的优先级自动查找:
SELECT * FROM {{source_database}}.orders WHERE dt = '${date}' AND region = '{{target_region}}';
注意
脚本代码中不能使用精确路径引用(如 {{pipeline.parameters.xxx}})。脚本可独立调试运行,不应耦合到具体的 Pipeline 结构。
Activity 类型 | 支持引用的字段 |
|---|---|
sql |
|
python / shell |
|
notebook |
|
jar |
|
if_condition / do_while |
|
for_each |
|
execute_pipeline |
|
set_variable |
|
引用类型 | 解析时机 | 说明 |
|---|---|---|
项目参数 | 发布/运行时 | 发布到目标环境时确定值。 |
Pipeline 参数 | 实例生成时 | 触发 Pipeline 时传入或使用默认值。 |
Pipeline 变量 | 运行时 | Activity 执行过程中动态读取。 |
Activity 输出 | 运行时 | 上游 Activity 完成后才产生值。 |
系统变量 | 实例生成时 | 调度实例创建时确定业务日期。 |
| 运行时 | for_each 每次迭代时替换。 |
-- 引用项目参数(动态数据库名)+ Pipeline 参数(业务日期和地区) INSERT INTO TABLE {{project.prod_db}}.target_table SELECT * FROM source_table WHERE dt = '{{params.biz_date}}' AND region = '{{params.region}}'
-- 校验写入行数是否符合预期 SELECT CASE WHEN COUNT(*) = {{activity(validate_node).output[0].expected_rows}} THEN 'PASS' ELSE 'FAIL' END AS check_result FROM target_table WHERE dt = '{{params.biz_date}}'
方式一:字符串替换(当前支持)
平台在提交代码前,将 {{}} 占位符替换为实际参数值:
# 平台在提交前完成字符串替换 dt = '{{biz_date}}' db = '{{project.prod_db}}'
方式二:通过 platform_sdk 引用(后续支持)
from platform_sdk import context # 获取 Pipeline 参数 date = context.params.get("biz_date") region = context.params.get("region") # 获取上游 Activity 的输出 prev_count = context.activity("sql_node_1").output("rows_count") # 获取项目参数 prod_db = context.project.get("prod_db") # 设置当前 Activity 出参,供下游节点引用 context.set_output("processed_count", len(result)) context.set_output("status", "success")
# Cell 1 - 接收参数 biz_date = dbutils.widgets.get("biz_date") source_db = dbutils.widgets.get("source_db") # Cell 2 - 使用参数进行数据处理 from pyspark.sql import functions as F df = spark.table(f"{source_db}.ods_orders").filter(F.col("dt") == biz_date) result_count = df.count() # Cell 3 - 输出结果供下游引用 dbutils.notebook.exit(str(result_count))
路径类型字段仅支持项目参数:
activities: - name: run_etl type: notebook source: WORKSPACE path: /Workspace/Users/{{project.parameters.code_owner}}/notebooks/etl.notebook generalComputingResourceGroupName: default_py_group
# 引用子 Pipeline "etl_daily" 的输出参数 row_count {{pipeline(etl_daily).output.row_count}} # 引用子 Pipeline "data_validate" 的输出参数 status {{pipeline(data_validate).output.status}}
参数层级设计的核心原则是脚本与 Pipeline 配置解耦:
使用场景 | 推荐类型 |
|---|---|
数据库连接、存储路径等全局配置 | 项目参数 |
每次运行时传入的业务参数(如日期) | Pipeline 参数 |
运行过程中动态更新的状态值 | Pipeline 变量 |
上游节点的执行结果 | Activity 输出 |
跨 Pipeline 传递数据 | Pipeline 输出 |
敏感信息(密钥、Token) | 项目参数(secret 类型) |
biz_date、hdfs_root(避免驼峰或连字符)。business_date 而非 bd。proj_、变量加 var_。{{参数名}} 可能匹配多个类型时,使用完整命名空间写法确保引用预期参数。错误现象 | 可能原因 | 解决方案 |
|---|---|---|
参数值为空字符串。 | 参数已定义但默认值为空。 | 检查参数定义的 default 字段。 |
运行时报「参数未找到」。 | 所有层级均无该参数定义。 | 在 Activity / Pipeline / 项目中补充参数定义。 |
引用表达式未被替换。 | 脚本中使用了精确引用语法。 | 脚本代码中改用名称引用 |
path 引用失效。 | path 中使用了 Pipeline 参数 | path 仅支持项目参数引用。 |
跨 Pipeline 输出为空。 | 上游未设置 PIPELINE_OUTPUT。 | 在上游添加 set_variable(PIPELINE_OUTPUT)Activity。 |
同名参数取值不符预期。 | 多层级存在同名参数。 | 使用完整命名空间或重命名避免冲突。 |
变量值未更新。 | 未使用 set_variable 修改。 | Pipeline 变量只能通过 set_variable Activity 修改。 |