You need to enable JavaScript to run this app.
文档中心
E-MapReduce

E-MapReduce

复制全文
下载 pdf
EMR Serverless SDK 参考
Python Query SDK
复制全文
下载 pdf
Python Query SDK

简介

Python Query SDK 帮助 Serverless Spark 用户更加轻松地通过 Python 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。

概念说明

系统概念

  • Endpoint:表示 Serverless Spark 对外服务的 API 域名
  • Region:表示 Serverless Spark 的数据中心所在的物理区域

目前 Serverless Spark 支持的地域和 API 域名如下表所示:

Region(中文名称)

Region

Endpoint

华北-北京

cn-beijing

emr-serverless.cn-beijing.volcengineapi.com

华东-上海

cn-shanghai

emr-serverless.cn-shanghai.volcengineapi.com

华南-广州

cn-guangzhou

emr-serverless.cn-guangzhou.volcengineapi.com

亚太东南-柔佛

ap-southeast-1

emr-serverless.ap-southeast-1.volcengineapi.com

  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的“密钥管理”页面获取到 Access Key 和 Secret Access Key。

内部概念

  • Schema:一个包含数据表、资源、UDF 等信息的集合空间概念。
  • Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息。
  • Job:表示某次 Task 执行生成的任务实例。
  • Result:表示某次 Job 的运行结果。
  • ResultSchema:运行结果的 Schema 信息。
  • Record:表示运行结果的结果集中的一行记录。

安装 SDK

Python 3.6+安装包:

python_serverless-1.0.3.6-py3-none-any.whl
未知大小

直接使用 wheel 安装:

# 解压安装包,用户需输入实际安装包名称
$ unzip python_serverless-1.0.3.6-py3-none-any.whl.zip 
# 安装SDK,用户需输入实际安装包名称
$ pip3 install python_serverless-1.0.3.6-py3-none-any.whl

快速入门

初始化客户端

Python Query SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient

ak = 'your ak'
sk = 'your sk'
region = 'cn-beijing'
endpoint = 'emr-serverless.cn-beijing.volcengineapi.com'
service = 'emr_serverless'
connection_timeout = 30
socket_timeout = 30

client = ServerlessClient(credentials=StaticCredentials(ak, sk), 
    region=region, endpoint=endpoint, service=service,
    connection_timeout=connection_timeout, 
    socket_timeout=socket_timeout)

ServerlessClient 客户端是后续调用各种 Serverless Spark 功能的入口,当前版本 ServerlessClient 提供如下 API 接口:

API

功能

execute

执行作业

cancel_job

取消任务

get_job

获取任务实例状态

get_result

获取作业结果

第一个查询

初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:

sql = """
    SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100
"""

# 同步执行查询
job = client.execute(task=SQLTask(name="first query task", 
                            query=sql,
                            conf={}),
                    is_sync=True)

# 获取查询结果
if job.is_success():
    result = job.get_result()
    for record in result:
        print(', '.join([col for col in record]))

作业运行&取消

本节将以代码示例的形式展示更多 Serverless Spark 功能的使用方式。

提交 SQL 任务

SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下参数:

参数

类型

是否必须

描述

query

str

Y

sql 语句

name

str

N

任务名

说明

如果不指定会以 SQLTask_${current_timestamp} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

示例:

def execute_sql_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import SQLTask
    from serverless.exceptions import QuerySdkError

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'emr-serverless.cn-beijing.volcengineapi.com'
    sql = '${CUSTOM_SQL_STATEMEMT}'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,endpoint=endpoint)

    try:
        job = client.execute(task=SQLTask(name='sql task', query=sql, 
        conf={
              # 计算组名,可选,默认选用Default计算组
              #"serverless.compute.group.name": "xxx"
        }), is_sync=True)
        if job.is_success():
            result = job.get_result()
            for record in result:
                print(', '.join([col for col in record]))

    except QuerySdkError as e:
        print(
            "Error in executing sql task. code = %s, error = %s" % (
            e.error_code, e.info))


if __name__ == "__main__":
    execute_sql_task()

提交 SparkJar 任务

SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。
JarTask 是 SDK 提供 SparkJar 任务执行的接口:

参数

类型

是否必须

描述

jar

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/jar']

main_class

str

Y

Spark application 的 main class

main_args

list

N

spark application 的 main function 参数;不传默认为 empty list

name

str

N

任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

str[ ]

N

依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar']

files

str[ ]

N

依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file']

archives

str[ ]

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive']

示例:

def execute_spark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import JarTask

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'emr-serverless.cn-beijing.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    jar_resource = 'tos://bucket/path/to/jar'
    main_class = 'com.xxx.xxx'
    main_args = ['arg_xxx', 'arg_yyy']
    job = client.execute(task=JarTask(name="spark task test",
        jar=jar_resource,
        main_class=main_class,
        main_args=main_args,
        conf={'serverless.spark.access.key': '${ak}', 'serverless.spark.secret.key': '${sk}'}
),
    is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())


if __name__ == "__main__":
    execute_spark_task()

提交 PySpark 任务

Pyspark 任务为用户提供了通过编写 Python Spark 应用进行定制化数据分析需求的支持。详见 PySpark 作业
PySparkTask 是 SDK 提供 PySpark 任务执行的接口:

