查詢 BigQuery 資料表
本文說明如何在 Managed Service for Apache Spark 工作負載中使用 Spark SQL 和 Spark DataFrame API,查詢 BigQuery 表格。
事前準備
啟用 API,並視需要授予 Identity and Access Management 角色。
啟用 API
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
授予身分與存取權管理角色
如要在本頁面執行範例,必須授予 Managed Service for Apache Spark 和 BigQuery 角色。視機構政策而定,系統可能已授予這些角色。如要檢查角色授予情形,請參閱「是否需要授予角色?」一節。
Managed Service for Apache Spark 角色
根據預設,批次和工作階段會以 Compute Engine 預設服務帳戶執行,除非為工作負載或工作階段指定自訂服務帳戶。
服務帳戶使用者角色
如要取得提交批次工作負載所需的權限,請要求管理員授予您 Compute Engine 預設服務帳戶的「服務帳戶使用者」( roles/iam.serviceAccountUser) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
Dataproc Worker 角色
為確保 Compute Engine 預設服務帳戶具備提交批次工作負載的必要權限,請要求管理員在專案中,將 Dataproc Worker (roles/dataproc.worker) IAM 角色授予 Compute Engine 預設服務帳戶。
管理員或許也能透過自訂角色或其他預先定義的角色,將必要權限授予 Compute Engine 預設服務帳戶。
BigQuery 角色
用來執行 Managed Service for Apache Spark 批次工作負載或互動式工作階段的服務帳戶,必須在下列資源中獲授下列 IAM 角色:
BigQuery 資料檢視者 (
roles/bigquery.dataViewer),可從資料表讀取資料,方法如下:- 在 Spark SQL SELECT 和 INSERT INTO 範例中,從 bigquery.DATASET_ID.SOURCE_TABLE 讀取資料。
- 在 DataFrame API 範例中從 INFORMATION_SCHEMA 讀取資料。
BigQuery 使用者 (
roles/bigquery.user),允許 Spark 執行與 BigQuery 互動的工作。BigQuery 資料編輯者 (
roles/bigquery.dataEditor),可寫入資料或中繼資料,如下所示:- 以 Spark SQL INSERT INTO 範例來說,如要寫入 bigquery.DATASET_ID.DESTINATION_TABLE.
- 在查詢 INFORMATION_SCHEMA 的 DataFrame API 範例中,DATASET_ID 提供的
.option('materializationDataset', ...)需要這個角色,才能讓連接器為結果建立臨時表。
提交 Spark 批次工作負載
您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Managed Service for Apache Spark API,提交 Managed Service for Apache Spark 批次工作負載。
使用 Spark SQL
您可以使用 Spark BigQuery 目錄,直接從批次工作負載或互動式工作階段查詢標準 BigQuery 資料表。這個方法可讓您在 spark-sql 工作中使用標準 GoogleSQL 語法與 BigQuery 資料互動,不必編寫 PySpark 程式碼,也不必使用 DataFrame API 建立臨時檢視區塊。
設定 BigQuery 目錄
如要啟用 BigQuery 目錄,請為 Spark SQL 批次工作負載或互動式工作階段提供下列 Spark 屬性:
dataproc.sparkBqConnector.version=CONNECTOR_VERSION:指定 Spark BigQuery 連接器版本。spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (選用) 將bigquery目錄註冊為 Spark SQL 目錄。
Google Cloud CLI 範例:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
更改下列內容:
PROJECT_ID:專案 ID。專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info」(專案資訊) 部分。REGION:批次作業的執行區域RUNTIME_VERSION:選用。 Managed Service for Apache Spark 執行階段版本。 如未指定,系統會選取目前的預設執行階段版本。CONNECTOR_VERSION:Spark BigQuery 連接器版本。如要尋找與RUNTIME_VERSION相容的連接器版本,請參閱「Managed Service for Apache Spark 執行階段版本」。如果連接器未預先安裝,您可以在 GitHub 版本頁面找到可用版本。SUBNET:選用。用於批次工作負載的子網路。 如未指定,則會使用default子網路。SERVICE_ACCOUNT:選用。批次工作的服務帳戶執行身分。如未指定,則會使用 Compute Engine 預設服務帳戶。BUCKET:包含 SQL 檔案的 Cloud Storage bucket。
查詢 BigQuery 資料表
設定目錄後,您可以使用下列格式在 SQL 指令碼中參照 BigQuery 資料表:bigquery.DATASET_ID.TABLE_ID。
SQL 查詢範例:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
更改下列內容:
DATASET_ID:BigQuery 資料集 ID。SOURCE_TABLE:要查詢的資料表 ID。DESTINATION_TABLE:要插入資料的表格 ID。
使用 DataFrame API
您必須使用 DataFrame API,才能存取 INFORMATION_SCHEMA 檢視畫面。
如要查詢
INFORMATION_SCHEMA:- 設定
spark.conf.set('viewsEnabled', 'true')。 - 提供
.option('materializationDataset', 'DATASET_ID'),供連接器寫入暫時結果。
- 設定
PySpark 查詢範例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
更改下列內容:
PROJECT_ID:專案 ID。專案 ID 會列在 Google Cloud 控制台資訊主頁的「Project info」(專案資訊) 部分。DATASET_ID:SparkvBigQuery 連接器可將暫時資料寫入的 BigQuery 資料集 ID。
如需從標準 BigQuery 資料表讀取資料,然後將結果寫入輸出資料表的 PySpark 範例,請參閱「提交 PySpark 字詞計數批次工作負載」。