Serverless Spark Connect-Client verwenden

Der Dataproc Spark Connect-Client ist ein Wrapper des Apache Spark Connect-Clients. Damit können Anwendungen über das Spark Connect-Protokoll mit einer Remote-Sitzung von Serverless for Apache Spark kommunizieren. In diesem Dokument wird beschrieben, wie Sie den Client installieren, konfigurieren und verwenden.

Hinweise

  1. Sie benötigen die IAM-Rollen (Identity and Access Management), die die Berechtigungen zum Verwalten interaktiver Sitzungen und Sitzungsvorlagen enthalten.

  2. Wenn Sie den Client außerhalb von Google Cloudausführen, müssen Sie Authentifizierungsdaten angeben. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad Ihrer Dienstkontoschlüsseldatei fest.

Client installieren oder deinstallieren

Sie können das dataproc-spark-connect-Paket mit pip installieren oder deinstallieren.

Installieren

Führen Sie den folgenden Befehl aus, um die neueste Version des Clients zu installieren:

pip install -U dataproc-spark-connect

Deinstallieren

Führen Sie den folgenden Befehl aus, um den Client zu deinstallieren:

pip uninstall dataproc-spark-connect

Client konfigurieren

Geben Sie das Projekt und die Region für Ihre Sitzung an. Sie können diese Werte mit Umgebungsvariablen oder mit der Builder API in Ihrem Code festlegen.

Umgebungsvariablen

Legen Sie die Umgebungsvariablen GOOGLE_CLOUD_PROJECT und GOOGLE_CLOUD_REGION fest.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

Verwenden Sie die Methoden .projectId() und .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Spark-Sitzung starten

Wenn Sie eine Spark-Sitzung starten möchten, fügen Sie Ihrer PySpark-Anwendung oder Ihrem Notebook die erforderlichen Importe hinzu und rufen Sie dann die DataprocSparkSession.builder.getOrCreate() API auf.

  1. Importieren Sie die Klasse DataprocSparkSession.

  2. Rufen Sie die Methode getOrCreate() auf, um die Sitzung zu starten.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Spark-Attribute konfigurieren

Wenn Sie Spark-Attribute konfigurieren möchten, verketten Sie eine oder mehrere .config()-Methoden mit dem Builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Erweiterte Konfiguration verwenden

Für die erweiterte Konfiguration verwenden Sie die Klasse Session, um Einstellungen wie das Subnetz oder die Laufzeitversion anzupassen.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Benannte Sitzung wiederverwenden

Mit benannten Sitzungen können Sie eine einzelne Spark-Sitzung für mehrere Notebooks freigeben und so wiederholte Verzögerungen beim Starten von Sitzungen vermeiden.

  1. Erstellen Sie in Ihrem ersten Notebook eine Sitzung mit einer benutzerdefinierten ID.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. In einem anderen Notebook können Sie die Sitzung wiederverwenden, indem Sie dieselbe Sitzungs-ID angeben.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Sitzungs-IDs müssen 4 bis 63 Zeichen lang sein, mit einem Kleinbuchstaben beginnen und dürfen nur Kleinbuchstaben, Ziffern und Bindestriche enthalten. Die ID darf nicht mit einem Bindestrich enden. Eine Sitzung mit einer ID im Status TERMINATED kann nicht wiederverwendet werden.

Spark SQL-Magic-Befehle verwenden

Das Paket unterstützt die sparksql-magic-Bibliothek zum Ausführen von Spark SQL-Abfragen in Jupyter-Notebooks. Magic-Befehle sind eine optionale Funktion.

  1. Installieren Sie die erforderlichen Abhängigkeiten.

    pip install IPython sparksql-magic
    
  2. Laden Sie die Magic-Erweiterung.

    %load_ext sparksql_magic
    
  3. Optional: Konfigurieren Sie die Standardeinstellungen.

    %config SparkSql.limit=20
    
  4. SQL-Abfragen ausführen.

    %%sparksql
    SELECT * FROM your_table
    

Wenn Sie erweiterte Optionen verwenden möchten, fügen Sie dem %%sparksql-Befehl Flags hinzu. Wenn Sie beispielsweise Ergebnisse im Cache speichern und eine Ansicht erstellen möchten, führen Sie den folgenden Befehl aus:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Folgende Optionen sind verfügbar:

  • --cache oder -c: Der DataFrame wird im Cache gespeichert.
  • --eager oder -e: Caches mit Eager Loading.
  • --view VIEW oder -v VIEW: Erstellt eine temporäre Ansicht.
  • --limit N oder -l N: Überschreibt das Standardlimit für die Zeilenanzeige.
  • variable_name: Speichert das Ergebnis in einer Variablen.

Nächste Schritte