Usa el cliente de Serverless Spark Connect

El cliente de Dataproc Spark Connect es un wrapper del cliente de Apache Spark Connect. Permite que las aplicaciones se comuniquen con una sesión remota de Serverless for Apache Spark a través del protocolo Spark Connect. En este documento, se muestra cómo instalar, configurar y usar el cliente.

Antes de comenzar

  1. Asegúrate de tener los roles de Identity and Access Management que contienen los permisos necesarios para administrar las sesiones interactivas y las plantillas de sesiones.

  2. Si ejecutas el cliente fuera de Google Cloud, proporciona credenciales de autenticación. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta de acceso al archivo de claves de tu cuenta de servicio.

Instala o desinstala el cliente

Puedes instalar o desinstalar el paquete dataproc-spark-connect con pip.

Instalar

Para instalar la versión más reciente del cliente, ejecuta el siguiente comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar el cliente, ejecuta el siguiente comando:

pip uninstall dataproc-spark-connect

Configura el cliente

Especifica el proyecto y la región para tu sesión. Puedes establecer estos valores con variables de entorno o con la API de compilador en tu código.

Variables de entorno

Configura las variables de entorno GOOGLE_CLOUD_PROJECT y GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API de Builder

Usa los métodos .projectId() y .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Inicia una sesión de Spark

Para iniciar una sesión de Spark, agrega las importaciones necesarias a tu aplicación o notebook de PySpark y, luego, llama a la API de DataprocSparkSession.builder.getOrCreate().

  1. Importa la clase DataprocSparkSession.

  2. Llama al método getOrCreate() para iniciar la sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configura las propiedades de Spark

Para configurar las propiedades de Spark, encadena uno o más métodos .config() al compilador.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Cómo usar la configuración avanzada

Para la configuración avanzada, usa la clase Session para personalizar parámetros de configuración, como la subred o la versión del entorno de ejecución.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Cómo volver a usar una sesión con nombre

Las sesiones con nombre te permiten compartir una sola sesión de Spark en varios notebooks y, al mismo tiempo, evitar las demoras repetidas en el tiempo de inicio de la sesión.

  1. En tu primer notebook, crea una sesión con un ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. En otro notebook, reutiliza la sesión especificando el mismo ID de sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Los IDs de sesión deben tener entre 4 y 63 caracteres, comenzar con una letra minúscula y contener solo letras minúsculas, números y guiones. El ID no puede terminar con un guion. No se puede reutilizar una sesión con un ID en estado TERMINATED.

Usa comandos mágicos de Spark SQL

El paquete admite la biblioteca sparksql-magic para ejecutar consultas de Spark SQL en notebooks de Jupyter. Los comandos mágicos son una función opcional.

  1. Instala las dependencias requeridas.

    pip install IPython sparksql-magic
    
  2. Carga la extensión mágica.

    %load_ext sparksql_magic
    
  3. Opcional: Configura los parámetros de configuración predeterminados.

    %config SparkSql.limit=20
    
  4. Ejecutar consultas en SQL

    %%sparksql
    SELECT * FROM your_table
    

Para usar opciones avanzadas, agrega marcas al comando %%sparksql. Por ejemplo, para almacenar en caché los resultados y crear una vista, ejecuta el siguiente comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Las siguientes opciones están disponibles:

  • --cache o -c: Almacena en caché el DataFrame.
  • --eager o -e: Cachés con carga anticipada.
  • --view VIEW o -v VIEW: Crea una vista temporal.
  • --limit N o -l N: Anula el límite de visualización de filas predeterminado.
  • variable_name: Almacena el resultado en una variable.

¿Qué sigue?