将 BigLake metastore 与 Iceberg REST 目录搭配使用

BigLake metastore 中的托管式 Apache Iceberg REST 目录可为所有 Iceberg 数据提供单一可信来源,从而在所有查询引擎之间实现互操作性。它可让 Apache Spark 等查询引擎以一致的方式发现、读取元数据和管理 Iceberg 表。

与 Iceberg REST 目录搭配使用的 Iceberg 表称为“适用于 Apache Iceberg 的 BigLake 表”(预览版)。这些表是您通过开源引擎创建并存储在 Cloud Storage 中的 Iceberg 表。它们可以由开源引擎或 BigQuery 读取。仅支持从开源引擎写入。在本文档中,我们将这些表称为 BigLake Iceberg 表。

准备工作

  1. Verify that billing is enabled for your Google Cloud project.

    了解如何检查项目是否已启用结算功能
  2. Enable the BigLake API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  3. 可选:请管理员首次设置凭据自动售卖
  4. 可选:了解 BigQuery metastore 的工作原理以及为什么您应该使用它。

所需的角色

如需获得在 BigLake metastore 中使用 Iceberg REST 目录所需的权限,请让管理员向您授予以下 IAM 角色:

  • 执行管理任务,例如管理目录用户访问权限、存储空间访问权限和目录的凭据模式:
  • 在凭据自动售卖模式下读取表数据: 针对项目的 BigLake 查看者 (roles/biglake.viewer)
  • 以凭据贩售模式写入表数据:项目的 BigLake 编辑器 (roles/biglake.editor)
  • 在非凭证分发模式下读取目录资源和表数据:
  • 在非凭证分发模式下管理目录资源和写入表数据:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

设置凭证分发模式

凭据提供模式是一种存储访问权限委托机制,可让 BigLake Metastore 管理员直接控制 BigLake Metastore 资源的权限,从而无需目录用户直接访问 Cloud Storage 存储分区。它允许 BigLake 管理员向用户授予对特定数据文件的权限。

目录管理员在 Iceberg REST 目录客户端上启用凭据自动售卖。

作为目录用户,您随后可以指定访问权限委托(这是 Iceberg REST 目录 API 规范的一部分),指示 Iceberg REST 目录返回具有缩小权限范围的存储凭据。如需了解详情,请参阅使用 Iceberg REST 目录配置查询引擎

如需初始化目录并启用凭据自动售卖模式,请按以下步骤操作。

  1. 使用以下命令初始化目录:

    curl -H "x-goog-user-project: PROJECT_ID" -H "Accept: application/json" -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" https://biglake.googleapis.com/iceberg/v1/restcatalog/v1/config?warehouse=gs://CLOUD_STORAGE_BUCKET_NAME

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目的 ID。
    • CLOUD_STORAGE_BUCKET_NAME:存储 Iceberg 表的 Cloud Storage 存储桶的名称。

    curl 命令的输出类似于以下内容。您可以在响应的 overrides.prefix 字段中找到目录前缀值:

    {
      "overrides": {
        "catalog_credential_mode": "CREDENTIAL_MODE_END_USER",
        "prefix": "projects/PROJECT_ID/catalogs/CLOUD_STORAGE_BUCKET_NAME"
      },
      "endpoints": [
        "GET /v1/{prefix}/namespaces",
        "POST /v1/{prefix}/namespaces",
        "GET /v1/{prefix}/namespaces/{namespace}",
        "HEAD /v1/{prefix}/namespaces/{namespace}",
        "DELETE /v1/{prefix}/namespaces/{namespace}",
        "POST /v1/{prefix}/namespaces/{namespace}/properties",
        "GET /v1/{prefix}/namespaces/{namespace}/tables",
        "POST /v1/{prefix}/namespaces/{namespace}/tables",
        "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}",
        "HEAD /v1/{prefix}/namespaces/{namespace}/tables/{table}",
        "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}",
        "DELETE /v1/{prefix}/namespaces/{namespace}/tables/{table}"
      ]
    }
    
  2. 启用凭据自动售卖模式并提取要授予权限的服务账号,方法是运行以下命令:

    curl -X PATCH -H "Content-Type: application/json" -H "x-goog-user-project: PROJECT_ID" -H "Accept: application/json" -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" https://biglake.googleapis.com/iceberg/v1/restcatalog/extensions/PREFIX?update_mask=credential_mode -d '{"credential_mode":"CREDENTIAL_MODE_VENDED_CREDENTIALS"}'

    PREFIX 替换为上一条命令输出中的 prefix 字段。

    curl 命令的输出包含服务账号,类似于以下内容:

    {
      "name": "projects/PROJECT_ID/catalogs/CLOUD_STORAGE_BUCKET_NAME",
      "credential_mode": "CREDENTIAL_MODE_VENDED_CREDENTIALS",
      "biglake-service-account": "BIGLAKE_SERVICE_ACCOUNT"
    }
    
  3. 为确保您在上一步中提取的 BigLake 服务账号拥有使用凭据贩售模式所需的权限,请让您的管理员向该服务账号授予存储桶的 Storage Object User (roles/storage.objectUser) 角色。