参数

类型

是否必须

描述

script

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如:['tos://bucket/path/to/pyfile']

args

list

N

spark application 的 main function 参数,默认为 empty list

name

str

N

任务名。如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数,默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

[]str

N

依赖的jar文件,对应spark-submit的--jars选项,例如:['tos://bucket/path/to/jar']

files

[]str

N

依赖的文件,对应spark-submit的--files选项, ,例如:['tos://bucket/path/to/file']

archives

[]str

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如:['tos://bucket/path/to/archive']

pyfiles

[]str

N

依赖的pyfile文件,对应spark-submit的--pyfiles选项,,例如:['tos://bucket/path/to/pyfile']

示例:

def execute_pyspark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import PySparkTask

    ak = 'xxxx'
    sk = 'xxxx'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'emr-serverless.cn-beijing.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    pyscript_resource = 'tos://bucket/path/to/pyfile'
    main_args = ['arg_xxx', 'arg_yyy']
    files = ['tos://bucket/path/to/dependency_pyfile']
    task = PySparkTask(name='pyspark task test',
        # queue='${queue_name}',
        script=pyscript_resource,
        args=main_args,
        conf={'serverless.spark.access.key': '${tos_ak}',
            'serverless.spark.secret.key': '${tos_sk}'},
        files=files
    )
        
    job = client.execute(task=task, is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())


if __name__ == "__main__":
    execute_pyspark_task()

提交Ray作业

from serverless import RayJobTask


_job = _client.execute(task=RayJobTask(name="test rayjob task",
        conf={
            # 计算组名,可选,默认选用Default计算组
            # "serverless.compute.group.name": "xxx",
            # 如需使用自定义镜像,添加该配置,替换镜像地址
            # "serverless.ray.version": "2.22.0",
            # "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:2.30.0-py3.11-ubuntu20.04-240",
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        worker_replicas=2,
        # entrypoint_cmd 格式为 /home/ray/workdir/{zip包名}/{py文件名}
        # 其中 /home/ray/workdir 目前为固定值
        entrypoint_cmd='python /home/ray/workdir/rayzip.py',
        # tos 文件路径
        entrypoint_resource='tos://wbw-dev/rayunzip/rayzip.py.zip',
        runtime_env={
            # 需要runtime引入的pip包
            "pip": ["requests==2.26.0", "pendulum==2.1.2"],
            "env_vars": {
                "counter_name": "test_counter",
            },
        },
        # queue="xxx",
    ),
    is_sync=False,
)

参数说明:

参数

描述

name

作业名称

head_cpu & head_memory

设置Ray head node的cpu和memory,格式与k8s resource一致,例如 4Gi

worker_cpu & worker_memory

设置Ray worker node的cpu和memory,格式与k8s resource一致,例如 4Gi

worker_replicas

设置Ray worker node的节点个数

entrypoint_cmd

用于运行作业的entrypoint shell command

entrypoint_resource

用于运行作业的相关资源文件,目前支持传入一个tos路径,作业执行时将会对该文件进行解压

runtime_env

设置Ray作业的运行时环境,可用于指定环境变量/工作目录/pip安装依赖

is_sync

设置作业是否是同步执行;设置为true,则execute函数将会同步阻塞直至作业结束,设置为false,则execute函数在任务提交后将会立即返回

同步/异步执行

通过is_sync 参数进行控制:

参数

类型

是否必须

描述

task

Task

Y

需要执行的任务

is_sync

bool

Y

是否同步执行

timeout

int

N

同步执行的超时时间,单位 :s

# 异步
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False)

# 同步执行,3 分钟超时
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)

取消任务

# 取消任务可以通过 job 实例;也可以通过 ServerlessClient 进行取消
job.cancel()
client.cancel_job(job)

查看任务实例相关信息

获取任务实例

可以根据任务 ID 进行任务实例的获取:

job = client.get_job(jobId)

获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI 页面:

job.get_tracking_url()

查看执行日志

_job = _client.get_job('${job_id}')
_log_cursor = _client.get_driver_log(_job)
while _log_cursor.has_next():
    _log_cursor.fetch_next_page()
    current_rows = _log_cursor.current_rows
    for log_entry in current_rows:
        print(log_entry)

查询提交日志

# 查看提交日志
_log_cursor = _client.get_submission_log(_submission_failed_job)
while _log_cursor.has_next():
    _log_cursor.fetch_next_page()
    current_rows = _log_cursor.current_rows
    for log_entry in current_rows:
        print(log_entry)

等待任务

异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:

# 等待任务结束
job.wait_for_finished()

# 自定义结束状态
def when_to_exit() -> bool:
    return job.get_tracking_url() is not None

job.wait_for(when_to_exit=_when_to_exit, timeout=180)

获取查询结果

对job实例调用 get_result() 获取任务的查询结果:

result = job.get_result()

# 或者由 serverless client 侧获取
result = client.get_result(job)

for record in result:
    print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))

7 执行异常

任务异常将会以 QuerySdkError 的形式进行抛出,exception message 内携带具体的执行错误信息。

最近更新时间:2026.06.13 16:10:42
这个页面对您有帮助吗?
有用
有用
无用
无用