连接到数据源
Managed Service for Apache Spark 可与数据库、数据仓库和流式服务集成,以处理来自多个系统的数据。 本文档介绍了如何将 Managed Service for Apache Spark 集群连接到内部和外部的数据源 Google Cloud。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud的新用户, 请创建一个账号,以评估我们的产品在 实际场景中的表现。新客户还可以获得 300 美元的免费抵用金,用于 运行、测试和部署工作负载。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 创建 Managed Service for Apache Spark 集群。
- 创建 Cloud Storage 存储桶。
- 确保您拥有访问数据源所需的权限。
所需角色
如需运行本页中的示例,您需要拥有某些 IAM 角色。根据组织政策,系统可能已授予这些角色。如需检查角色授予情况,请参阅 您是否需要授予角色?。
如需详细了解如何授予角色,请参阅 管理对项目、文件夹和组织的访问权限。
用户角色
如需获得创建 Managed Service for Apache Spark 集群所需的权限,请让管理员向您授予以下 IAM 角色:
-
Dataproc Editor (
roles/dataproc.editor) 项目的 -
Compute Engine 默认服务帐号的 Service Account User (
roles/iam.serviceAccountUser)
服务账号角色
如需确保 Compute Engine 默认服务帐号拥有创建 Managed Service for Apache Spark 集群所需的权限,请让管理员向 Compute Engine 默认服务帐号授予项目的 Dataproc Worker (roles/dataproc.worker) IAM 角色。
连接到 Google Cloud 服务
本部分介绍了如何将 Managed Service for Apache Spark 连接到 Google Cloud 内部和外部数据源。
连接到 Cloud Storage
Managed Service for Apache Spark 使用 Cloud Storage 作为其默认分布式文件系统。如需从 Cloud Storage 存储分区读取数据和向其写入数据,请使用 gs:// URI 前缀。无需进行其他配置。
以下示例展示了如何从 Cloud Storage 存储桶读取 CSV 文件,然后将 Parquet 文件写入该存储桶。
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
连接到 BigQuery
BigQuery 连接器预安装在 Managed Service for Apache Spark 2.1 及更高版本的映像版本中。该连接器支持在 Managed Service for Apache Spark 和 BigQuery 之间进行大规模数据传输。
以下示例展示了如何将 BigQuery 表读取到 Spark DataFrame 中,然后执行查询。
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
使用 JDBC 连接到外部数据库
您可以使用 Spark 内置 JDBC 数据源将 Managed Service for Apache Spark 连接到提供 Java 数据库连接 (JDBC) 驱动程序的数据库,例如 PostgreSQL 或 MySQL。
提供 JDBC 驱动程序
如需使用 JDBC 连接到数据库,请在 Spark 类路径中提供驱动程序的 JAR 文件。
按作业
将驱动程序 JAR 存储在 Cloud Storage 存储桶中,并在提交作业时使用 --jars 标志引用该 JAR。Spark 会将 JAR 文件分发到作业所需的节点。这是针对作业特定依赖项的推荐方法。
集群级
如果集群上的所有作业都需要特定驱动程序,请在创建集群时使用初始化操作将驱动程序 JAR 从 Cloud Storage 复制到每个节点上的 Spark jars 目录中。
创建初始化操作脚本。
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/创建集群并引用该脚本。
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
从 PostgreSQL 数据库读取数据
此示例使用按作业方法连接到 PostgreSQL 数据库。
下载 PostgreSQL JDBC 驱动程序 JAR,并将其上传到 Cloud Storage 存储桶。
提交 PySpark 作业,并使用
--jars标志引用驱动程序的 Cloud Storage 路径。gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar在
job.py文件中使用以下代码从数据库读取数据和向其写入数据。from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
使用 ODBC 连接到外部数据库
如需连接到没有 JDBC 驱动程序的数据源,请使用开放式数据库连接 (ODBC) 驱动程序。在创建集群时,使用初始化操作在每个节点上安装驱动程序及其依赖项。
此示例演示了如何连接到 Microsoft SQL Server 实例。
创建初始化操作脚本。
- 创建一个用于安装适用于 Debian 的 Microsoft SQL Server ODBC 驱动程序的脚本。
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```创建一个运行初始化操作的 Managed Service for Apache Spark 集群。
创建集群时,请指向 Cloud Storage 中的初始化操作脚本。
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.sh使用 PySpark 进行连接和查询
集群运行后,ODBC 驱动程序和
pyodbc库即可使用。以下代码在驱动程序节点上使用pyodbc将数据提取到 Pandas DataFrame 中,然后将其转换为 Spark DataFrame 以进行分布式处理。import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
最佳做法
本部分介绍了管理凭据和依赖项的最佳实践。
凭据管理
请勿在代码中硬编码密码或其他 Secret。使用 Secret Manager 安全地存储凭据,并从 Managed Service for Apache Spark 作业中访问这些凭据。
依赖项管理
在创建集群时,使用初始化操作安装驱动程序和库。对于复杂环境,请考虑构建预安装了所有依赖项的自定义 Managed Service for Apache Spark 映像。自定义映像可缩短集群启动时间并确保一致性。
在使用初始化操作脚本之前,请先将其从公共来源复制到您自己的版本化 Cloud Storage 存储桶。请勿直接引用公共脚本,因为这些脚本可能会在不事先通知的情况下发生更改。
后续步骤
- 详细了解 Secret Manager。
- 了解如何创建自定义映像。
- 详细了解初始化操作。