Utilizza il client Serverless Spark Connect

Il client Managed Service for Apache Spark Connect è un wrapper del client Apache Spark Connect. Consente alle applicazioni di comunicare con una sessione remota di Managed Service for Apache Spark utilizzando il protocollo Spark Connect. Questo documento mostra come installare, configurare e utilizzare il client.

Prima di iniziare

  1. Assicurati di disporre dei ruoli di Identity and Access Management che contengono le autorizzazioni necessarie per gestire le sessioni interattive e i modelli di sessione.

  2. Se esegui il client al di fuori di Google Cloud, fornisci le credenziali di autenticazione. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file delle chiavi dell'account di servizio.

Installare o disinstallare il client

Puoi installare o disinstallare il pacchetto dataproc-spark-connect utilizzando pip.

Installare

Per installare l'ultima versione del client, esegui il seguente comando:

pip install -U dataproc-spark-connect

Disinstallare

Per disinstallare il client, esegui il seguente comando:

pip uninstall dataproc-spark-connect

Configurare il client

Specifica il progetto e la regione per la sessione. Puoi impostare questi valori utilizzando le variabili di ambiente o l'API Builder nel codice.

Variabili di ambiente

Imposta le variabili di ambiente GOOGLE_CLOUD_PROJECT e GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Utilizza i metodi .projectId() e .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Avviare una sessione Spark

Per avviare una sessione Spark, aggiungi le importazioni richieste all'applicazione o al notebook PySpark, quindi chiama l'API DataprocSparkSession.builder.getOrCreate().

  1. Importa la classe DataprocSparkSession.

  2. Chiama il metodo getOrCreate() per avviare la sessione.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configurare le proprietà Spark

Per configurare le proprietà Spark, concatenare uno o più metodi .config() al builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Utilizzare la configurazione avanzata

Per la configurazione avanzata, utilizza la classe Session per personalizzare le impostazioni, ad esempio la sottorete o la versione del runtime.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Riutilizzare una sessione denominata

Le sessioni denominate consentono di condividere una singola sessione Spark su più notebook, evitando ritardi ripetuti durante l'avvio della sessione.

  1. Nel primo notebook, crea una sessione con un ID personalizzato.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. In un altro notebook, riutilizza la sessione specificando lo stesso ID sessione.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Gli ID sessione devono contenere da 4 a 63 caratteri, iniziare con una lettera minuscola e contenere solo lettere minuscole, numeri e trattini. L'ID non può terminare con un trattino. Non è possibile riutilizzare una sessione con un ID nello stato TERMINATED.

Utilizzare i comandi magici di Spark SQL

Il pacchetto supporta la sparksql-magic libreria per eseguire query Spark SQL nei notebook Jupyter. I comandi magici sono una funzionalità facoltativa.

  1. Installa le dipendenze richieste.

    pip install IPython sparksql-magic
    
  2. Carica l'estensione magica.

    %load_ext sparksql_magic
    
  3. (Facoltativo) Configura le impostazioni predefinite.

    %config SparkSql.limit=20
    
  4. Esegui query SQL.

    %%sparksql
    SELECT * FROM your_table
    

Per utilizzare le opzioni avanzate, aggiungi i flag al comando %%sparksql. Ad esempio, per memorizzare nella cache i risultati e creare una visualizzazione, esegui il seguente comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Sono disponibili le seguenti opzioni:

  • --cache o -c: memorizza nella cache il DataFrame.
  • --eager o -e: memorizza nella cache con il caricamento eager.
  • --view VIEW o -v VIEW: crea una visualizzazione temporanea.
  • --limit N o -l N: sostituisce il limite di visualizzazione delle righe predefinito.
  • variable_name: archivia il risultato in una variabile.

Passaggi successivi