查询 BigQuery 表
本文档介绍了如何在 Managed Service for Apache Spark 工作负载中使用 Spark SQL 和 Spark DataFrame API 来查询 BigQuery 表。
准备工作
启用 API,并在需要时授予 Identity and Access Management 角色。
启用 API
- 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
授予 Identity and Access Management 角色
您需要获得 Managed Service for Apache Spark 和 BigQuery 角色授予权限,才能运行本页中的示例。根据组织政策,这些角色可能已获授予。如需检查角色授予情况,请参阅您是否需要授予角色?。
Managed Service for Apache Spark 角色
默认情况下,批处理和会话以 Compute Engine 默认服务账号的身份运行,除非为工作负载或会话指定了自定义服务账号。
服务账号用户角色
如需获得提交批处理工作负载所需的权限,请让您的管理员为您授予 Compute Engine 默认服务账号的 Service Account User (roles/iam.serviceAccountUser) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
Dataproc Worker 角色
如需确保 Compute Engine 默认服务账号具有提交批处理工作负载所需的权限,请让您的管理员向 Compute Engine 默认服务账号授予项目的 Dataproc Worker (roles/dataproc.worker) IAM 角色。
您的管理员也可以通过自定义角色或其他预定义角色为 Compute Engine 默认服务账号授予所需的权限。
BigQuery 角色
用于运行 Managed Service for Apache Spark 批处理工作负载或互动式会话的服务账号必须在以下资源上被授予以下 IAM 角色:
BigQuery Data Viewer (
roles/bigquery.dataViewer),用于从表中读取数据,如下所示:- 在 Spark SQL SELECT 和 INSERT INTO 示例中,从 bigquery.DATASET_ID.SOURCE_TABLE 读取数据。
- 在 DataFrame API 示例中从 INFORMATION_SCHEMA 读取数据。
BigQuery User (
roles/bigquery.user),以允许 Spark 执行与 BigQuery 交互的作业。BigQuery Data Editor (
roles/bigquery.dataEditor) 角色,以便写入数据或元数据,如下所示:- 对于 Spark SQL INSERT INTO 示例,要写入 bigquery.DATASET_ID.DESTINATION_TABLE。
- 对于查询 INFORMATION_SCHEMA 的 DataFrame API 示例,需要在
.option('materializationDataset', ...)中提供的 DATASET_ID 上拥有此角色,以允许连接器为结果创建临时表。
提交 Spark 批处理工作负载
您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Managed Service for Apache Spark API 来提交 Managed Service for Apache Spark 批处理工作负载。
使用 Spark SQL
您可以使用 Spark BigQuery 目录直接从批处理工作负载或互动式会话中查询标准 BigQuery 表。此方法可让您使用标准 GoogleSQL 语法在 spark-sql 作业中与 BigQuery 数据互动,而无需编写 PySpark 代码或使用 DataFrame API 创建临时视图。
配置 BigQuery 目录
如需启用 BigQuery 目录,请为 Spark SQL 批处理工作负载或交互式会话提供以下 Spark 属性:
dataproc.sparkBqConnector.version=CONNECTOR_VERSION:指定 Spark BigQuery 连接器版本。spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog:(可选) 将bigquery目录注册为 Spark SQL 目录。
Google Cloud CLI 示例:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
替换以下内容:
PROJECT_ID:项目 ID。项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。REGION:批处理将运行的区域RUNTIME_VERSION:可选。 Managed Service for Apache Spark 运行时版本。 如果未指定,则选择当前默认运行时版本。CONNECTOR_VERSION:Spark BigQuery 连接器版本。 如需查找与RUNTIME_VERSION兼容的连接器版本,请参阅 Managed Service for Apache Spark 运行时版本。如果连接器未预安装,您可以在 GitHub 版本页面上找到可用版本。SUBNET:可选。用于批处理工作负载的子网。 如果未指定,则使用default子网。SERVICE_ACCOUNT:可选。批量作业将以该服务账号的身份运行。如果未指定,则使用 Compute Engine 默认服务账号。BUCKET:包含 SQL 文件的 Cloud Storage 存储桶。
查询 BigQuery 表
配置目录后,您可以使用以下格式在 SQL 脚本中引用 BigQuery 表:bigquery.DATASET_ID.TABLE_ID。
示例 SQL 查询:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
替换以下内容:
DATASET_ID:BigQuery 数据集 ID。SOURCE_TABLE:要查询的表的 ID。DESTINATION_TABLE:要向其中插入数据的表的 ID。
使用 DataFrame API
需要使用 DataFrame API 才能访问 INFORMATION_SCHEMA 视图。
如需查询
INFORMATION_SCHEMA,请执行以下操作:- 设置
spark.conf.set('viewsEnabled', 'true')。 - 为连接器提供
.option('materializationDataset', 'DATASET_ID')以写入临时结果。
- 设置
PySpark 查询示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
替换以下内容:
PROJECT_ID:项目 ID。项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。DATASET_ID:SparkvBigQuery 连接器可写入临时数据的 BigQuery 数据集 ID。
如需查看从标准 BigQuery 表读取数据,然后将结果写入输出表的 PySpark 示例,请参阅提交 PySpark 字数统计批处理工作负载。