限制

Iceberg REST 目录存在以下限制:

  • 不支持多区域存储桶双区域存储桶和自定义区域放置的存储桶。
  • 使用凭证分发模式时,您必须将 io-impl 属性设置为 org.apache.iceberg.gcp.gcs.GCSFileIO。默认值 org.apache.iceberg.hadoop.HadoopFileIO 不受支持。

配置 Iceberg REST 目录

集群

如需在 Dataproc 上将 Spark 与 Iceberg REST 目录搭配使用,请先创建一个包含 Iceberg 组件的集群:

gcloud dataproc clusters create CLUSTER_NAME \
    --enable-component-gateway \
    --project=PROJECT_ID \
    --region=REGION \
    --optional-components=ICEBERG \
    --image-version=DATAPROC_VERSION

替换以下内容:

  • CLUSTER_NAME:集群的名称。
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • REGION:Dataproc 集群的区域。
  • DATAPROC_VERSION:Dataproc 映像版本,例如 2.2

创建集群后,请配置 Spark 会话以使用 Iceberg REST 目录:

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

替换以下内容:

  • CATALOG_NAME:Iceberg REST 目录的名称。
  • APP_NAME:Spark 会话的名称。
  • CLOUD_STORAGE_BUCKET_NAME:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。
  • PROJECT_ID:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数

此示例未使用凭据自动售卖。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation 标头添加到值为 vended-credentials 的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession 构建器:

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

包含凭证分发的示例

以下示例使用凭据自动售卖功能配置查询引擎:

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

如需了解详情,请参阅 Iceberg 文档的 RESTCatalog 部分中的“标头”。

Dataproc 集群在以下版本中支持 Iceberg 的 Google 授权流程:

  • Dataproc on Compute Engine 2.2 映像版本 2.2.65 及更高版本。
  • Dataproc on Compute Engine 2.3 映像版本 2.3.11 及更高版本。

无服务器

向 Google Cloud Serverless for Apache Spark 提交 PySpark 批处理工作负载,并采用以下配置:

gcloud dataproc batches submit pyspark PYSPARK_FILE \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --properties="\
    spark.sql.defaultCatalog=CATALOG_NAME,\
    spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
    spark.sql.catalog.CATALOG_NAME.type=rest,\
    spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\
    spark.sql.catalog.CATALOG_NAME.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\
    spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\
    spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
    spark.sql.catalog.CATALOG_NAME.rest-metrics-reporting-enabled=false"

替换以下内容:

  • PYSPARK_FILE:PySpark 应用文件的 gs:// Cloud Storage 路径。
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • REGION:Dataproc 批次工作负载的区域。
  • RUNTIME_VERSION:Serverless for Apache Spark 运行时版本,例如 2.2
  • CATALOG_NAME:Iceberg REST 目录的名称。
  • CLOUD_STORAGE_BUCKET_NAME:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。

