构建代理以丰富元数据

Knowledge Catalog(以前称为 Dataplex Universal Catalog)用于管理组织内数据资产的元数据。此元数据提供了智能体用来发现、理解和查询回答用户问题所需数据的上下文。

虽然 Knowledge Catalog 会自动管理资源、跟踪技术架构并生成说明和数据配置文件,但有价值的业务上下文通常位于其他位置,例如:

  • 内部文档和 Wiki
  • 代码库
  • Google Chat 和 Slack 等沟通渠道

您可以构建 AI 智能体,以从这些来源提取上下文,并大规模持续丰富元数据。本教程使用 dataplex-labs 代码库中的示例代码,向您展示如何构建执行以下操作的智能体:

  • 提取上下文: 从知识库、文档、代码或聊天内容中提取业务上下文,以丰富技术元数据。
  • 生成文档: 根据提取的上下文和其他信息来源,为 BigQuery 表生成文档。
  • 改进搜索和发现: 将生成的文档发布到 Knowledge Catalog,让用户可以通过搜索更轻松地找到和理解条目。

准备工作

如需运行 Knowledge Catalog 丰富智能体,您必须满足以下要求:

所需角色

如需获得使用丰富智能体所需的权限,请让您的管理员为您授予项目的以下 IAM 角色: Google Cloud iam.gserviceaccount.com

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

这些预定义角色包含 使用丰富智能体所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

使用丰富智能体需要以下权限:

  • bigquery.projects.get/createDatasets
  • dataplex.projects.search
  • dataplex.entryGroups.get/updateEntries
  • aiplatform.endpoints.predict
  • serviceusage.services.use

您也可以使用自定义角色或其他预定义角色来获取这些权限。

启用 API

如需使用 Knowledge Catalog 丰富智能体,请在项目中启用以下 API:

  • BigQuery API
  • Knowledge Catalog API
  • Vertex AI API
  • Service Usage API

启用 API 所需的角色

如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予 角色

启用 API

安装依赖项

您需要以下 Python 软件包和工具才能运行示例:

  • google-adk(智能体开发套件 (ADK))
  • google-cloud-dataplex Knowledge Catalog Python 客户端
  • google-auth 用于管理应用默认凭据
  • mcp[cli] 用于构建示例 MCP 服务器
  • gcloud 用于身份验证和配置。如需安装 Google Cloud CLI,请参阅 Google Cloud SDK 文档。

设置环境

  1. 配置 gcloud 并登录:

    gcloud auth application-default login
    gcloud config set core/project PROJECT_ID
    

    替换以下内容:

    • PROJECT_ID 替换为您的项目 ID
  2. 克隆 dataplex-labs 代码库并导航到示例源目录:

    git clone https://github.com/GoogleCloudPlatform/dataplex-labs.git
    cd dataplex-labs/knowledge_catalog_enrichment_agent/src
    
  3. 如需安装依赖项,请使用提供的脚本来设置 Python 虚拟环境和必要的环境变量:

    source env.sh --install
    
  4. 如需在云项目的 us 区域中创建一个名为 kc_sample_analytics 的示例 BigQuery 数据集,请运行 create_data.py 脚本:

    python3 ../sample/data/create_data.py
    

    该示例还包含 sample/docs 目录中的多个文档。这些文档构成了一个本地知识库。丰富智能体使用此知识库提取信息并生成文档。

下载元数据

首先,运行下载工具,从 Knowledge Catalog 中提取 BigQuery 数据集及其表的元数据快照。这会创建本地元数据工件。

--dir 实参用于指定写入元数据文件的目录。

python3 -m enrichment.download \
  --dir ../sample/metadata.initial \
  --dataset ${KC_ENRICH_SAMPLE_PROJECT}.kc_sample_analytics

该脚本使用以下命名惯例在 sample/metadata 目录中为每个表创建一个 Markdown 文件:<project_id>.<dataset_id>.<table_id>.md

丰富元数据

创建本地 Markdown 文件后,运行丰富智能体。智能体会遍历每个文件,查找与表相关的信息,并汇总发现结果以及引用,以生成更新后的 Markdown 文件。

  • --dir:指定包含本地元数据文件的目录。
  • --output-dir:指定更新后的元数据文件的目标目录。
  • --config-dir:指定包含智能体指令、MCP 工具和技能的目录。
python3 -m enrichment.enrich \
  --dir ../sample/metadata.initial \
  --output-dir ../sample/metadata.new \
  --config-dir ../sample/config

查看元数据

丰富后的元数据文件包含智能体生成的文档。在将更改发布到 Knowledge Catalog 之前,请根据需要查看和修改这些文件。

git diff --no-index ../sample/metadata.initial ../sample/metadata.new

发布元数据

