Use o cliente do Serverless Spark Connect

O cliente Dataproc Spark Connect é um wrapper do cliente Apache Spark Connect. Permite que as aplicações comuniquem com uma sessão remota do Serverless para Apache Spark através do protocolo Spark Connect. Este documento mostra como instalar, configurar e usar o cliente.

Antes de começar

  1. Certifique-se de que tem as funções de gestão de identidade e de acesso que contêm as autorizações necessárias para gerir sessões interativas e modelos de sessões.

  2. Se executar o cliente fora do Google Cloud, indique as credenciais de autenticação. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS para o caminho do ficheiro de chave da conta de serviço.

Instale ou desinstale o cliente

Pode instalar ou desinstalar o pacote dataproc-spark-connect através do pip.

Instalação

Para instalar a versão mais recente do cliente, execute o seguinte comando:

pip install -U dataproc-spark-connect

Desinstalar

Para desinstalar o cliente, execute o seguinte comando:

pip uninstall dataproc-spark-connect

Configure o cliente

Especifique o projeto e a região da sua sessão. Pode definir estes valores através de variáveis de ambiente ou da API Builder no seu código.

Variáveis de ambiente

Defina as variáveis de ambiente GOOGLE_CLOUD_PROJECT e GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


API Builder

Use os métodos .projectId() e .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Inicie uma sessão do Spark

Para iniciar uma sessão do Spark, adicione as importações necessárias à sua aplicação ou bloco de notas do PySpark e, em seguida, chame a API DataprocSparkSession.builder.getOrCreate().

  1. Importe a turma do DataprocSparkSession.

  2. Chame o método getOrCreate() para iniciar a sessão.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Configure as propriedades do Spark

Para configurar propriedades do Spark, encadeie um ou mais métodos ao criador..config()

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Use a configuração avançada

Para uma configuração avançada, use a classe Session para personalizar definições como a sub-rede ou a versão de tempo de execução.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Reutilize uma sessão com nome

As sessões com nome permitem-lhe partilhar uma única sessão do Spark em vários blocos de notas, evitando atrasos repetidos no tempo de arranque da sessão.

  1. No seu primeiro notebook, crie uma sessão com um ID personalizado.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Noutro bloco de notas, reutilize a sessão especificando o mesmo ID da sessão.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

Os IDs das sessões têm de ter entre 4 e 63 carateres, começar por uma letra minúscula e conter apenas letras minúsculas, números e hífenes. O ID não pode terminar com um hífen. Não é possível reutilizar uma sessão com um ID que se encontra num estado TERMINATED.

Use comandos mágicos do Spark SQL

O pacote suporta a biblioteca sparksql-magic para executar consultas SQL do Spark em blocos de notas do Jupyter. Os comandos mágicos são uma funcionalidade opcional.

  1. Instale as dependências necessárias.

    pip install IPython sparksql-magic
    
  2. Carregue a extensão mágica.

    %load_ext sparksql_magic
    
  3. Opcional: configure as predefinições.

    %config SparkSql.limit=20
    
  4. Executar consultas SQL.

    %%sparksql
    SELECT * FROM your_table
    

Para usar opções avançadas, adicione flags ao comando %%sparksql. Por exemplo, para colocar os resultados em cache e criar uma vista, execute o seguinte comando:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Estão disponíveis as seguintes opções:

  • --cache ou -c: coloca o DataFrame em cache.
  • --eager ou -e: caches com carregamento rápido.
  • --view VIEW ou -v VIEW: cria uma vista temporária.
  • --limit N ou -l N: substitui o limite de apresentação de linhas predefinido.
  • variable_name: armazena o resultado numa variável.

O que se segue?