Serverless Spark Connect-Client verwenden

Der Managed Service for Apache Spark Connect-Client ist ein Wrapper des Apache Spark Connect-Clients. Damit können Anwendungen über das Spark Connect-Protokoll mit einer Remote-Sitzung von Managed Service for Apache Spark kommunizieren. In diesem Dokument wird beschrieben, wie Sie den Client installieren, konfigurieren und verwenden.

Hinweis

  1. Prüfen Sie, ob Sie die Identity and Access Management-Rollen haben, die die Berechtigungen zum Verwalten interaktiver Sitzungen und Sitzungsvorlagen enthalten.

  2. Wenn Sie den Client außerhalb von Google Cloudausführen, geben Sie Authentifizierungsdaten an. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der Schlüsseldatei Ihres Dienstkontos fest.

Client installieren oder deinstallieren

Sie können das Paket dataproc-spark-connect mit pip installieren oder deinstallieren.

Installieren

Führen Sie den folgenden Befehl aus, um die neueste Version des Clients zu installieren:

pip install -U dataproc-spark-connect

Deinstallieren

Führen Sie den folgenden Befehl aus, um den Client zu deinstallieren:

pip uninstall dataproc-spark-connect

Client konfigurieren

Geben Sie das Projekt und die Region für Ihre Sitzung an. Sie können diese Werte über Umgebungsvariablen oder mit der Builder API in Ihrem Code festlegen.

Umgebungsvariablen

Legen Sie die Umgebungsvariablen GOOGLE_CLOUD_PROJECT und GOOGLE_CLOUD_REGION fest.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

Verwenden Sie die Methoden .projectId() und .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Spark-Sitzung starten

Fügen Sie die erforderlichen Importe Ihrer PySpark-Anwendung oder Ihrem Notebook hinzu, um eine Spark-Sitzung zu starten. Rufen Sie dann die API DataprocSparkSession.builder.getOrCreate() auf.

  1. Importieren Sie die Klasse DataprocSparkSession.

  2. Rufen Sie die Methode getOrCreate() auf, um die Sitzung zu starten.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Spark-Attribute konfigurieren

Verketten Sie eine oder mehrere .config()-Methoden mit dem Builder, um Spark-Attribute zu konfigurieren.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Erweiterte Konfiguration verwenden

Verwenden Sie für die erweiterte Konfiguration die Klasse Session, um Einstellungen wie das Subnetzwerk oder die Runtime-Version anzupassen.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Benannte Sitzung wiederverwenden

Mit benannten Sitzungen können Sie eine einzelne Spark-Sitzung für mehrere Notebooks freigeben und so wiederholte Verzögerungen beim Starten der Sitzung vermeiden.

  1. Erstellen Sie in Ihrem ersten Notebook eine Sitzung mit einer benutzerdefinierten ID.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Verwenden Sie die Sitzung in einem anderen Notebook wieder, indem Sie dieselbe Sitzungs-ID angeben.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Sitzungs-IDs müssen zwischen 4 und 63 Zeichen lang sein, mit einem Kleinbuchstaben beginnen und dürfen nur Kleinbuchstaben, Ziffern und Bindestriche enthalten. Die ID darf nicht mit einem Bindestrich enden. Eine Sitzung mit einer ID im Status TERMINATED kann nicht wiederverwendet werden.

Spark SQL-Magic-Befehle verwenden

Das Paket unterstützt die sparksql-magic Bibliothek um Spark SQL-Abfragen in Jupyter-Notebooks auszuführen. Magic-Befehle sind eine optionale Funktion.

  1. Installieren Sie die erforderlichen Abhängigkeiten.

    pip install IPython sparksql-magic
    
  2. Laden Sie die Magic-Erweiterung.

    %load_ext sparksql_magic
    
  3. Optional: Konfigurieren Sie die Standardeinstellungen.

    %config SparkSql.limit=20
    
  4. Führen Sie SQL-Abfragen aus.

    %%sparksql
    SELECT * FROM your_table
    

Wenn Sie erweiterte Optionen verwenden möchten, fügen Sie dem Befehl %%sparksql Flags hinzu. Führen Sie beispielsweise den folgenden Befehl aus, um Ergebnisse im Cache zu speichern und eine Ansicht zu erstellen:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Folgende Optionen sind verfügbar:

  • --cache oder -c: Speichert das DataFrame im Cache.
  • --eager oder -e: Speichert das DataFrame mit Eager Loading im Cache.
  • --view VIEW oder -v VIEW: Erstellt eine temporäre Ansicht.
  • --limit N oder -l N: Überschreibt das Standardlimit für die Zeilenanzeige.
  • variable_name: Speichert das Ergebnis in einer Variablen.

Nächste Schritte