Usa el cliente de Serverless Spark Connect

El cliente de Managed Service para Apache Spark Connect es un wrapper del cliente de Apache Spark Connect. Permite que las aplicaciones se comuniquen con una sesión remota de Managed Service para Apache Spark mediante el protocolo de Spark Connect. En este documento, se muestra cómo instalar, configurar y usar el cliente.

Antes de comenzar

  1. Asegúrate de tener los roles de Identity and Access Management que contienen los permisos necesarios para administrar las sesiones interactivas y las plantillas de sesión.

  2. Si ejecutas el cliente fuera de Google Cloud, proporciona credenciales de autenticación. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta de acceso del archivo de claves de tu cuenta de servicio.

Instala o desinstala el cliente

Puedes instalar o desinstalar el paquete dataproc-spark-connect con pip.

Instalar

Para instalar la versión más reciente del cliente, ejecuta el siguiente comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar el cliente, ejecuta el siguiente comando:

pip uninstall dataproc-spark-connect

Configura el cliente

Especifica el proyecto y la región de tu sesión. Puedes establecer estos valores con variables de entorno o con la API de compilador en tu código.

Variables de entorno

Establece las variables de entorno GOOGLE_CLOUD_PROJECT y GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API de compilador

Usa los métodos .projectId() y .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Inicia una sesión de Spark

Para iniciar una sesión de Spark, agrega las importaciones necesarias a tu aplicación o notebook de PySpark y, luego, llama a la API de DataprocSparkSession.builder.getOrCreate().

  1. Importa la clase DataprocSparkSession.

  2. Llama al método getOrCreate() para iniciar la sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configura las propiedades de Spark

Para configurar las propiedades de Spark, encadena uno o más métodos .config() al compilador.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Usa la configuración avanzada

Para la configuración avanzada, usa la clase Session para personalizar parámetros como la subred o la versión del entorno de ejecución.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Reutiliza una sesión con nombre

Las sesiones con nombre te permiten compartir una sola sesión de Spark en varios notebooks y, al mismo tiempo, evitar retrasos repetidos en el tiempo de inicio de la sesión.

  1. En tu primer notebook, crea una sesión con un ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. En otro notebook, reutiliza la sesión especificando el mismo ID de sesión.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Los IDs de sesión deben tener entre 4 y 63 caracteres, comenzar con una letra minúscula y contener solo letras minúsculas, números y guiones. El ID no puede terminar con un guion. No se puede reutilizar una sesión con un ID que esté en estado TERMINATED.

Usa comandos mágicos de Spark SQL

El paquete admite la sparksql-magic biblioteca para ejecutar consultas de Spark SQL en notebooks de Jupyter. Los comandos mágicos son una función opcional.

  1. Instala las dependencias requeridas.

    pip install IPython sparksql-magic
    
  2. Carga la extensión mágica.

    %load_ext sparksql_magic
    
  3. Opcional: Configura los parámetros predeterminados.

    %config SparkSql.limit=20
    
  4. Ejecuta consultas en SQL.

    %%sparksql
    SELECT * FROM your_table
    

Para usar opciones avanzadas, agrega marcas al comando %%sparksql. Por ejemplo, para almacenar en caché los resultados y crear una vista, ejecuta el siguiente comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Las siguientes opciones están disponibles:

  • --cache o -c: Almacena en caché el DataFrame.
  • --eager o -e: Almacena en caché con carga ansiosa.
  • --view VIEW o -v VIEW: Crea una vista temporal.
  • --limit N o -l N: Anula el límite de visualización de filas predeterminado.
  • variable_name: Almacena el resultado en una variable.

¿Qué sigue?