本文介绍 Pipeline 中与计算资源、引擎、Python 依赖和网络相关的环境配置,帮助您正确配置 Activity 的执行环境。
Activity 执行时需要绑定到具体的计算资源组。不同类型的 Activity 使用不同的资源绑定方式。
SQL 和 Jar 类型的 Activity 通过 engineType + engineQueue 绑定计算引擎和队列:
- name: extract_orders type: sql engineType: emr_serverless_spark engineQueue: "{{project.parameters.computing_queue}}"
字段 | 说明 |
|---|---|
| 计算引擎类型 |
| 计算队列名称,决定使用哪组计算资源。 |
Notebook 中同时包含 SQL Cell 和 Python/Scala Cell,需分别配置资源:
- name: data_analysis type: notebook sqlEngineType: emr_serverless_spark sqlEngineQueue: "{{project.parameters.sql_queue}}" sqlComputingResourceGroupName: "{{project.parameters.sql_resource_group}}" generalComputingResourceGroupName: "{{project.parameters.python_resource_group}}"
字段 | 用途 |
|---|---|
| SQL Cell 的引擎和资源配置 |
| Python/Scala Cell 的资源组 |
资源组通过名称(Name)或 ID(Id)标识,同时提供时以 ID 优先:
# 方式一:仅按名称 computingResourceGroupName: default_python_group # 方式二:名称 + ID(推荐,更精确) computingResourceGroupName: default_python_group computingResourceGroupId: 3
说明
建议将资源组名称配置为项目参数,实现开发/生产环境使用不同的计算资源。
Jar 类型的 Activity 支持通过 sparkConf 自定义 Spark 运行参数:
- name: heavy_etl_job type: jar engineType: emr_serverless_spark engineQueue: prod_queue mainClass: com.example.HeavyETL sparkConf: - key: spark.executor.memory value: "8g" - key: spark.executor.instances value: "20" - key: spark.executor.cores value: "4" - key: spark.sql.shuffle.partitions value: "200" - key: spark.dynamicAllocation.enabled value: "true" - key: spark.dynamicAllocation.maxExecutors value: "50"
常用 Spark 配置项:
配置项 | 说明 | 建议值 |
|---|---|---|
| 单个 Executor 内存 | 根据数据量设置,通常 4g–16g。 |
| Executor 数量 | 根据数据量和并行度设置。 |
| 单个 Executor CPU 核心数 | 通常 2–4 |
| Shuffle 分区数 | 数据量大时增大(如 200–1000) |
| 动态资源分配 | 建议开启 |
Python 类型的任务(运行在 K8S 容器中)支持配置 Python 版本和 pip 依赖:
- name: data_transform type: python source: WORKSPACE path: /Workspace/Users/zhang3/scripts/transform.py computingResourceGroupName: default_python_group environment: version: "3.12" dependencies: - "pandas>=1.5.0" - "requests>=2.28.0" - "numpy>=1.24.0" - "pyarrow>=12.0.0"
字段 | 类型 | 说明 |
|---|---|---|
| Enum | Python 版本: |
| Array | pip 依赖列表,支持版本约束。 |
语法 | 含义 | 示例 |
|---|---|---|
| 大于等于 |
|
| 精确版本 |
|
| 小于等于 |
|
| 版本区间 |
|
无约束 | 安装最新版 |
|
注意
生产环境建议使用精确版本(==)或版本区间,避免因依赖升级导致不可预期的行为变化。
在 K8S 容器中运行的任务(Python、Shell)支持配置网络访问:
- name: sync_external_data type: python source: WORKSPACE path: /Workspace/Users/zhang3/scripts/sync.py computingResourceGroupName: default_python_group network: vpcId: vpc-xxxxx subnetIds: - subnet-xxxxx securityGroupIds: - sg-xxxxx
字段 | 类型 | 说明 |
|---|---|---|
| String | VPC ID |
| Array | 子网 ID 列表 |
| Array | 安全组 ID 列表 |
需要网络配置的场景:
说明
如果任务不需要访问外部网络资源,可省略 network 配置。
Jar 类型的 Activity 支持加载额外的依赖资源:
- name: etl_with_udf type: jar source: TOS path: tos://bucket/jars/etl-app.jar engineType: emr_serverless_spark engineQueue: prod_queue mainClass: com.example.ETLJob extraResources: - source: TOS path: tos://bucket/jars/common-udf.jar - source: RESOURCE path: /workspace/resources/connector.resource.yaml - source: WORKSPACE path: /Workspace/Users/zhang3/lib/utils.jar
source 值 | 说明 |
|---|---|
| 从对象存储加载。 |
| 从资源中心加载已注册的资源。 |
| 从工作空间加载。 |
典型用途:
Python、Shell、Jar 类型的 Activity 支持通过 arguments 传入命令行参数:
# Python 脚本 - name: transform type: python arguments: - "--mode" - "full" - "--date" - "{{pipeline.parameters.biz_date}}" # Jar 任务 - name: spark_job type: jar arguments: - "--biz_date" - "{{pipeline.parameters.biz_date}}" - "--partition" - "{{project.parameters.target_partition}}"
说明
arguments 中的每个元素对应一个命令行参数,按顺序传入脚本。支持使用 {{...}} 引用动态值。
建议 | 说明 |
|---|---|
资源组参数化 | 将 |
合理配置 Spark 参数 | 根据数据量和任务特点调整 Executor 数量和内存,避免资源浪费或不足。 |
锁定 Python 依赖版本 | 生产环境使用精确版本号,避免自动升级引发问题。 |
按需配置网络 | 仅在需要访问外部资源时配置 VPC 网络,减少不必要的网络暴露。 |
复用资源配置 | 相同类型的 Activity 使用统一的资源组和引擎配置,便于管理和扩缩容。 |