本文介绍在 Dataleap New IDE 中创建和编辑 Pipeline 的完整流程,涵盖 DAG 画布编辑器的使用、基本信息配置、调度与依赖设置、参数与变量管理以及配置验证。
DAG 画布编辑器是 Pipeline 可视化编排的核心界面,提供拖拽式的工作流搭建体验。
操作 | 说明 |
|---|---|
添加节点 | 从左侧 Activity 类型面板中拖拽节点到画布,或右键画布选择"添加节点"。 |
建立依赖 | 从上游节点的输出锚点拖拽连线到下游节点的输入锚点。 |
删除节点 | 选中节点后按 Delete 键或右键选择"删除"。 |
删除连线 | 选中连线后按 Delete 键。 |
节点配置 | 双击或单击节点打开右侧配置面板。 |
画布缩放 | 滚轮缩放、或使用底部工具栏的缩放按钮。 |
画布平移 | 按住空格键拖拽画布。 |
全选 |
|
撤销/重做 |
|
可用的 Activity 类型:
类别 | 包含节点 |
|---|---|
计算节点 | Notebook、SQL、Jar、Copy Data |
控制节点 | If/Else、For Each、Until/Do While、Execute Pipeline |
数据感知节点 | Checker |
变量节点 | Set Variable |
创建 Pipeline 后需配置以下基本信息:
配置项 | 字段 | 说明 |
|---|---|---|
Pipeline 名称 |
| 唯一标识名,只包含字母、数字和下划线,长度不超过 127。发布后不建议修改。 |
描述 |
| Pipeline 的用途说明。 |
展示名称 |
| 用于界面展示,支持中文和特殊字符,最长 255 字符。 |
执行身份 |
| 格式为 |
调度资源组 |
| 指定调度引擎使用的资源组。 |
metadata.name 在项目内唯一,发布时系统自动维护 name 与发布态 Pipeline ID 的映射.pipeline.yml 文件中 metadata.name 相同,它们发布时会发布到同一个 Pipelinename(修改会导致发布为新的 Pipeline)spec.displayName 字段,修改展示名称Pipeline 支持三种触发方式:
触发类型 |
| 说明 |
|---|---|---|
定时调度 |
| 按 Cron 表达式定期执行 |
手动触发 |
| 仅支持手动触发 |
外部触发 |
| 由外部系统通过 API 触发 |
spec: trigger: type: scheduled cronExpression: "0 2 * * *" frequency: daily priority: d4 effectiveTime: "2026-01-01 00:00:00.000" expirationTime: "9999-12-31 23:59:59.000" earliestBackfillTime: "2026-01-01 00:00:00.000" concurrency: 1
调度频率 frequency:
值 | 说明 | 典型 Cron 表达式 |
|---|---|---|
| 分钟级调度 |
|
| 小时级调度 |
|
| 天级调度(默认) |
|
| 周级调度 |
|
| 月级调度 |
|
调度优先级 priority:
值 | 别名 | 说明 |
|---|---|---|
|
| 普通(默认) |
|
| 高优先级 |
|
| 超高优先级 |
|
| 核心任务 |
|
| 最高优先级 |
并发控制:concurrency 控制同一 Pipeline 最多允许几个实例同时运行:
值 | 含义 |
|---|---|
| 不限制并发(默认) |
| 串行执行,前一个完成后才启动下一个 |
| 最多 N 个实例并行 |
# 手动触发 spec: trigger: type: manual # 外部触发 spec: trigger: type: external
手动触发的 Pipeline 不会自动生成调度实例,仅在用户手动点击"运行"、通过 API 触发或被 execute_pipeline Activity 调用时执行。
Pipeline 级别的依赖在 spec.dependsOn 中配置:
spec: dependsOn: pipelines: - name: upstream_ods_pipeline pipelineId: 100 offsetsType: set offsets: - "0" self: enabled: true
依赖配置的详细说明请参见跨 Pipeline Checker 依赖。
在 spec.parameters 中定义 Pipeline 接收的输入参数:
spec: parameters: - name: biz_date description: "业务日期" default: "${date}" - name: run_mode description: "运行模式:full / incremental" default: "incremental"
字段 | 类型 | 说明 |
|---|---|---|
| String | 参数名,Pipeline 内唯一(字母、数字、下划线) |
| String | 参数描述 |
| String | 默认值,支持系统变量 |
在 spec.variables 中定义运行时可修改的内部变量:
spec: variables: - name: record_count description: "已处理记录数" default: "0" - name: error_flag description: "错误标记" default: "false"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 变量名,Pipeline 内唯一 |
| String | 否 | 变量描述 |
| String | 否 | 初始值 |
参数与变量的区别:
特性 | 参数(Parameters) | 变量(Variables) |
|---|---|---|
可变性 | 只读,运行时不可修改 | 可通过 Set Variable 节点修改 |
用途 | 接收外部传入的运行时配置 | Activity 间传递中间状态 |
引用语法 |
|
|
说明
参数与变量的完整引用规则请参见参数配置规则。
Pipeline 在保存和发布前需通过配置验证,确保运行时不会出现低级错误。
验证项 | 说明 |
|---|---|
必填字段检查 |
|
DAG 合法性 | Activity 依赖关系不能形成环路。 |
名称唯一性 | 同一 Pipeline 内 Activity 名称不能重复 |
引用合法性 |
|
路径存在性 |
|
参数引用合法性 |
|
时机 | 行为 |
|---|---|
保存 | 保存时弹出警告,但不阻止保存。 |
发布 | 发布前强制通过全部验证项,不通过则阻止发布。 |
错误信息 | 原因 | 解决方式 |
|---|---|---|
循环依赖检测 | Activity A → B → C → A 形成环路 | 检查并断开环路中的某条依赖。 |
Activity 名称重复 | 两个 Activity 使用了相同的 | 修改其中一个的名称。 |
未知 Activity 引用 |
| 检查拼写或补充缺失的 Activity。 |
文件路径不存在 |
| 先创建脚本文件再配置 Activity。 |