如需使用凭据自动售卖,您必须向 Iceberg REST 目录请求添加 X-Iceberg-Access-Delegation 标头,并将值设置为 vended-credentials,方法是将以下行添加到 Serverless for Apache Spark 配置中:

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

包含凭证分发的示例

以下示例使用凭据自动售卖功能配置查询引擎:

gcloud dataproc batches submit pyspark PYSPARK_FILE \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --properties="\
    spark.sql.defaultCatalog=CATALOG_NAME,\
    spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
    spark.sql.catalog.CATALOG_NAME.type=rest,\
    spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\
    spark.sql.catalog.CATALOG_NAME.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\
    spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\
    spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
    spark.sql.catalog.CATALOG_NAME.rest-metrics-reporting-enabled=false,
    spark.sql.catalog.CATALOG_NAME.header.X-Iceberg-Access-Delegation=vended-credentials"

如需了解详情,请参阅 Iceberg 文档的 RESTCatalog 部分中的“标头”。

Serverless for Apache Spark 在以下运行时版本中支持 Iceberg 的 Google 授权流程:

  • Serverless for Apache Spark 2.2 运行时 2.2.60 及更高版本
  • Serverless for Apache Spark 2.3 运行时 2.3.10 及更高版本

Trino

如需将 Trino 与 Iceberg REST 目录搭配使用,请创建包含 Trino 组件的 Dataproc 集群,并使用 gcloud dataproc clusters create --properties 标志配置目录属性。以下示例创建了一个名为 CATALOG_NAME 的 Trino 目录:

gcloud dataproc clusters create CLUSTER_NAME \
    --enable-component-gateway \
    --region=REGION \
    --image-version=DATAPROC_VERSION \
    --network=NETWORK_ID \
    --optional-components=TRINO \
    --properties="\
trino-catalog:CATALOG_NAME.connector.name=iceberg,\
trino-catalog:CATALOG_NAME.iceberg.catalog.type=rest,\
trino-catalog:CATALOG_NAME.iceberg.rest-catalog.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\
trino-catalog:CATALOG_NAME.iceberg.rest-catalog.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\
trino-catalog:CATALOG_NAME.iceberg.rest-catalog.biglake.project-id=PROJECT_ID,\
trino-catalog:CATALOG_NAME.iceberg.rest-catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager"

替换以下内容:

  • CLUSTER_NAME:集群的名称。
  • REGION:Dataproc 集群区域。
  • DATAPROC_VERSION:Dataproc 映像版本,例如 2.2
  • NETWORK_ID:集群网络 ID。如需了解详情,请参阅 Dataproc 集群网络配置
  • CATALOG_NAME:使用 Iceberg REST 目录为 Trino 目录指定的名称。
  • CLOUD_STORAGE_BUCKET_NAME:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。
  • PROJECT_ID:要用于 BigLake metastore 的 Google Cloud 项目 ID。

创建集群后,使用 SSH 连接到主虚拟机实例,然后按如下方式使用 Trino CLI:

trino

Dataproc Trino 支持以下版本中的 Iceberg 的 Google 授权流程:

  • Dataproc on Compute Engine 2.2 运行时 2.2.65 版及更高版本
  • Dataproc on Compute Engine 2.3 运行时版本 2.3.11 及更高版本
  • 不支持在 Compute Engine 3.0 上运行 Dataproc。

Iceberg 1.10 或更高版本

开源 Iceberg 1.10 及更高版本内置了对 GoogleAuthManager 中 Google 授权流的支持。以下示例展示了如何配置 Apache Spark 以使用 BigLake metastore Iceberg REST 目录。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

替换以下内容:

  • CATALOG_NAME:Iceberg REST 目录的名称。
  • APP_NAME:Spark 会话的名称。
  • CLOUD_STORAGE_BUCKET_NAME:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。
  • PROJECT_ID:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数

上述示例未使用凭据自动售卖功能。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation 标头添加到值为 vended-credentials 的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession 构建器:

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

包含凭证分发的示例

以下示例使用凭据自动售卖功能配置查询引擎:

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

