将数据加载到 BigQuery 中
本文档介绍了如何使用 Managed Service for Apache Spark 运行 Spark 作业,该作业会将经过处理的数据从 Cloud Storage 加载到 BigQuery 表中。Managed Service for Apache Spark 通过管理 Spark 环境和必要的连接器来简化此过程。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud的新用户, 请创建账号,以便在 实际场景中评估我们产品的性能。新客户还可以获得 300 美元的免费抵用金,用于 运行、测试和部署工作负载。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 创建 Cloud Storage 存储桶。
- 创建使用映像版本
2.1或更高版本的 Managed Service for Apache Spark 集群。 - 创建 BigQuery 数据集。
准备 PySpark 脚本
创建一个名为
load_analytics_data.py的本地 Python 文件。在文件中添加以下代码。此脚本会从 Cloud Storage 路径读取数据,执行聚合,并将结果写入 BigQuery。
from pyspark.sql import SparkSession from pyspark.sql.functions import sum as _sum import sys # --- Configuration --- gcs_bucket = "BUCKET_NAME" bq_project = "PROJECT_ID" bq_dataset = "DATASET" bq_table = "corpus_word_counts" # --- Paths --- processed_path = f"gs://{gcs_bucket}/processed/processed_data" temp_gcs_path = f"{gcs_bucket}" # --- Spark Session Initialization --- spark = SparkSession.builder \ .appName("Dataproc-BigQuery-Load") \ .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \ .getOrCreate() # --- Read Processed Data from Cloud Storage --- processed_df = spark.read.parquet(processed_path) # --- Final Aggregation for Analytics-Ready Stage --- analytics_df = processed_df.groupBy("corpus") \ .agg(_sum("word_count_int").alias("total_word_count")) \ .orderBy("corpus") print("Aggregated Analytics-Ready data:") analytics_df.show() # --- Write DataFrame to BigQuery --- print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}") analytics_df.write \ .format("bigquery") \ .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \ .mode("append") \ .save() print("Successfully wrote data to BigQuery.") # --- Stop Spark Session --- spark.stop()替换以下占位符:
BUCKET_NAME:Cloud Storage 存储桶的名称。PROJECT_ID:您的 Google Cloud 项目 ID。
将
load_analytics_data.py脚本上传到您的 Cloud Storage 存储桶。
提交 Managed Service for Apache Spark 作业
将 PySpark 脚本作为作业提交到您的 Managed Service for Apache Spark 集群。
在终端中,运行
gcloud dataproc jobs submit pyspark命令:gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \ --cluster=CLUSTER_NAME \ --region=REGION替换以下占位符:
BUCKET_NAME:Cloud Storage 存储桶的名称。CLUSTER_NAME:Managed Service for Apache Spark 集群的名称。REGION:集群所在的区域。
该命令会将 PySpark 作业提交到 Managed Service for Apache Spark。Managed Service for Apache Spark 工作器会从指定的 Cloud Storage 路径提取脚本,并在集群上执行该脚本。
验证数据加载
在 Google Cloud 控制台中,前往 Managed Service for Apache Spark 作业 页面,以监控 作业的执行情况并查看驱动程序输出日志。
作业完成后,前往 BigQuery 页面。
在探索器面板中,找到您的项目和数据集,然后选择
corpus_word_counts表。点击预览 标签页,检查加载的数据。
Spark-BigQuery 连接器的工作原理
借助 Spark-BigQuery 连接器,Spark 应用可以从 BigQuery 读取数据以及将数据写入 BigQuery。在映像版本为 2.1 或更高版本的 Managed Service for Apache Spark 集群上,该连接器已预安装。
该连接器使用间接写入方法加载数据。此方法同时利用 Managed Service for Apache Spark 和 BigQuery 来实现高性能。
Managed Service for Apache Spark 集群上的 Spark 作业会将最终 DataFrame 写入 Cloud Storage 存储桶中的临时位置。
写入 Cloud Storage 完成后,连接器会触发 BigQuery 加载作业。
BigQuery 会将数据从临时 Cloud Storage 位置提取到目标表中。
这种间接方法将 Spark 计算与 BigQuery 提取分离。这种分离可让每项服务高效运行,并确保大型数据加载的高吞吐量。
后续步骤
- 详细了解 Spark-BigQuery 连接器。
- 了解如何在 BigQuery 中查询和直观呈现数据。
- 了解如何使用 Looker 直观呈现 BigQuery 数据。