Utiliser le client Serverless Spark Connect

Le client Dataproc Spark Connect est un wrapper du client Apache Spark Connect. Il permet aux applications de communiquer avec une session Serverless pour Apache Spark distante à l'aide du protocole Spark Connect. Ce document vous explique comment installer, configurer et utiliser le client.

Avant de commencer

  1. Assurez-vous de disposer des rôles Identity and Access Management qui contiennent les autorisations nécessaires pour gérer les sessions interactives et les modèles de session.

  2. Si vous exécutez le client en dehors de Google Cloud, fournissez des identifiants d'authentification. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur le chemin d'accès au fichier de clé de votre compte de service.

Installer ou désinstaller le client

Vous pouvez installer ou désinstaller le package dataproc-spark-connect à l'aide de pip.

Installer

Pour installer la dernière version du client, exécutez la commande suivante :

pip install -U dataproc-spark-connect

Désinstaller

Pour désinstaller le client, exécutez la commande suivante :

pip uninstall dataproc-spark-connect

Configurer le client

Spécifiez le projet et la région pour votre session. Vous pouvez définir ces valeurs à l'aide de variables d'environnement ou de l'API Builder dans votre code.

Variables d'environnement

Définissez les variables d'environnement GOOGLE_CLOUD_PROJECT et GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Utilisez les méthodes .projectId() et .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Démarrer une session Spark

Pour démarrer une session Spark, ajoutez les importations requises à votre application ou notebook PySpark, puis appelez l'API DataprocSparkSession.builder.getOrCreate().

  1. Importez la classe DataprocSparkSession.

  2. Appelez la méthode getOrCreate() pour démarrer la session.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configurer les propriétés Spark

Pour configurer les propriétés Spark, enchaînez une ou plusieurs méthodes .config() au générateur.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Utiliser la configuration avancée

Pour une configuration avancée, utilisez la classe Session afin de personnaliser des paramètres tels que le sous-réseau ou la version de l'environnement d'exécution.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Réutiliser une session nommée

Les sessions nommées vous permettent de partager une même session Spark entre plusieurs notebooks, tout en évitant les retards répétés au démarrage des sessions.

  1. Dans votre premier notebook, créez une session avec un ID personnalisé.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Dans un autre notebook, réutilisez la session en spécifiant le même ID de session.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Les ID de session doivent comporter entre 4 et 63 caractères, commencer par une lettre minuscule et ne contenir que des lettres minuscules, des chiffres et des traits d'union. L'ID ne peut pas se terminer par un trait d'union. Une session dont l'ID est à l'état TERMINATED ne peut pas être réutilisée.

Utiliser les commandes magiques Spark SQL

Le package est compatible avec la bibliothèque sparksql-magic pour exécuter des requêtes SparkSQL dans les notebooks Jupyter. Les commandes magiques sont une fonctionnalité facultative.

  1. Installez les dépendances requises.

    pip install IPython sparksql-magic
    
  2. Chargez l'extension magique.

    %load_ext sparksql_magic
    
  3. Facultatif : configurez les paramètres par défaut.

    %config SparkSql.limit=20
    
  4. Exécuter des requêtes SQL

    %%sparksql
    SELECT * FROM your_table
    

Pour utiliser les options avancées, ajoutez des options à la commande %%sparksql. Par exemple, pour mettre en cache les résultats et créer une vue, exécutez la commande suivante :

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Les options suivantes sont disponibles :

  • --cache ou -c : met en cache le DataFrame.
  • --eager ou -e : caches avec chargement différé.
  • --view VIEW ou -v VIEW : crée une vue temporaire.
  • --limit N ou -l N : remplace la limite d'affichage des lignes par défaut.
  • variable_name : stocke le résultat dans une variable.

Étapes suivantes