如需了解详情,请参阅 Iceberg 文档的 RESTCatalog 部分中的“标头”。

之前的 Iceberg 版本

对于 1.10 之前的开源 Iceberg 版本,您可以通过配置具有以下内容的会话来配置标准 OAuth 身份验证:

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.iceberg:iceberg-gcp-bundle:1.9.1') \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \
  .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

替换以下内容:

  • CATALOG_NAME:Iceberg REST 目录的名称。
  • APP_NAME:Spark 会话的名称。
  • CLOUD_STORAGE_BUCKET_NAME:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。
  • PROJECT_ID:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数
  • TOKEN:您的身份验证令牌,有效期为一小时,例如使用 gcloud auth application-default print-access-token 生成的令牌。

上述示例未使用凭据自动售卖功能。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation 标头添加到值为 vended-credentials 的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession 构建器:

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

包含凭证分发的示例

以下示例使用凭据自动售卖功能配置查询引擎:

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.iceberg:iceberg-gcp-bundle:1.9.1') \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \
  .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

如需了解详情,请参阅 Iceberg 文档的 RESTCatalog 部分中的“标头”。

创建命名空间

Spark

spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")

spark.sql("USE NAMESPACE_NAME;")

NAMESPACE_NAME 替换为您的命名空间的名称。

Trino

CREATE SCHEMA IF NOT EXISTS  CATALOG_NAME.SCHEMA_NAME;

USE CATALOG_NAME.SCHEMA_NAME;

替换以下内容:

  • CATALOG_NAME:使用 Iceberg REST 目录的 Trino 目录的名称。
  • SCHEMA_NAME:架构的名称。

创建表

Spark

spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")

spark.sql("DESCRIBE NAMESPACE_NAME.TABLE_NAME").show()

替换以下内容:

  • NAMESPACE_NAME:您的命名空间名称
  • TABLE_NAME:表的名称。

Trino

CREATE TABLE TABLE_NAME (id int, data varchar);

DESCRIBE TABLE_NAME;

TABLE_NAME 替换为您的表的名称。

列出表

Spark

spark.sql("SHOW TABLES").show()

Trino

SHOW TABLES;

将数据插入到表中

以下示例将示例数据插入到表中:

Spark

spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\"), (2, \"second row\"), (3, \"third row\");")

Trino

INSERT INTO TABLE_NAME VALUES (1, 'first row'), (2, 'second row'), (3, 'third row');

查询表

以下示例从表中选择所有数据:

Spark

spark.sql("SELECT * FROM TABLE_NAME;").show()

Trino

SELECT * FROM TABLE_NAME;

以下示例从 BigQuery 查询同一表:

SELECT * FROM `CLOUD_STORAGE_BUCKET_NAME>NAMESPACE_OR_SCHEMA_NAME.TABLE_NAME`;

替换以下内容:

  • CLOUD_STORAGE_BUCKET_NAME:用于 Iceberg REST 目录的 Cloud Storage 存储桶的名称。例如,如果您的 URI 为 gs://iceberg_bucket,请使用 iceberg_bucket
  • NAMESPACE_OR_SCHEMA_NAME:如果使用 Spark,则为表命名空间;如果使用 Trino,则为表架构名称。

  • TABLE_NAME:表格的名称。

更改表架构

以下示例向表中添加了一列:

Spark

spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS ( desc string);")
spark.sql("DESCRIBE NAMESPACE_NAME.TABLE_NAME").show()

替换以下内容:

  • NAMESPACE_NAME:您的命名空间名称
  • TABLE_NAME:表的名称。

Trino

ALTER TABLE TABLE_NAME ADD COLUMN desc varchar;
DESCRIBE SCHEMA_NAME.TABLE_NAME;

替换以下内容:

  • SCHEMA_NAME:架构的名称
  • TABLE_NAME:表的名称。

删除表

以下示例会从指定命名空间中删除表:

Spark

spark.sql("DROP TABLE TABLE_NAME;")

Trino

DROP TABLE TABLE_NAME;

价格

如需详细了解价格,请参阅 BigLake 价格

后续步骤