Usar el cliente de Serverless Spark Connect

El cliente de Dataproc Spark Connect es un envoltorio del cliente de Apache Spark Connect. Permite que las aplicaciones se comuniquen con una sesión remota de Serverless para Apache Spark mediante el protocolo Spark Connect. En este documento se explica cómo instalar, configurar y usar el cliente.

Antes de empezar

  1. Asegúrate de tener los roles de Gestión de Identidades y Accesos que contengan los permisos necesarios para gestionar las sesiones interactivas y las plantillas de sesión.

  2. Si ejecutas el cliente fuera de Google Cloud, proporciona credenciales de autenticación. Define la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo de clave de tu cuenta de servicio.

Instalar o desinstalar el cliente

Puedes instalar o desinstalar el paquete dataproc-spark-connect con pip.

Instalar

Para instalar la versión más reciente del cliente, ejecuta el siguiente comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar el cliente, ejecuta el siguiente comando:

pip uninstall dataproc-spark-connect

Configurar el cliente

Especifica el proyecto y la región de tu sesión. Puedes definir estos valores mediante variables de entorno o con la API de compilación en tu código.

Variables de entorno

Define las variables de entorno GOOGLE_CLOUD_PROJECT y GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Usa los métodos .projectId() y .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Iniciar una sesión de Spark

Para iniciar una sesión de Spark, añade las importaciones necesarias a tu aplicación o cuaderno de PySpark y, a continuación, llama a la API DataprocSparkSession.builder.getOrCreate().

  1. Importa la clase DataprocSparkSession.

  2. Llama al método getOrCreate() para iniciar la sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configurar propiedades de Spark

Para configurar las propiedades de Spark, encadena uno o varios métodos .config() al compilador.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Usar configuración avanzada

Para la configuración avanzada, usa la clase Session para personalizar ajustes como la subred o la versión del tiempo de ejecución.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Reutilizar una sesión con nombre

Las sesiones con nombre te permiten compartir una única sesión de Spark en varios cuadernos sin tener que esperar a que se inicie la sesión repetidamente.

  1. En tu primer cuaderno, crea una sesión con un ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. En otro cuaderno, reutiliza la sesión especificando el mismo ID de sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Los IDs de sesión deben tener entre 4 y 63 caracteres, empezar por una letra en minúscula y contener solo letras en minúscula, números y guiones. El ID no puede terminar en un guion. No se puede reutilizar una sesión con un ID que esté en estado TERMINATED.

Usar comandos mágicos de Spark SQL

El paquete admite la biblioteca sparksql-magic para ejecutar consultas de Spark SQL en cuadernos de Jupyter. Los comandos mágicos son una función opcional.

  1. Instala las dependencias necesarias.

    pip install IPython sparksql-magic
    
  2. Carga la extensión mágica.

    %load_ext sparksql_magic
    
  3. Opcional: Configura los ajustes predeterminados.

    %config SparkSql.limit=20
    
  4. Ejecutar consultas de SQL.

    %%sparksql
    SELECT * FROM your_table
    

Para usar opciones avanzadas, añade marcas al comando %%sparksql. Por ejemplo, para almacenar en caché los resultados y crear una vista, ejecuta el siguiente comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Se encuentran disponibles las siguientes opciones:

  • --cache o -c: almacena en caché el DataFrame.
  • --eager o -e: cachés con carga anticipada.
  • --view VIEW o -v VIEW: crea una vista temporal.
  • --limit N o -l N: anula el límite de visualización de filas predeterminado.
  • variable_name: almacena el resultado en una variable.

Siguientes pasos