Usar o cliente do Serverless Spark Connect

O cliente do Dataproc Spark Connect é um wrapper do cliente do Apache Spark Connect. Ele permite que os aplicativos se comuniquem com uma sessão remota do Serverless para Apache Spark usando o protocolo Spark Connect. Este documento mostra como instalar, configurar e usar o cliente.

Antes de começar

  1. Verifique se você tem os papéis do Identity and Access Management que contêm as permissões necessárias para gerenciar sessões interativas e modelos de sessão.

  2. Se você executar o cliente fora do Google Cloud, forneça credenciais de autenticação. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo de chave da conta de serviço.

Instalar ou desinstalar o cliente

É possível instalar ou desinstalar o pacote dataproc-spark-connect usando pip.

Instalar

Para instalar a versão mais recente do cliente, execute o seguinte comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar o cliente, execute o seguinte comando:

pip uninstall dataproc-spark-connect

Configurar o cliente

Especifique o projeto e a região da sua sessão. É possível definir esses valores usando variáveis de ambiente ou a API builder no código.

Variáveis de ambiente

Defina as variáveis de ambiente GOOGLE_CLOUD_PROJECT e GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Use os métodos .projectId() e .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Iniciar uma sessão do Spark

Para iniciar uma sessão do Spark, adicione as importações necessárias ao aplicativo ou notebook PySpark e chame a API DataprocSparkSession.builder.getOrCreate().

  1. Importe a classe DataprocSparkSession.

  2. Chame o método getOrCreate() para iniciar a sessão.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configurar propriedades do Spark

Para configurar propriedades do Spark, encadeie um ou mais métodos .config() ao builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Usar configuração avançada

Para configurações avançadas, use a classe Session para personalizar configurações como a sub-rede ou a versão do ambiente de execução.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Reutilizar uma sessão nomeada

Com as sessões nomeadas, é possível compartilhar uma única sessão do Spark em vários notebooks e evitar atrasos repetidos no tempo de inicialização da sessão.

  1. No primeiro notebook, crie uma sessão com um ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Em outro notebook, reutilize a sessão especificando o mesmo ID.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Os IDs de sessão precisam ter de 4 a 63 caracteres, começar com uma letra minúscula e conter apenas letras minúsculas, números e hifens. O ID não pode terminar com um hífen. Não é possível reutilizar uma sessão com um ID no estado TERMINATED.

Usar comandos mágicos do Spark SQL

O pacote é compatível com a biblioteca sparksql-magic para executar consultas do Spark SQL em notebooks Jupyter. Os comandos mágicos são um recurso opcional.

  1. Instale as dependências necessárias.

    pip install IPython sparksql-magic
    
  2. Carregue a extensão mágica.

    %load_ext sparksql_magic
    
  3. Opcional: configure as definições padrão.

    %config SparkSql.limit=20
    
  4. Executar consultas SQL.

    %%sparksql
    SELECT * FROM your_table
    

Para usar opções avançadas, adicione flags ao comando %%sparksql. Por exemplo, para armazenar resultados em cache e criar uma visualização, execute o seguinte comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

As seguintes opções estão disponíveis:

  • --cache ou -c: armazena o DataFrame em cache.
  • --eager ou -e: caches com carregamento antecipado.
  • --view VIEW ou -v VIEW: cria uma visualização temporária.
  • --limit N ou -l N: substitui o limite padrão de exibição de linhas.
  • variable_name: armazena o resultado em uma variável.

A seguir