サーバーレス Spark Connect クライアントを使用する

Dataproc Spark Connect クライアントは、Apache Spark Connect クライアントのラッパーです。これにより、アプリケーションは Spark Connect プロトコルを使用して、リモートの Serverless for Apache Spark セッションと通信できます。このドキュメントでは、クライアントをインストール、構成、使用する方法について説明します。

始める前に

  1. インタラクティブ セッションとセッション テンプレートの管理に必要な権限を含む Identity and Access Management ロールがあることを確認します。

  2. Google Cloudの外部でクライアントを実行する場合は、認証情報を指定します。GOOGLE_APPLICATION_CREDENTIALS 環境変数をサービス アカウント キーファイルのパスに設定します。

クライアントをインストールまたはアンインストールする

pip を使用して dataproc-spark-connect パッケージをインストールまたはアンインストールできます。

インストール

クライアントの最新バージョンをインストールするには、次のコマンドを実行します。

pip install -U dataproc-spark-connect

アンインストール

クライアントをアンインストールするには、次のコマンドを実行します。

pip uninstall dataproc-spark-connect

クライアントを構成する

セッションのプロジェクトとリージョンを指定します。これらの値は、環境変数を使用するか、コードでビルダー API を使用して設定できます。

環境変数

GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION の環境変数を設定します。

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

.projectId() メソッドと .location() メソッドを使用します。

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Spark セッションを開始する

Spark セッションを開始するには、必要なインポートを PySpark アプリケーションまたはノートブックに追加し、DataprocSparkSession.builder.getOrCreate() API を呼び出します。

  1. DataprocSparkSession クラスをインポートします。

  2. getOrCreate() メソッドを呼び出してセッションを開始します。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Spark プロパティを構成する

Spark プロパティを構成するには、1 つ以上の .config() メソッドをビルダーにチェーンします。

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

高度な設定を使用する

高度な構成を行うには、Session クラスを使用して、サブネットワークやランタイム バージョンなどの設定をカスタマイズします。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

名前付きセッションを再利用する

名前付きセッションを使用すると、複数のノートブック間で 1 つの Spark セッションを共有し、セッションの起動時間の遅延を繰り返すことを回避できます。

  1. 最初のノートブックで、カスタム ID を使用してセッションを作成します。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 別のノートブックで、同じセッション ID を指定してセッションを再利用します。

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

セッション ID は 4 ~ 63 文字で、先頭は英小文字にする必要があります。使用できるのは英小文字、数字、ハイフンのみです。ID の末尾をハイフンにすることはできません。ID が TERMINATED 状態のセッションは再利用できません。

Spark SQL マジック コマンドを使用する

このパッケージは、Jupyter ノートブックで Spark SQL クエリを実行するための sparksql-magic ライブラリをサポートしています。マジック コマンドはオプション機能です。

  1. 必要な依存関係をインストールします。

    pip install IPython sparksql-magic
    
  2. マジック拡張機能を読み込みます。

    %load_ext sparksql_magic
    
  3. 省略可: デフォルト設定を構成します。

    %config SparkSql.limit=20
    
  4. SQL クエリを実行します。

    %%sparksql
    SELECT * FROM your_table
    

詳細オプションを使用するには、%%sparksql コマンドにフラグを追加します。たとえば、結果をキャッシュに保存してビューを作成するには、次のコマンドを実行します。

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

次のオプションが用意されています。

  • --cache または -c: DataFrame をキャッシュに保存します。
  • --eager または -e: 事前読み込みを行うキャッシュ。
  • --view VIEW または -v VIEW: 一時的なビューを作成します。
  • --limit N または -l N: デフォルトの行表示上限をオーバーライドします。
  • variable_name: 結果を変数に保存します。

次のステップ