将 BigLake metastore 与 Iceberg REST 目录搭配使用
BigLake metastore 中的托管式 Apache Iceberg REST 目录可为所有 Iceberg 数据提供单一可信来源,从而在所有查询引擎之间实现互操作性。它可让 Apache Spark 等查询引擎以一致的方式发现、读取元数据和管理 Iceberg 表。
与 Iceberg REST 目录搭配使用的 Iceberg 表称为“适用于 Apache Iceberg 的 BigLake 表”(预览版)。这些表是您通过开源引擎创建并存储在 Cloud Storage 中的 Iceberg 表。它们可以由开源引擎或 BigQuery 读取。仅支持从开源引擎写入。在本文档中,我们将这些表称为 BigLake Iceberg 表。
准备工作
-
Verify that billing is enabled for your Google Cloud project.
了解如何检查项目是否已启用结算功能。 -
Enable the BigLake API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. - 可选:请管理员首次设置凭据自动售卖。
- 可选:了解 BigQuery metastore 的工作原理以及为什么您应该使用它。
所需的角色
如需获得在 BigLake metastore 中使用 Iceberg REST 目录所需的权限,请让管理员向您授予以下 IAM 角色:
-
执行管理任务,例如管理目录用户访问权限、存储空间访问权限和目录的凭据模式:
-
针对项目的 BigLake Admin (
roles/biglake.admin
) 角色 -
针对 Cloud Storage 存储桶的 Storage Admin (
roles/storage.admin
)
-
针对项目的 BigLake Admin (
-
在凭据自动售卖模式下读取表数据:
针对项目的 BigLake 查看者 (
roles/biglake.viewer
) -
以凭据贩售模式写入表数据:项目的 BigLake 编辑器 (
roles/biglake.editor
) -
在非凭证分发模式下读取目录资源和表数据:
-
项目的 BigLake Viewer (
roles/biglake.viewer
) -
针对 Cloud Storage 存储桶的 Storage Object Viewer (
roles/storage.objectViewer
)
-
项目的 BigLake Viewer (
-
在非凭证分发模式下管理目录资源和写入表数据:
-
项目的 BigLake Editor (
roles/biglake.editor
) 角色 -
针对 Cloud Storage 存储桶的 Storage Object User (
roles/storage.objectUser
)
-
项目的 BigLake Editor (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
设置凭证分发模式
凭据提供模式是一种存储访问权限委托机制,可让 BigLake Metastore 管理员直接控制 BigLake Metastore 资源的权限,从而无需目录用户直接访问 Cloud Storage 存储分区。它允许 BigLake 管理员向用户授予对特定数据文件的权限。
目录管理员在 Iceberg REST 目录客户端上启用凭据自动售卖。
作为目录用户,您随后可以指定访问权限委托(这是 Iceberg REST 目录 API 规范的一部分),指示 Iceberg REST 目录返回具有缩小权限范围的存储凭据。如需了解详情,请参阅使用 Iceberg REST 目录配置查询引擎。
如需初始化目录并启用凭据自动售卖模式,请按以下步骤操作。
使用以下命令初始化目录:
curl -H "x-goog-user-project: PROJECT_ID" -H "Accept: application/json" -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" https://biglake.googleapis.com/iceberg/v1/restcatalog/v1/config?warehouse=gs://CLOUD_STORAGE_BUCKET_NAME
替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目的 ID。CLOUD_STORAGE_BUCKET_NAME
:存储 Iceberg 表的 Cloud Storage 存储桶的名称。
curl
命令的输出类似于以下内容。您可以在响应的overrides.prefix
字段中找到目录前缀值:{ "overrides": { "catalog_credential_mode": "CREDENTIAL_MODE_END_USER", "prefix": "projects/PROJECT_ID/catalogs/CLOUD_STORAGE_BUCKET_NAME" }, "endpoints": [ "GET /v1/{prefix}/namespaces", "POST /v1/{prefix}/namespaces", "GET /v1/{prefix}/namespaces/{namespace}", "HEAD /v1/{prefix}/namespaces/{namespace}", "DELETE /v1/{prefix}/namespaces/{namespace}", "POST /v1/{prefix}/namespaces/{namespace}/properties", "GET /v1/{prefix}/namespaces/{namespace}/tables", "POST /v1/{prefix}/namespaces/{namespace}/tables", "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}", "HEAD /v1/{prefix}/namespaces/{namespace}/tables/{table}", "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}", "DELETE /v1/{prefix}/namespaces/{namespace}/tables/{table}" ] }
启用凭据自动售卖模式并提取要授予权限的服务账号,方法是运行以下命令:
curl -X PATCH -H "Content-Type: application/json" -H "x-goog-user-project: PROJECT_ID" -H "Accept: application/json" -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" https://biglake.googleapis.com/iceberg/v1/restcatalog/extensions/PREFIX?update_mask=credential_mode -d '{"credential_mode":"CREDENTIAL_MODE_VENDED_CREDENTIALS"}'
将
PREFIX
替换为上一条命令输出中的prefix
字段。curl
命令的输出包含服务账号,类似于以下内容:{ "name": "projects/PROJECT_ID/catalogs/CLOUD_STORAGE_BUCKET_NAME", "credential_mode": "CREDENTIAL_MODE_VENDED_CREDENTIALS", "biglake-service-account": "BIGLAKE_SERVICE_ACCOUNT" }
为确保您在上一步中提取的 BigLake 服务账号拥有使用凭据贩售模式所需的权限,请让您的管理员向该服务账号授予存储桶的 Storage Object User (
roles/storage.objectUser
) 角色。
限制
Iceberg REST 目录存在以下限制:
- 不支持多区域存储桶、双区域存储桶和自定义区域放置的存储桶。
- 使用凭证分发模式时,您必须将
io-impl
属性设置为org.apache.iceberg.gcp.gcs.GCSFileIO
。默认值org.apache.iceberg.hadoop.HadoopFileIO
不受支持。
配置 Iceberg REST 目录
集群
如需在 Dataproc 上将 Spark 与 Iceberg REST 目录搭配使用,请先创建一个包含 Iceberg 组件的集群:
gcloud dataproc clusters create CLUSTER_NAME \ --enable-component-gateway \ --project=PROJECT_ID \ --region=REGION \ --optional-components=ICEBERG \ --image-version=DATAPROC_VERSION
替换以下内容:
CLUSTER_NAME
:集群的名称。PROJECT_ID
:您的 Google Cloud 项目 ID。REGION
:Dataproc 集群的区域。DATAPROC_VERSION
:Dataproc 映像版本,例如2.2
。
创建集群后,请配置 Spark 会话以使用 Iceberg REST 目录:
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
替换以下内容:
CATALOG_NAME
:Iceberg REST 目录的名称。APP_NAME
:Spark 会话的名称。CLOUD_STORAGE_BUCKET_NAME
:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。PROJECT_ID
:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数。
此示例未使用凭据自动售卖。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation
标头添加到值为 vended-credentials
的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession
构建器:
.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')
包含凭证分发的示例
以下示例使用凭据自动售卖功能配置查询引擎:
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
如需了解详情,请参阅 Iceberg 文档的 RESTCatalog
部分中的“标头”。
Dataproc 集群在以下版本中支持 Iceberg 的 Google 授权流程:
- Dataproc on Compute Engine 2.2 映像版本 2.2.65 及更高版本。
- Dataproc on Compute Engine 2.3 映像版本 2.3.11 及更高版本。
无服务器
向 Google Cloud Serverless for Apache Spark 提交 PySpark 批处理工作负载,并采用以下配置:
gcloud dataproc batches submit pyspark PYSPARK_FILE \ --project=PROJECT_ID \ --region=REGION \ --version=RUNTIME_VERSION \ --properties="\ spark.sql.defaultCatalog=CATALOG_NAME,\ spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\ spark.sql.catalog.CATALOG_NAME.type=rest,\ spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\ spark.sql.catalog.CATALOG_NAME.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\ spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\ spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\ spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\ spark.sql.catalog.CATALOG_NAME.rest-metrics-reporting-enabled=false"
替换以下内容:
PYSPARK_FILE
:PySpark 应用文件的gs://
Cloud Storage 路径。PROJECT_ID
:您的 Google Cloud 项目 ID。REGION
:Dataproc 批次工作负载的区域。RUNTIME_VERSION
:Serverless for Apache Spark 运行时版本,例如2.2
。CATALOG_NAME
:Iceberg REST 目录的名称。CLOUD_STORAGE_BUCKET_NAME
:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。
如需使用凭据自动售卖,您必须向 Iceberg REST 目录请求添加 X-Iceberg-Access-Delegation
标头,并将值设置为 vended-credentials
,方法是将以下行添加到 Serverless for Apache Spark 配置中:
.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')
包含凭证分发的示例
以下示例使用凭据自动售卖功能配置查询引擎:
gcloud dataproc batches submit pyspark PYSPARK_FILE \ --project=PROJECT_ID \ --region=REGION \ --version=RUNTIME_VERSION \ --properties="\ spark.sql.defaultCatalog=CATALOG_NAME,\ spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\ spark.sql.catalog.CATALOG_NAME.type=rest,\ spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\ spark.sql.catalog.CATALOG_NAME.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\ spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\ spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\ spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\ spark.sql.catalog.CATALOG_NAME.rest-metrics-reporting-enabled=false, spark.sql.catalog.CATALOG_NAME.header.X-Iceberg-Access-Delegation=vended-credentials"
如需了解详情,请参阅 Iceberg 文档的 RESTCatalog
部分中的“标头”。
Serverless for Apache Spark 在以下运行时版本中支持 Iceberg 的 Google 授权流程:
- Serverless for Apache Spark 2.2 运行时 2.2.60 及更高版本
- Serverless for Apache Spark 2.3 运行时 2.3.10 及更高版本
Trino
如需将 Trino 与 Iceberg REST 目录搭配使用,请创建包含 Trino 组件的 Dataproc 集群,并使用 gcloud dataproc clusters create --properties
标志配置目录属性。以下示例创建了一个名为 CATALOG_NAME
的 Trino 目录:
gcloud dataproc clusters create CLUSTER_NAME \ --enable-component-gateway \ --region=REGION \ --image-version=DATAPROC_VERSION \ --network=NETWORK_ID \ --optional-components=TRINO \ --properties="\ trino-catalog:CATALOG_NAME.connector.name=iceberg,\ trino-catalog:CATALOG_NAME.iceberg.catalog.type=rest,\ trino-catalog:CATALOG_NAME.iceberg.rest-catalog.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog,\ trino-catalog:CATALOG_NAME.iceberg.rest-catalog.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\ trino-catalog:CATALOG_NAME.iceberg.rest-catalog.biglake.project-id=PROJECT_ID,\ trino-catalog:CATALOG_NAME.iceberg.rest-catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager"
替换以下内容:
CLUSTER_NAME
:集群的名称。REGION
:Dataproc 集群区域。DATAPROC_VERSION
:Dataproc 映像版本,例如2.2
。NETWORK_ID
:集群网络 ID。如需了解详情,请参阅 Dataproc 集群网络配置。CATALOG_NAME
:使用 Iceberg REST 目录为 Trino 目录指定的名称。CLOUD_STORAGE_BUCKET_NAME
:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。PROJECT_ID
:要用于 BigLake metastore 的 Google Cloud 项目 ID。
创建集群后,使用 SSH 连接到主虚拟机实例,然后按如下方式使用 Trino CLI:
trino
Dataproc Trino 支持以下版本中的 Iceberg 的 Google 授权流程:
- Dataproc on Compute Engine 2.2 运行时 2.2.65 版及更高版本
- Dataproc on Compute Engine 2.3 运行时版本 2.3.11 及更高版本
- 不支持在 Compute Engine 3.0 上运行 Dataproc。
Iceberg 1.10 或更高版本
开源 Iceberg 1.10 及更高版本内置了对 GoogleAuthManager
中 Google 授权流的支持。以下示例展示了如何配置 Apache Spark 以使用 BigLake metastore Iceberg REST 目录。
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
替换以下内容:
CATALOG_NAME
:Iceberg REST 目录的名称。APP_NAME
:Spark 会话的名称。CLOUD_STORAGE_BUCKET_NAME
:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。PROJECT_ID
:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数。
上述示例未使用凭据自动售卖功能。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation
标头添加到值为 vended-credentials
的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession
构建器:
.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')
包含凭证分发的示例
以下示例使用凭据自动售卖功能配置查询引擎:
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
如需了解详情,请参阅 Iceberg 文档的 RESTCatalog
部分中的“标头”。
之前的 Iceberg 版本
对于 1.10 之前的开源 Iceberg 版本,您可以通过配置具有以下内容的会话来配置标准 OAuth 身份验证:
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.iceberg:iceberg-gcp-bundle:1.9.1') \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \ .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
替换以下内容:
CATALOG_NAME
:Iceberg REST 目录的名称。APP_NAME
:Spark 会话的名称。CLOUD_STORAGE_BUCKET_NAME
:存储 BigLake Iceberg 表的 Cloud Storage 存储桶的名称。PROJECT_ID
:使用 Iceberg REST 目录所产生的费用将计入该项目,该项目可能与拥有 Cloud Storage 存储桶的项目不同。如需详细了解使用 REST API 时的项目配置,请参阅系统参数。TOKEN
:您的身份验证令牌,有效期为一小时,例如使用gcloud auth application-default print-access-token
生成的令牌。
上述示例未使用凭据自动售卖功能。如需使用凭据自动售卖,您必须将 X-Iceberg-Access-Delegation
标头添加到值为 vended-credentials
的 Iceberg REST 目录请求中,方法是将以下行添加到 SparkSession
构建器:
.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')
包含凭证分发的示例
以下示例使用凭据自动售卖功能配置查询引擎:
import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession catalog_name = "CATALOG_NAME" spark = SparkSession.builder.appName("APP_NAME") \ .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.iceberg:iceberg-gcp-bundle:1.9.1') \ .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \ .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \ .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1/restcatalog') \ .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \ .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \ .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \ .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \ .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \ .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \ .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \ .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \ .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \ .getOrCreate()
如需了解详情,请参阅 Iceberg 文档的 RESTCatalog
部分中的“标头”。
创建命名空间
Spark
spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;")
将 NAMESPACE_NAME
替换为您的命名空间的名称。
Trino
CREATE SCHEMA IF NOT EXISTS CATALOG_NAME.SCHEMA_NAME; USE CATALOG_NAME.SCHEMA_NAME;
替换以下内容:
CATALOG_NAME
:使用 Iceberg REST 目录的 Trino 目录的名称。SCHEMA_NAME
:架构的名称。
创建表
Spark
spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE NAMESPACE_NAME.TABLE_NAME").show()
替换以下内容:
NAMESPACE_NAME
:您的命名空间名称TABLE_NAME
:表的名称。
Trino
CREATE TABLE TABLE_NAME (id int, data varchar); DESCRIBE TABLE_NAME;
将 TABLE_NAME
替换为您的表的名称。
列出表
Spark
spark.sql("SHOW TABLES").show()
Trino
SHOW TABLES;
将数据插入到表中
以下示例将示例数据插入到表中:
Spark
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\"), (2, \"second row\"), (3, \"third row\");")
Trino
INSERT INTO TABLE_NAME VALUES (1, 'first row'), (2, 'second row'), (3, 'third row');
查询表
以下示例从表中选择所有数据:
Spark
spark.sql("SELECT * FROM TABLE_NAME;").show()
Trino
SELECT * FROM TABLE_NAME;
以下示例从 BigQuery 查询同一表:
SELECT * FROM `CLOUD_STORAGE_BUCKET_NAME>NAMESPACE_OR_SCHEMA_NAME.TABLE_NAME`;
替换以下内容:
CLOUD_STORAGE_BUCKET_NAME
:用于 Iceberg REST 目录的 Cloud Storage 存储桶的名称。例如,如果您的 URI 为gs://iceberg_bucket
,请使用iceberg_bucket
。NAMESPACE_OR_SCHEMA_NAME
:如果使用 Spark,则为表命名空间;如果使用 Trino,则为表架构名称。TABLE_NAME
:表格的名称。
更改表架构
以下示例向表中添加了一列:
Spark
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS ( desc string);") spark.sql("DESCRIBE NAMESPACE_NAME.TABLE_NAME").show()
替换以下内容:
NAMESPACE_NAME
:您的命名空间名称TABLE_NAME
:表的名称。
Trino
ALTER TABLE TABLE_NAME ADD COLUMN desc varchar; DESCRIBE SCHEMA_NAME.TABLE_NAME;
替换以下内容:
SCHEMA_NAME
:架构的名称TABLE_NAME
:表的名称。
删除表
以下示例会从指定命名空间中删除表:
Spark
spark.sql("DROP TABLE TABLE_NAME;")
Trino
DROP TABLE TABLE_NAME;
价格
如需详细了解价格,请参阅 BigLake 价格。