Usar o cliente do Spark Connect sem servidor

O cliente do Serviço Gerenciado para Apache Spark Connect é um wrapper do cliente do Apache Spark Connect. Ele permite que os aplicativos se comuniquem com uma sessão remota do Serviço Gerenciado para Apache Spark usando o Spark Connect protocolo. Este documento mostra como instalar, configurar e usar o cliente.

Antes de começar

  1. Verifique se você tem os papéis do Identity and Access Management que contêm as permissões necessárias para gerenciar sessões interativas e modelos de sessão.

  2. Se você executar o cliente fora do Google Cloud, forneça credenciais de autenticação. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo da chave da conta de serviço.

Instalar ou desinstalar o cliente

É possível instalar ou desinstalar o pacote dataproc-spark-connect usando o pip.

Instalar

Para instalar a versão mais recente do cliente, execute o seguinte comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar o cliente, execute o seguinte comando:

pip uninstall dataproc-spark-connect

Configurar o cliente

Especifique o projeto e a região da sua sessão. É possível definir esses valores usando variáveis de ambiente ou a API Builder no código.

Variáveis de ambiente

Defina as variáveis de ambiente GOOGLE_CLOUD_PROJECT e GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Use os métodos .projectId() e .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Iniciar uma sessão do Spark

Para iniciar uma sessão do Spark, adicione as importações necessárias ao aplicativo ou notebook do PySpark e chame a API DataprocSparkSession.builder.getOrCreate().

  1. Importe a classe DataprocSparkSession.

  2. Chame o método getOrCreate() para iniciar a sessão.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configurar propriedades do Spark

Para configurar as propriedades do Spark, encadeie um ou mais métodos .config() ao builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Usar configuração avançada

Para configuração avançada, use a classe Session para personalizar configurações como a sub-rede ou a versão do ambiente de execução.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Reutilizar uma sessão nomeada

As sessões nomeadas permitem compartilhar uma única sessão do Spark em vários notebooks, evitando atrasos repetidos no tempo de inicialização da sessão.

  1. No primeiro notebook, crie uma sessão com um ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Em outro notebook, reutilize a sessão especificando o mesmo ID.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Os IDs de sessão precisam ter de 4 a 63 caracteres, começar com uma letra minúscula e conter apenas letras minúsculas, números e hífens. O ID não pode terminar com um hífen. Uma sessão com um ID no estado TERMINATED não pode ser reutilizada.

Usar comandos mágicos do Spark SQL

O pacote oferece suporte à sparksql-magic biblioteca para executar consultas do Spark SQL em notebooks Jupyter. Os comandos mágicos são um recurso opcional.

  1. Instale as dependências necessárias.

    pip install IPython sparksql-magic
    
  2. Carregue a extensão mágica.

    %load_ext sparksql_magic
    
  3. Opcional: configure as definições padrão.

    %config SparkSql.limit=20
    
  4. Execute consultas SQL.

    %%sparksql
    SELECT * FROM your_table
    

Para usar opções avançadas, adicione flags ao comando %%sparksql. Por exemplo, para armazenar resultados em cache e criar uma visualização, execute o seguinte comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

As seguintes opções estão disponíveis:

  • --cache ou -c: armazena o DataFrame em cache.
  • --eager ou -e: armazena em cache com carregamento imediato.
  • --view VIEW ou -v VIEW: cria uma visualização temporária.
  • --limit N ou -l N: substitui o limite de exibição de linhas padrão.
  • variable_name: armazena o resultado em uma variável.

A seguir