Load data into BigQuery
This document describes how to use Managed Service for Apache Spark to run a Spark job that loads processed data from Cloud Storage into a BigQuery table. Managed Service for Apache Spark simplifies this process by managing the Spark environment and the required connectors.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
  - Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (`roles/resourcemanager.projectCreator`), which contains the `resourcemanager.projects.create` permission. Learn how to grant roles.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Dataproc, BigQuery, and Cloud Storage APIs. To enable APIs, you need the Service Usage Admin IAM role (`roles/serviceusage.serviceUsageAdmin`), which contains the `serviceusage.services.enable` permission. Learn how to grant roles.
- Create a Cloud Storage bucket.
- Create a Managed Service for Apache Spark cluster that uses image version 2.1 or later.
- Create a BigQuery dataset. (For the bucket and dataset steps, see the sketch after this list.)
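The following is a minimal sketch, assuming the `google-cloud-storage` and `google-cloud-bigquery` client libraries, of creating the bucket and dataset prerequisites programmatically; the console, `gcloud`, or `bq` tools work equally well. The names and the `US` location are placeholders, and the cluster itself is created separately.

```python
from google.cloud import bigquery, storage

# Placeholder values; replace with your own.
PROJECT_ID = "PROJECT_ID"
BUCKET_NAME = "BUCKET_NAME"
DATASET = "DATASET"

# Create the Cloud Storage bucket that holds the script and processed data.
storage.Client(project=PROJECT_ID).create_bucket(BUCKET_NAME, location="US")

# Create the BigQuery dataset that receives the aggregated table.
bq_client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET}")
dataset.location = "US"
bq_client.create_dataset(dataset, exists_ok=True)
```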
Prepare the PySpark script
Create a local Python file named `load_analytics_data.py` and add the following code to it. This script reads data from a Cloud Storage path, performs an aggregation, and writes the results to BigQuery.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum

# --- Configuration ---
gcs_bucket = "BUCKET_NAME"
bq_project = "PROJECT_ID"
bq_dataset = "DATASET"
bq_table = "corpus_word_counts"

# --- Paths ---
processed_path = f"gs://{gcs_bucket}/processed/processed_data"
temp_gcs_path = gcs_bucket

# --- Spark Session Initialization ---
spark = SparkSession.builder \
    .appName("Dataproc-BigQuery-Load") \
    .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
    .getOrCreate()

# --- Read Processed Data from Cloud Storage ---
processed_df = spark.read.parquet(processed_path)

# --- Final Aggregation for Analytics-Ready Stage ---
analytics_df = processed_df.groupBy("corpus") \
    .agg(_sum("word_count_int").alias("total_word_count")) \
    .orderBy("corpus")

print("Aggregated Analytics-Ready data:")
analytics_df.show()

# --- Write DataFrame to BigQuery ---
print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
analytics_df.write \
    .format("bigquery") \
    .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
    .mode("append") \
    .save()

print("Successfully wrote data to BigQuery.")

# --- Stop Spark Session ---
spark.stop()
```

Replace the following placeholders:
- BUCKET_NAME: the name of your Cloud Storage bucket.
- PROJECT_ID: your Google Cloud project ID.
- DATASET: the name of your BigQuery dataset.
Upload the `load_analytics_data.py` script to your Cloud Storage bucket.
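If you prefer to script the upload, the following is a minimal sketch using the `google-cloud-storage` client library; the `scripts/` destination prefix is an assumption that matches the submit command in the next section.

```python
from google.cloud import storage

# Upload the local PySpark script to the bucket. BUCKET_NAME and the
# "scripts/" prefix are placeholders; any object path works as long as
# the submit command points at the same gs:// URI.
client = storage.Client()
blob = client.bucket("BUCKET_NAME").blob("scripts/load_analytics_data.py")
blob.upload_from_filename("load_analytics_data.py")
print(f"Uploaded to gs://BUCKET_NAME/{blob.name}")
```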
Submit the Managed Service for Apache Spark job
Submit the PySpark script as a job to your Managed Service for Apache Spark cluster.
In a terminal, run the `gcloud dataproc jobs submit pyspark` command:

```bash
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/scripts/load_analytics_data.py \
    --cluster=CLUSTER_NAME \
    --region=REGION
```

Replace the following placeholders:

- BUCKET_NAME: the name of your Cloud Storage bucket.
- CLUSTER_NAME: the name of your Managed Service for Apache Spark cluster.
- REGION: the region where the cluster is located.
This command submits the PySpark job to the Managed Service for Apache Spark service. The Spark workers fetch the script from the specified Cloud Storage path and execute it on the cluster.
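If you want to submit the job programmatically instead of with `gcloud`, the following is a minimal sketch using the `google-cloud-dataproc` client library's `JobControllerClient`; the uppercase names are placeholders.

```python
from google.cloud import dataproc_v1

# Placeholder values; replace with your project, region, cluster, and bucket.
PROJECT_ID = "PROJECT_ID"
REGION = "REGION"
CLUSTER_NAME = "CLUSTER_NAME"
SCRIPT_URI = "gs://BUCKET_NAME/scripts/load_analytics_data.py"

# The job controller endpoint is regional.
client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": SCRIPT_URI},
}

# Submit the job and block until it completes.
operation = client.submit_job_as_operation(
    request={"project_id": PROJECT_ID, "region": REGION, "job": job}
)
response = operation.result()
print(f"Job finished with state: {response.status.state.name}")
```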
Verify the data load
In the Google Cloud console, go to the Managed Service for Apache Spark Jobs page to monitor the job's execution and view the driver output logs.
After the job completes, go to the BigQuery page.
In the Explorer panel, find your project and dataset, and then select the `corpus_word_counts` table. Click the Preview tab to inspect the loaded data.
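You can also verify the load with a query. The following is a minimal sketch using the `google-cloud-bigquery` client library; `PROJECT_ID` and `DATASET` are placeholders for your project and dataset.

```python
from google.cloud import bigquery

client = bigquery.Client(project="PROJECT_ID")
query = """
    SELECT corpus, total_word_count
    FROM `PROJECT_ID.DATASET.corpus_word_counts`
    ORDER BY total_word_count DESC
    LIMIT 10
"""

# Print the top rows that the Spark job loaded.
for row in client.query(query).result():
    print(f"{row.corpus}: {row.total_word_count}")
```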
How the Spark-BigQuery connector works
The Spark-BigQuery connector lets Spark applications read data from and write data to BigQuery. The connector is preinstalled on Managed Service for Apache Spark clusters that use image version 2.1 or later.
The connector uses the indirect write method to load data. This method uses both Managed Service for Apache Spark and BigQuery to achieve high performance, and it works as follows:
1. The Spark job on the Managed Service for Apache Spark cluster writes the final DataFrame to a temporary location in a Cloud Storage bucket.
2. After the write to Cloud Storage completes, the connector triggers a BigQuery load job.
3. BigQuery ingests the data from the temporary Cloud Storage location into the target table.
This indirect approach decouples Spark computation from BigQuery ingestion. The decoupling lets each service operate efficiently and ensures high throughput when processing large volumes of data.
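As a concrete illustration, the following is a minimal PySpark sketch of an indirect write that sets the staging bucket explicitly through the connector's `temporaryGcsBucket` write option; the table and bucket names are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("indirect-write-sketch").getOrCreate()

# A tiny DataFrame standing in for the aggregated results.
df = spark.createDataFrame([("sample_corpus", 42)], ["corpus", "total_word_count"])

# Indirect write: the connector first stages the DataFrame as files in the
# temporary bucket, then triggers a BigQuery load job from that location.
(df.write
    .format("bigquery")
    .option("table", "PROJECT_ID.DATASET.corpus_word_counts")
    .option("temporaryGcsBucket", "BUCKET_NAME")
    .mode("append")
    .save())
```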
What's next
- Learn more about the Spark-BigQuery connector.
- Learn how to query and visualize data in BigQuery.
- Learn how to visualize BigQuery data with Looker.