运行发布工具,将丰富后的元数据部署到 Knowledge Catalog。

python3 -m src.enrichment.publish --dir ../sample/metadata.new

针对您的数据进行自定义

在上一步中,您使用了 --config-dir 参数将代理指向 ../sample/config 目录以进行配置。这样,智能体就知道在哪里查找信息以及如何与不同的来源互动。

该示例附带一个默认配置,该配置指示智能体使用本地 MCP 服务器访问本地知识库 (sample/docs) 中的文件。如需在您的环境中应用此工作流,您可以自定义这些配置文件,以将智能体连接到您的内部 Wiki、代码库、Google 云端硬盘或其他系统。

sample/config/ 目录包含以下文件:

sample/config/
├─ instructions.md
├─ mcp.json
└─ skills/
    └─ kb-search/
        └─ SKILL.md
  • instructions.md:使用与您的组织相关的详细信息(例如指示智能体搜索特定知识库)来扩充智能体的基准指令。
  • mcp.json:配置智能体可用于访问信息来源工具的 MCP 服务器,例如用于从本地目录读取文件的工具。
  • SKILL.md:描述智能体应如何使用特定工具与信息来源互动,例如使用 list_contentsread_filesearch_content 在本地文档中查找信息。

探索示例 Knowledge Catalog 代码

丰富流程部分中的 downloadpublish 工具使用 Knowledge Catalog API 来读取和写入元数据。

本部分介绍了这些 API 的工作原理,以便您可以根据自己的集成调整示例。

搜索和检索元数据

该示例使用以下 API 来搜索和检索元数据:

  • SearchEntries 用于检索数据集的条目和位置元数据。
  • ListEntries 用于枚举目录 EntryGroup 中的 BigQuery 表。
  • GetEntry 用于提取每个 BigQuery 表的特定元数据。

以下代码展示了如何搜索数据集以找到其条目组,列出其中包含的所有表,并检索其特定元数据:

import google.cloud.dataplex_v1 as dataplex

BIGQUERY_TABLE_TYPE = "projects/dataplex-types/locations/global/entryTypes/bigquery-table"
OVERVIEW_ASPECT_TYPE = "projects/dataplex-types/locations/global/aspectTypes/overview"

catalog = dataplex.CatalogServiceClient()

dataset_reference = '...'   # project_id.dataset_id
project_id, dataset_id = dataset_reference.split('.')

# 1. Search for dataset to determine its location
search_response = catalog.search_entries(
    request=dataplex.SearchEntriesRequest(
        name=f"projects/{project_id}/locations/global",
        query=f"type=dataset name={dataset_id}",
        page_size=1
    )
)
dataset_entry = search_response.results[0].dataplex_entry
location_id = dataset_entry.entry_source.location

# 2. List resources in the underlying group
entry_group_name = f"projects/{project_id}/locations/{location_id}/entryGroups/@bigquery"
entry_filter = f'parent_entry="{dataset_entry.name}"'
list_response = catalog.list_entries(
    request=dataplex.ListEntriesRequest(
        parent=entry_group_name,
        entry_filter=entry_filter,
    )
)

# 3. Retrieve metadata for each table in the list
for table_entry in list_response.entries:
    entry = catalog.get_entry(
        request=dataplex.GetEntryRequest(
            name=table_entry.name,
            view="CUSTOM",
            aspect_types=[OVERVIEW_ASPECT_TYPE]
        )
    )

更新表元数据

以下代码展示了如何将生成的文档发布到表的概览切面并更新其元数据:

import google.cloud.dataplex_v1 as dataplex
import google.protobuf.field_mask_pb2 as field_mask_pb2
import google.protobuf.json_format as jsonpb

OVERVIEW_ASPECT_TYPE = "projects/dataplex-types/locations/global/aspectTypes/overview"
OVERVIEW_ASPECT_KEY = "dataplex-types.global.overview"

catalog = dataplex.CatalogServiceClient()

table_reference = "..."    # project_id.dataset_id.table_id
project_id, dataset_id, table_id = table_reference.split('.')

entry_data = {
    "name": f"bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
    "aspects": {
        OVERVIEW_ASPECT_KEY: {
            "aspectType": OVERVIEW_ASPECT_TYPE,
            "data": {
                "content": "...", # content parsed from local markdown file
                "contentType": "MARKDOWN"
            }
        }
    }
}

entry = dataplex.Entry()
jsonpb.ParseDict(entry_data, entry._pb)

catalog.update_entry(
    request=dataplex.UpdateEntryRequest(
        entry=entry,
        update_mask=field_mask_pb2.FieldMask(paths=["aspects"]),
        aspect_keys=[OVERVIEW_ASPECT_KEY],
    )
)

后续步骤