遵循本文所述步骤,您可完整体验从图片数据入湖、图片数据集处理,到文搜图 / 图搜图的全流程端到端实践。通过该过程,您可以掌握以下技能:
本次实践使用以下 1000 张动物图片进行图片数据入湖、处理、增值消费的全流程的测试,您可单击下载此示例数据,也可自行准备测试图片。
注意
您需要开通好火山引擎对象存储 TOS,并创建好 TOS bucket 用于本次实验过程中存储测试的图片数据。
准备工作 | 操作要点 | 参考文档 | 配置示例 |
|---|---|---|---|
创建 Bucket |
| Bucket名:las-doctest |
您需要登录进去访问控制(IAM)控制台,准备一个用于后续使用 API 方式访问 TOS 、LAS 的 AK和SK。
准备工作 | 操作要点 | 参考文档 | 配置示例 |
|---|---|---|---|
获取AK、SK | 为了确保访问 TOS/LAS 服务的安全性,您必须提供有效的访问密钥 (AK/SK),用于后续的鉴权。
|
您需要开通好 LAS 服务,并创建好用于本次实验的队列资源和开发机环境。
准备工作 | 操作要点 | 参考文档 | 配置示例 |
|---|---|---|---|
队列资源 |
| 参见操作要点 | |
LAS API Key | 您调用 LAS 在线算子之前,您需要先生成 API Key 用于算子调用的鉴权。
注意 LAS 的为您提供了离线和在线两类数据处理算子,仅在线算子需使用 LAS API Key 鉴权。
| 参见操作要点 | |
开发机 | 您调用 LAS 离线算子之前,您需要先准备好离线算子的调用开发环境。
|
注意
本实践步骤的主要目的是将图片上传至TOS Bucket中,并在 LAS 创建一个图片数据集,后续即可直接在 LAS 的图片数据集对图片进行预览、数据处理等操作。
登录至 TOS 产品控制台后,将待处理的图片上传至TOS Bucket中。
操作步骤 | 操作要点 | 参考文档 | 配置示例 |
|---|---|---|---|
上传图片 | 在准备好的 TOS Bucket 中上传待处理的图片。您可根据需要创建文件夹,将图片上传至文件夹中。 | TOS 路径为: |
登录 LAS 控制台,单击左侧导航栏的 数据集 > 通用数据集,进入数据集页面后单击“创建数据集”。参考以下配置要点,配置数据集参数,完成后单击“创建”,完成图片数据集创建。
操作步骤 | 操作要点 | 参考文档 | 配置示例 |
|---|---|---|---|
创建图片数据集 |
|
|
数据集创建完成后,您可在数据集详情页面查看数据集基本信息,对于图片数据集,支持预览前十条图片。
注意
本实践步骤的主要目的是将图片数据集转为 Lance 数据集,实现图片数据入湖,便于后续对图片数据进行图片内容理解、向量化等数据处理操作。
您需要先登录 TOS 控制台,准备好一个用于存储转换后 Lance 数据集数据的 TOS 路径。本示例的 Lance 数据集的 TOS 路径为:tos://las-doctest/image2lance/。
此步骤的操作主要在开发机中使用VeDaft调用SDK对数据集进行读写等操作,过程中涉及TOS Bucket的访问等,建议将一些鉴权参数、固定的路径参数配置为开发环境的环境变量,便于后续实际执行代码中引用。
您可参考下文准备好环境变量的配置。
# 地域设置,本示例使用华北2-北京地域 export REGION="cn-beijing" # 鉴权AK相关环境变量 # 访问TOS、LAS的认证sk、ak export LAS_TOS_SECRET_KEY="<your_sk>" export LAS_TOS_ACCESS_KEY="<your_ak>" # TOS 访问相关环境变量 # TOS的访问端点 export LAS_TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com" # TOS端点(用于TOSConfig) export TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com"
在开发机环境中执行 source env.sh 命令,使环境变量生效。
# 请在 LAS 开发机中执行以下代码 import os """ 转为 Lance 数据集 """ import daft from daft.io import CreateLasDatasetOptions, IOConfig, LanceWriteOptions from daft.las.io import TOSConfig # 您需要设置图片数据集名称,并设置新的Lance数据集的TOS路径 dataset_name = "las_dataset_image" lance_tos_dir = f"tos://las-doctest/image2lance/las_dataset_image_lance.lance" LANCE_DATASET_FORMAT = "lance" # 创建相关的配置 io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) write_options = LanceWriteOptions(io_config=io_config, uri=lance_tos_dir) create_ds_options = CreateLasDatasetOptions( nick_name="daft_test_lance_write", privacy="public", description="This is my dataset", ) # 读取原始数据 df = daft.read_las_dataset(name=dataset_name) # 在这里进行创建,新建的Lance数据集名称规则为:图片数据集名称加上_lance后缀 new_dataset_name = dataset_name + "_lance" df.write_las_dataset( name=new_dataset_name, format=LANCE_DATASET_FORMAT, write_options=write_options, create_ds_options=create_ds_options, ) # 读取转换后的新数据集 df = daft.read_las_dataset(name=new_dataset_name) print("\n\nnew dataset:") df.show()
您需要修改以下参数。
las_dataset_image其他参数可保持示例值即可。
new dataset: ERROR:daft_local_execution.runtime_stats:RuntimeStatsManager finished with active nodes {0} ╭────────────────────────────────┬────────┬──────────╮ │ image ┆ size ┆ num_rows │ │ --- ┆ --- ┆ --- │ │ String ┆ Int64 ┆ Int64 │ ╞════════════════════════════════╪════════╪══════════╡ │ s3://las-doctest/images/anima… ┆ 15390 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 240478 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 15008 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 11645 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 9909 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 14252 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 3226 ┆ None │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 878304 ┆ None │ ╰────────────────────────────────┴────────┴──────────╯ (Showing first 8 rows)
转换后的 Lance 数据集包含“image”、“size”、“num_rows”字段。其中:
回显中如果出现ERROR提示,但是正常返回了数据集数据,可忽略ERROR提示。
完成转 Lance 数据集的操作后,您可登录 LAS 控制台,在数据集列表页面查看新创建的Lance数据集。
注意
本实践步骤的主要目的是对图片进行简单处理,将图片二进制数据也写入入湖后的 Lance 数据集中,便于后续在 Lance 数据集中直接预览图片内容,以及为后续对图片数据进行图片内容理解、向量化等数据处理操作做准备。
# 地域设置,本示例使用华北2-北京地域 export REGION="cn-beijing" # 鉴权AK相关环境变量 # 访问TOS、LAS的认证sk、ak export LAS_TOS_SECRET_KEY="<your_sk>" export LAS_TOS_ACCESS_KEY="<your_ak>" # TOS 访问相关环境变量 # TOS的访问端点 export LAS_TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com" # TOS端点(用于TOSConfig) export TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com" # 数据集固定参数也可设置为环境变量 export DATASET_NAME="las_dataset_image_lance"
本步骤新增了:
source env.sh 命令,使环境变量生效。在开发机中执行以下脚本,将图片处理为二进制后,写入 Lance 数据集。
import os # 从环境变量引入以下参数取值 LAS_TOS_ACCESS_KEY = os.environ["LAS_TOS_ACCESS_KEY"] LAS_TOS_SECRET_KEY = os.environ["LAS_TOS_SECRET_KEY"] TOS_ENDPOINT = os.environ["TOS_ENDPOINT"] REGION = os.environ["REGION"] dataset_name = os.environ["DATASET_NAME"] import pyarrow as pa import tos from daft.io.lance import merge_columns from daft.io import IOConfig from daft.las.io import TOSConfig # 从 tos 路径读取图片数据 def read_tos_path(path: str, tos_client: tos.TosClientV2): if (not path) or path.strip() == "": return None if path.startswith("tos://"): path = path[len("tos://") :] if path.startswith("s3://"): path = path[len("s3://") :] bucket, path = path.split("/", 1) obj = tos_client.get_object(bucket=bucket, key=path) return obj.read() def get_create_assign_raw_image_func(input_col: str, output_col: str): def create_assign_raw_image(batch: pa.RecordBatch) -> pa.RecordBatch: paths = batch.column(input_col).to_pylist() tos_client = tos.TosClientV2(LAS_TOS_ACCESS_KEY, LAS_TOS_SECRET_KEY, TOS_ENDPOINT, REGION) image_data_list = [] for path in paths: data = read_tos_path(path, tos_client) image_data_list.append(data) image_data_array = pa.array(image_data_list, type=pa.binary()) new_batch = pa.RecordBatch.from_arrays([image_data_array], names=[output_col]) # 仅返回新列 return new_batch return create_assign_raw_image print("正在将图片原始数据存入 lance 数据集中 ...") # 您需要修改以下代码中的lance_tos_dir、merge_columns中transform参数 io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) lance_tos_dir = f"tos://las-doctest/image2lance/{dataset_name}.lance" merge_columns( uri=lance_tos_dir.replace("tos://", "s3://"), io_config=io_config, transform=get_create_assign_raw_image_func("image", "raw_image"), ) print("图片原始数据已存入 lance 数据集")
您需要根据实际情况修改示例代码中的以下参数:
如果因为误操作导致数据集中出现无用字段或无用数据,可参考此内容进行删除:参考:常见数据集操作示例。
读取数据集:
""" 读取数据 """ import os import daft dataset_name = os.environ["DATASET_NAME"] df = daft.read_las_dataset(name=dataset_name) df.show()
代码输出:
ERROR:daft_local_execution.runtime_stats:RuntimeStatsManager finished with active nodes {0} ╭────────────────────────────────┬────────┬──────────┬────────────────────────────────╮ │ image ┆ size ┆ num_rows ┆ raw_image │ │ --- ┆ --- ┆ --- ┆ --- │ │ String ┆ Int64 ┆ Int64 ┆ Binary │ ╞════════════════════════════════╪════════╪══════════╪════════════════════════════════╡ │ s3://las-doctest/images/anima… ┆ 15390 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 240478 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 15008 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 11645 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 9909 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 14252 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 3226 ┆ None ┆ b"ÿØÿà�JFI… │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/images/anima… ┆ 878304 ┆ None ┆ b"ÿØÿà�JFI… │ ╰────────────────────────────────┴────────┴──────────┴────────────────────────────────╯ (Showing first 8 rows)
注意
本实践步骤的主要目的是调用 LAS 的数据处理算子,对图片进行数据增值处理:
# 地域设置,本示例使用华北2-北京地域 export REGION="cn-beijing" # 鉴权AK相关环境变量 # 访问TOS、LAS的认证sk、ak export LAS_TOS_SECRET_KEY="<your_sk>" export LAS_TOS_ACCESS_KEY="<your_ak>" # 调用 LAS 算子的LAS API Key,使用在线方式(API)调用时需要 export LAS_API_KEY="464812df-2dfb-41f6-********" # TOS 访问相关环境变量 # TOS的访问端点 export LAS_TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com" # TOS端点(用于TOSConfig) export TOS_ENDPOINT="https://tos-cn-beijing.ivolces.com" # 数据集固定参数也可设置为环境变量 export DATASET_NAME="las_dataset_image_lance" export TOS_LANCE_DIR="tos://las-doctest/image2lance/las_dataset_image_lance.lance" # 测试配置 # 测试用的示例图片URL export TEST_IMAGE_URL="https://ark-project.tos-cn-beijing.volces.com/images/view.jpeg"
本步骤新增了:
source env.sh 命令,使环境变量生效。您可以使用一张测试图片URL,快速体验 LAS 的多模态深度思考算子对图片内容理解的处理结果。以下以调用:多模态深度思考(Doubao-seed-1.8)算子,进行快速体验,在开发机中执行以下curl命令,即可快速体验 LAS 算子对图片理解处理的结果示例。
curl --location "https://operator.las.cn-beijing.volces.com/api/v1/chat/completions" --header "Content-Type: application/json" --header "Authorization: Bearer $LAS_API_KEY" --data '{ "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": "https://ark-project.tos-cn-beijing.volces.com/images/view.jpeg" } }, { "type": "text", "text": "图片里面有什么" } ] } ], "model": "doubao-seed-1-8-251228" }'
返回示例:
{"model":"doubao-seed-1-8-251228","usage":{"prompt_tokens":270,"completion_tokens":521,"total_tokens":791,"prompt_tokens_details":{"cached_tokens":0,"provisioned_tokens":null},"completion_tokens_details":{"reasoning_tokens":354,"provisioned_tokens":null}},"created":1773200390,"id":"0217732003734693dfd182f******","choices":[{"finish_reason":"stop","index":0,"logprobs":null,"message":{"content":"这是一幅充满自然静谧感的户外场景: - **前景**:开阔的深蓝色湖面水波轻漾,一艘亮橙色的单人皮划艇浮在水面,艇上坐着一人,正抬起黑色船桨,似乎正在划行,水面清晰倒映出皮划艇与人的身影。 - **中景**:湖对岸是一片茂密的深绿色针叶林,林间萦绕着淡淡的薄雾,给树林添了几分朦胧感。 - **背景**:树林后方是连绵巍峨的雪山,山体被冰雪覆盖,山峦硬朗的轮廓在光影下清晰可见,尽显雄伟壮阔。 - **天空**:上方是澄澈的淡蓝色天空,点缀着几缕轻薄的云丝,整体氛围开阔宁静,传递出户外自然的闲适与苍茫感。","reasoning_content":"用户现在需要描述这张图片里的内容,首先看整体:首先是前景,一片平静的深蓝色湖泊,水面上有一个橙色的皮划艇,艇上有一个人,正举着黑色的船桨,似乎在划行,水面有他和皮划艇的倒影。然后中间部分,湖泊对岸是茂密的深绿色针叶林,树林上方有淡淡的雾气,显得很静谧。背景是巍峨的雪山,山体覆盖着积雪,轮廓硬朗,在蓝天之下显得很壮阔。天空是浅蓝色,飘着几缕轻薄的白云,整体氛围宁静又开阔,有种户外自然的闲适感。现在把这些元素整理清楚,分层次描述。 首先看主体: 1. 水面与皮划艇:前景是开阔平静的深蓝色湖面,水面有轻微波纹,一艘亮橙色的单人皮划艇在湖面,艇上坐着一个人,正抬起黑色船桨,似乎在进行划行动作,水面倒映出皮划艇和人的身影。 2. 中景:湖的对岸是一片茂密的深色针叶林(看起来像冷杉、松树这类),林间有淡淡的薄雾,给树林蒙上了一层朦胧感。 3. 背景:树林后方是连绵巍峨的雪山,山体被冰雪覆盖,山峦的轮廓清晰,在光影下显现出岩石与积雪的质感,显得很雄伟。 4. 天空:上方是澄澈的淡蓝色天空,点缀着几缕轻薄的白色云丝,整体天气晴朗,让整个画面的氛围宁静又开阔,充满自然的静谧与壮阔感。 现在把这些组织成通顺的描述,让内容清晰有条理。","role":"assistant"}}],"object":"chat.completion"}
以下使用图文 embedding(豆包系列模型)作为示例,为您演示如何批量将图文内容向量化。处理时,会在 Lance 数据集中增加新列:image_embedding,用于写入 向量化结果数据。图片向量化后,可以支持后续以图搜图的场景
在开发机中执行以下脚本。
#本段代码需要先执行"source env.sh", env.sh文件在"配置环境变量"部分 # 导入必要的库 import os import daft import lance import pandas as pd import pyarrow as pa from daft import col from daft.io import IOConfig from daft.io.object_store_options import io_config_to_storage_options from daft.las.functions.ark_llm.doubao_embedding_vision import DoubaoEmbeddingVision from daft.las.functions.udf import las_udf from daft.las.io import TOSConfig from daft.las.infra.las_dataset import LasDatasetClient, LasDatasetConfig # 从环境变量获取配置 TOS_LANCE_DIR = os.getenv("TOS_LANCE_DIR") # Lance数据集路径 DATASET_NAME = os.getenv("DATASET_NAME") # 数据集名称 # 批处理参数 BATCH_SIZE = int(os.getenv("BATCH_SIZE", "10")) # 单次处理行数上限 MAX_CYCLES = int(os.getenv("MAX_CYCLES", "5")) # 最大批次数 # 列名配置 IMAGE_EMBEDDING_NEW_COLUMN = "image_embedding" # 新 embedding 列名 DOUBAO_LLM_COLUMN = "doubao_llm_result" # 豆包 LLM 结果列名 # 设置 TOS 对象存储相关环境变量 os.environ["LAS_TOS_ACCESS_KEY"] = os.getenv("LAS_TOS_ACCESS_KEY") os.environ["LAS_TOS_SECRET_KEY"] = os.getenv("LAS_TOS_SECRET_KEY") os.environ["TOS_ENDPOINT"] = os.getenv("LAS_TOS_ENDPOINT") # 配置 IO 设置,用于访问 TOS 对象存储 io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) def update_image_embedding(): # 将 TOS 路径转换为 S3 兼容路径 lance_path = TOS_LANCE_DIR.replace("tos://", "s3://") # 检查 embedding 列是否存在,不存在则创建 storage_options = io_config_to_storage_options(io_config, lance_path) lance_ds = lance.dataset(uri=lance_path, storage_options=storage_options) existing_columns = [field.name for field in lance_ds.schema] column_exists = IMAGE_EMBEDDING_NEW_COLUMN in existing_columns print(f"{IMAGE_EMBEDDING_NEW_COLUMN} 列存在: {column_exists}") if not column_exists: print(f"创建 {IMAGE_EMBEDDING_NEW_COLUMN} 列...") vec_type = pa.list_(pa.float64()) @lance.batch_udf() def add_empty_column(batch): null_array = pa.array([None] * batch.num_rows, type=vec_type) return pa.RecordBatch.from_arrays([null_array], names=[IMAGE_EMBEDDING_NEW_COLUMN]) lance_ds.add_columns(add_empty_column) print(f"成功创建 {IMAGE_EMBEDDING_NEW_COLUMN} 列") # 读取完整数据集,过滤出待处理行 df = daft.read_lance(lance_path, io_config=io_config) all_data = df.collect().to_pandas() if IMAGE_EMBEDDING_NEW_COLUMN in all_data.columns: pending_data = all_data[all_data[IMAGE_EMBEDDING_NEW_COLUMN].isna()].copy() pending_data = pending_data.reset_index(drop=True) print(f"跳过已有 embedding 的 {len(all_data) - len(pending_data)} 行") else: pending_data = all_data.copy() print(f"待处理图片总数: {len(pending_data)}") process_limit = MAX_CYCLES * BATCH_SIZE pending_data = pending_data.iloc[:process_limit].copy() print(f"本次实际处理: {len(pending_data)} 张图片") if pending_data.empty: print("所有图片已处理完成,无需继续") return pending_data[DOUBAO_LLM_COLUMN] = ( pending_data[DOUBAO_LLM_COLUMN] .fillna("图片内容") .replace("", "图片内容") ) # 2. 构建 Daft DataFrame 并调用算子批量计算 embedding pending_daft_df = daft.from_pandas(pending_data) pending_daft_df = pending_daft_df.with_column( IMAGE_EMBEDDING_NEW_COLUMN, las_udf( DoubaoEmbeddingVision, construct_args={ "image_format": "jpeg", # 按需调整图片格式 "source_type": "url", # image 列存放的是 URL }, )(col("image"), col(DOUBAO_LLM_COLUMN)), # image列 + 文本列 → 图文联合向量 ) result_df = pending_daft_df.collect().to_pandas() # ------------------------------------------------------------------ # 过滤掉算子调用失败(返回 None)的行 valid_mask = result_df[IMAGE_EMBEDDING_NEW_COLUMN].notna() valid_df = result_df[valid_mask].copy() print(f"成功生成 embedding: {len(valid_df)} 行,失败: {(~valid_mask).sum()} 行") if valid_df.empty: print("没有成功生成的 embedding,跳过更新") return # 更新 Lance 数据集(与原逻辑一致) try: lance_ds = lance.dataset(uri=lance_path, storage_options=storage_options) update_table = pa.Table.from_pandas(valid_df, preserve_index=False) update_table = update_table.cast(lance_ds.schema) lance_ds.merge_insert("image").when_matched_update_all().execute(update_table) print("数据集更新成功") except Exception as e: print(f"更新数据集失败: {e}") print("尝试备用方案(内存更新,不写回)...") try: current_df = daft.read_lance(lance_path, io_config=io_config).collect().to_pandas() for _, row in valid_df.iterrows(): mask = current_df["image"] == row["image"] idx = current_df[mask].index if len(idx) > 0: current_df.at[idx[0], IMAGE_EMBEDDING_NEW_COLUMN] = row[IMAGE_EMBEDDING_NEW_COLUMN] print("备用方案:内存已更新,但未写回数据集") except Exception as e2: print(f"备用方案也失败: {e2}") print(f"\n处理完成!共处理了 {len(valid_df)} 张图片") # 程序入口点 if __name__ == "__main__": update_image_embedding()
您需按需修改示例代码中的以下参数:
对图片完成处理后,您可查看Lance 数据集,检查处理结果。
读取数据集:
""" 读取数据 """ import os import daft dataset_name = os.environ["DATASET_NAME"] df = daft.read_las_dataset(name=dataset_name) df.show()
返回示例:
ERROR:daft_local_execution.runtime_stats:RuntimeStatsManager finished with active nodes {0} ╭────────────────────┬────────┬────────────┬───────────────────┬───────────────────╮ │ image ┆ size ┆ … ┆ llm_result03 ┆ image_embedding │ │ --- ┆ --- ┆ ┆ --- ┆ --- │ │ String ┆ Int64 ┆ (3 hidden) ┆ String ┆ List[Float64] │ ╞════════════════════╪════════╪════════════╪═══════════════════╪═══════════════════╡ │ s3://las-doctest/i ┆ 15390 ┆ … ┆ {"description": ┆ [0.06982421875, │ │ mages/anima… ┆ ┆ ┆ "一群袋鼠站在绿色 ┆ 0.00019168853… │ │ ┆ ┆ ┆ 的草地上… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 240478 ┆ … ┆ {"description":" ┆ [0.02880859375, │ │ mages/anima… ┆ ┆ ┆ 一只棕黄色带斑纹 ┆ -0.0268554687… │ │ ┆ ┆ ┆ 的蜥蜴,尾… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 15008 ┆ … ┆ {"content": "一只 ┆ [0.040771484375, │ │ mages/anima… ┆ ┆ ┆ 黑色羽毛、颈部有 ┆ -0.040039062… │ │ ┆ ┆ ┆ 橙红色裸露皮… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 11645 ┆ … ┆ {"description": ┆ [-0.0003032684326 │ │ mages/anima… ┆ ┆ ┆ "一只蝙蝠展开翅膀 ┆ 171875, -0.0… │ │ ┆ ┆ ┆ 停在红苹… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 9909 ┆ … ┆ {"description": ┆ [0.00689697265625 │ │ mages/anima… ┆ ┆ ┆ "一只小考拉被灰色 ┆ , -0.0222167… │ │ ┆ ┆ ┆ 布料包裹… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 14252 ┆ … ┆ {"description": ┆ [-0.015380859375, │ │ mages/anima… ┆ ┆ ┆ "一个穿粉色上衣的 ┆ -0.02966308… │ │ ┆ ┆ ┆ 小女孩,… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 3226 ┆ … ┆ {"content": "一只 ┆ [0.0279541015625, │ │ mages/anima… ┆ ┆ ┆ 白色的鹅,喙和脚 ┆ -0.03710937… │ │ ┆ ┆ ┆ 呈橙色,站立… ┆ │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ s3://las-doctest/i ┆ 878304 ┆ … ┆ {"description": ┆ [0.036865234375, │ │ mages/anima… ┆ ┆ ┆ "图片展示了一只毛 ┆ 0.0063171386… │ │ ┆ ┆ ┆ 发乌黑的… ┆ │ ╰────────────────────┴────────┴────────────┴───────────────────┴───────────────────╯ (Showing first 8 rows)
至此,我们已经完成Lance格式的图片数据集的创建、图片标签,图片向量等操作。我们可以基于这些信息,实现文搜图,图搜图,以及模型训练等。
如果有更多业务场景需求,更多图片标签数据,可以参考前面的步骤,增加更多信息,如GPT/Claude等图片识别信息列等等。
以下图片数据消费的示例是通过文搜图、图搜图进行示例,有图片搜索结果展示的需求,因此使用 Jupyter notebook 来运行示例代码,您需先安装好 Jupyter notebook 。
import daft import os from PIL import Image import io import matplotlib.pyplot as plt # ✅ 用 UDF 实现字符串包含判断 @daft.udf(return_dtype=daft.DataType.bool()) def contains_keyword(col, keyword): return [keyword in (s or "") for s in col.to_pylist()] def query_pic_with_word(dataset_name, query_word): df = daft.read_las_dataset(name=dataset_name) keyword = "袋鼠" # ✅ 使用 UDF 过滤 result = df.where(contains_keyword(df["doubao_llm_result"], keyword)) result_data = result.collect() for i, row in enumerate(result_data.to_pylist()): print(f"URL: {row['image']}") if 'raw_image' in row and row['raw_image'] is not None: image_data = row['raw_image'] raw_image = Image.open(io.BytesIO(image_data)) plt.figure(figsize=(8, 6)) plt.imshow(raw_image) plt.axis('off') plt.show() if 'doubao_llm_result' in row and row['doubao_llm_result']: print(f"描述: {row['doubao_llm_result']}") print("-" * 50) def main(): dataset_name = "las_dataset_image_lance" query_word = "帮我查询袋鼠图片" query_pic_with_word(dataset_name, query_word) if __name__ == "__main__": main()
本文搜图的示例代码,为向Lance 数据集中搜索“袋鼠”相关图片。
其中main函数中:
import numpy as np from daft.las.functions.ark_llm.doubao_embedding_vision import DoubaoEmbeddingVision from daft.las.functions.udf import las_udf from daft import col import daft import os from PIL import Image import io import matplotlib.pyplot as plt # ✅ 用 UDF 实现字符串包含判断 @daft.udf(return_dtype=daft.DataType.bool()) def contains_keyword(col, keyword): return [keyword in (s or "") for s in col.to_pylist()] def query_pic_with_word(dataset_name, query_word): df = daft.read_las_dataset(name=dataset_name) keyword = "袋鼠" # ✅ 使用 UDF 过滤 result = df.where(contains_keyword(df["llm_result03"], keyword)) result_data = result.collect() for i, row in enumerate(result_data.to_pylist()): print(f"URL: {row['image']}") if 'raw_image' in row and row['raw_image'] is not None: image_data = row['raw_image'] raw_image = Image.open(io.BytesIO(image_data)) plt.figure(figsize=(8, 6)) plt.imshow(raw_image) plt.axis('off') plt.show() if 'llm_result03' in row and row['llm_result03']: print(f"描述: {row['llm_result03']}") print("-" * 50) def main(): dataset_name = "las_dataset_image_lance" query_word = "帮我查询袋鼠图片" query_pic_with_word(dataset_name, query_word) def cosine_similarity(a, b): """计算余弦相似度""" return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) def generate_image_embedding(image_url): """使用算子生成图片embedding(直接传URL,无需下载转base64)""" df = daft.from_pydict({"image_path": [image_url]}) df = df.with_column( "embedding", las_udf( DoubaoEmbeddingVision, construct_args={ "image_format": "jpeg", "source_type": "url", }, )(col("image_path")), ) result = df.collect().to_pydict() return result["embedding"][0] def query_pic_with_pic(dataset_name, query_pic_url): """ 以图片向量的方式查询相似图片并显示 参数: dataset_name: LAS数据集名称 query_pic_url: 查询的图片URL """ # 下载查询图片(仅用于显示) from urllib.parse import urlparse import tos parsed = urlparse(query_pic_url) bucket = parsed.netloc key = parsed.path.lstrip('/') client = tos.TosClientV2(os.environ["LAS_TOS_ACCESS_KEY"], os.environ["LAS_TOS_SECRET_KEY"], "tos-cn-beijing.volces.com", "cn-beijing") resp = client.get_object(bucket=bucket, key=key) query_image_data = resp.read() query_image = Image.open(io.BytesIO(query_image_data)) print("正在生成查询图片的embedding...") query_embedding = generate_image_embedding(query_pic_url) if query_embedding is None: print("无法生成查询图片的embedding") return print(f"成功生成embedding,维度: {len(query_embedding)}") # 读取LAS数据集 df = daft.read_las_dataset(name=dataset_name) # 获取所有数据 all_data = df.collect() all_rows = all_data.to_pylist() # 显示查询图片 print("查询图片:") plt.figure(figsize=(8, 6)) plt.imshow(query_image) plt.title("查询图片") plt.axis('off') plt.show() print("-" * 50) # 计算所有图片与查询图片的相似度 similarities = [] for i, row in enumerate(all_rows): if row['image_embedding'] is not None: similarity = cosine_similarity(query_embedding, row['image_embedding']) similarities.append((similarity, i, row)) # 按相似度排序,取前3个且相似度大于40% similarities.sort(key=lambda x: x[0], reverse=True) top_3 = [s for s in similarities[:3] if s[0] > 0.40] if not top_3: print("未找到相似度大于40%的图片") return # 显示结果 for similarity, idx, row in top_3: print(f"URL: {row['image']}") print(f"相似度: {similarity:.4f}") if 'raw_image' in row and row['raw_image'] is not None: image_data = row['raw_image'] raw_image = Image.open(io.BytesIO(image_data)) plt.figure(figsize=(8, 6)) plt.imshow(raw_image) plt.axis('off') plt.show() if 'llm_result03' in row and row['llm_result03']: print(f"描述: {row['llm_result03']}") print("-" * 50) if __name__ == "__main__": dataset_name = "las_dataset_image_lance" query_pic_url = "tos://las-doctest/images/animals/pic_search_1000_images/0009fc27d9.jpg" query_pic_with_pic(dataset_name, query_pic_url)
本图搜图的示例代码,为给定一个输入图片,向Lance 数据集中搜索图片,计算图片的余弦相似度,返回相似度TOP 3的图片结果。
其中main函数中: