Utilizza il client Serverless Spark Connect

Il client Dataproc Spark Connect è un wrapper del client Apache Spark Connect. Consente alle applicazioni di comunicare con una sessione Serverless per Apache Spark remota utilizzando il protocollo Spark Connect. Questo documento mostra come installare, configurare e utilizzare il client.

Prima di iniziare

  1. Assicurati di disporre dei ruoli Identity and Access Management che contengono le autorizzazioni necessarie per gestire le sessioni interattive e i modelli di sessione.

  2. Se esegui il client al di fuori di Google Cloud, fornisci le credenziali di autenticazione. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file di chiave del account di servizio.

Installare o disinstallare il client

Puoi installare o disinstallare il pacchetto dataproc-spark-connect utilizzando pip.

Installa

Per installare l'ultima versione del client, esegui il seguente comando:

pip install -U dataproc-spark-connect

Disinstalla

Per disinstallare il client, esegui questo comando:

pip uninstall dataproc-spark-connect

Configura il client

Specifica il progetto e la regione per la sessione. Puoi impostare questi valori utilizzando le variabili di ambiente o l'API Builder nel tuo codice.

Variabili di ambiente

Imposta le variabili di ambiente GOOGLE_CLOUD_PROJECT e GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Utilizza i metodi .projectId() e .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Avvia una sessione Spark

Per avviare una sessione Spark, aggiungi le importazioni richieste all'applicazione PySpark o al notebook, quindi chiama l'API DataprocSparkSession.builder.getOrCreate().

  1. Importa il corso DataprocSparkSession.

  2. Chiama il metodo getOrCreate() per avviare la sessione.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configura le proprietà Spark

Per configurare le proprietà Spark, concatena uno o più metodi .config() al builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Utilizzare la configurazione avanzata

Per la configurazione avanzata, utilizza la classe Session per personalizzare impostazioni come la subnet o la versione del runtime.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Riutilizzare una sessione denominata

Le sessioni denominate consentono di condividere una singola sessione Spark in più notebook evitando ritardi ripetuti all'avvio della sessione.

  1. Nel primo notebook, crea una sessione con un ID personalizzato.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. In un altro blocco note, riutilizza la sessione specificando lo stesso ID sessione.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Gli ID sessione devono contenere da 4 a 63 caratteri, iniziare con una lettera minuscola e contenere solo lettere minuscole, numeri e trattini. L'ID non può terminare con un trattino. Una sessione con un ID nello stato TERMINATED non può essere riutilizzata.

Utilizzare i comandi magici Spark SQL

Il pacchetto supporta la libreria sparksql-magic per eseguire query Spark SQL nei blocchi note Jupyter. I comandi magici sono una funzionalità facoltativa.

  1. Installa le dipendenze richieste.

    pip install IPython sparksql-magic
    
  2. Carica l'estensione Magic.

    %load_ext sparksql_magic
    
  3. (Facoltativo) Configura le impostazioni predefinite.

    %config SparkSql.limit=20
    
  4. Esegui query SQL.

    %%sparksql
    SELECT * FROM your_table
    

Per utilizzare le opzioni avanzate, aggiungi flag al comando %%sparksql. Ad esempio, per memorizzare nella cache i risultati e creare una vista, esegui questo comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Sono disponibili le seguenti opzioni:

  • --cache o -c: memorizza nella cache il DataFrame.
  • --eager o -e: cache con caricamento rapido.
  • --view VIEW o -v VIEW: crea una visualizzazione temporanea.
  • --limit N o -l N: sostituisce il limite di visualizzazione predefinito delle righe.
  • variable_name: memorizza il risultato in una variabile.

Passaggi successivi