データソースに接続する
Managed Service for Apache Spark は、データベース、データ ウェアハウス、ストリーミング サービスと統合して、複数のシステムからデータを処理します。このドキュメントでは、Managed Service for Apache Spark クラスタを Google Cloudの内外のデータソースに接続する方法について説明します。
始める前に
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Managed Service for Apache Spark クラスタを作成する。
- Cloud Storage バケットを作成します。
- データソースにアクセスするために必要な権限があることを確認します。
必要なロール
このページの例を実行するには、特定の IAM ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロール付与を確認するには、ロールを付与する必要がありますか?をご覧ください。
ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
ユーザーロール
Managed Service for Apache Spark クラスタの作成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
-
プロジェクトに対する Dataproc 編集者 (
roles/dataproc.editor) -
Compute Engine のデフォルトのサービス アカウントに対するサービス アカウント ユーザー (
roles/iam.serviceAccountUser)
サービス アカウント ロール
Compute Engine のデフォルト サービス アカウントに Managed Service for Apache Spark クラスタを作成するために必要な権限を付与するには、プロジェクトに対する Dataproc ワーカー (roles/dataproc.worker)IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼してください。
Google Cloud サービスに接続する
このセクションでは、Managed Service for Apache Spark を Google Cloudと外部データソースに接続する方法について説明します。
Cloud Storage への接続
Managed Service for Apache Spark は、デフォルトの分散ファイル システムとして Cloud Storage を使用します。Cloud Storage バケットからデータを読み取り、Cloud Storage バケットにデータを書き込むには、gs:// URI 接頭辞を使用します。追加の構成は必要ありません。
次の例は、Cloud Storage バケットから CSV ファイルを読み取り、Parquet ファイルをバケットに書き込む方法を示しています。
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
BigQuery に接続する
BigQuery コネクタは、Managed Service for Apache Spark 2.1 以降のイメージ バージョンにプリインストールされています。このコネクタを使用すると、Managed Service for Apache Spark と BigQuery 間で大規模なデータ転送が可能になります。
次の例は、BigQuery テーブルを Spark DataFrame に読み込んでからクエリを実行する方法を示しています。
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
JDBC を使用して外部データベースに接続する
Managed Service for Apache Spark は、Spark 組み込みの JDBC データソースを使用して、PostgreSQL や MySQL などの Java Database Connectivity(JDBC)ドライバを提供するデータベースに接続できます。
JDBC ドライバを使用可能にする
JDBC でデータベースに接続するには、ドライバの JAR ファイルを Spark クラスパスで使用できるようにします。
ジョブ単位
ドライバ JAR を Cloud Storage バケットに保存し、ジョブを送信するときに --jars フラグで参照します。Spark は、ジョブに必要なノードに JAR ファイルを配布します。これは、ジョブ固有の依存関係に推奨される方法です。
クラスタ全体
クラスタ上のすべてのジョブに特定のドライバが必要な場合は、クラスタの作成時に初期化アクションを使用して、Cloud Storage からすべてのノードの Spark jars ディレクトリにドライバ JAR をコピーします。
初期化アクション スクリプトを作成します。
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/クラスタを作成し、スクリプトを参照します。
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
PostgreSQL データベースから読み取る
この例では、ジョブごとの方法を使用して PostgreSQL データベースに接続します。
PostgreSQL JDBC ドライバ JAR をダウンロードして、Cloud Storage バケットにアップロードします。
PySpark ジョブを送信し、
--jarsフラグを使用してドライバの Cloud Storage パスを参照します。gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jarjob.pyファイルで次のコードを使用して、データベースの読み取りと書き込みを行います。from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
ODBC を使用して外部データベースに接続する
JDBC ドライバが利用できないデータソースに接続するには、Open Database Connectivity(ODBC)ドライバを使用します。クラスタの作成時に、初期化アクションを使用して、各ノードにドライバとその依存関係をインストールします。
この例では、Microsoft SQL Server インスタンスに接続する方法を示します。
初期化アクション スクリプトを作成します。
- Debian 用の Microsoft SQL Server ODBC ドライバをインストールするスクリプトを作成します。
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```初期化アクションを実行する Managed Service for Apache Spark クラスタを作成します。
クラスタを作成するときに、Cloud Storage の初期化アクション スクリプトを指定します。
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.shPySpark を使用して接続してクエリを実行する
クラスタが実行されると、ODBC ドライバと
pyodbcライブラリが使用可能になります。次のコードでは、ドライバ ノードでpyodbcを使用してデータを Pandas DataFrame に取得し、分散処理用に Spark DataFrame に変換します。import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
ベスト プラクティス
このセクションでは、認証情報と依存関係を管理するためのベスト プラクティスについて説明します。
認証情報の管理
パスワードなどのシークレットをコードにハードコードしないでください。Secret Manager を使用して認証情報を安全に保存し、Managed Service for Apache Spark ジョブからアクセスします。
依存関係の管理
クラスタの作成時に、初期化アクションを使用してドライバとライブラリをインストールします。複雑な環境の場合は、すべての依存関係がプリインストールされたカスタム Managed Service for Apache Spark イメージの構築を検討してください。カスタム イメージを使用すると、クラスタの起動時間が短縮され、一貫性が確保されます。
本番環境で使用する前に、初期化アクション スクリプトを公開ソースから独自のバージョニングされた Cloud Storage バケットにコピーします。公開スクリプトは予告なく変更される可能性があるため、直接参照しないでください。
次のステップ
- シークレット マネージャーの詳細を確認する。
- カスタム イメージを作成する方法を確認する。
- 詳しくは、初期化アクションをご覧ください。