Menggunakan klien Serverless Spark Connect

Klien Dataproc Spark Connect adalah wrapper klien Apache Spark Connect. Hal ini memungkinkan aplikasi berkomunikasi dengan sesi Serverless untuk Apache Spark jarak jauh menggunakan protokol Spark Connect. Dokumen ini menunjukkan cara menginstal, mengonfigurasi, dan menggunakan klien.

Sebelum memulai

  1. Pastikan Anda memiliki peran Identity and Access Management yang berisi izin yang diperlukan untuk mengelola sesi interaktif dan template sesi.

  2. Jika Anda menjalankan klien di luar Google Cloud, berikan kredensial autentikasi. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke jalur file kunci akun layanan Anda.

Menginstal atau meng-uninstal klien

Anda dapat menginstal atau meng-uninstal paket dataproc-spark-connect menggunakan pip.

Instal

Untuk menginstal klien versi terbaru, jalankan perintah berikut:

pip install -U dataproc-spark-connect

Uninstal

Untuk meng-uninstal klien, jalankan perintah berikut:

pip uninstall dataproc-spark-connect

Mengonfigurasi klien

Tentukan project dan region untuk sesi Anda. Anda dapat menetapkan nilai ini menggunakan variabel lingkungan atau dengan menggunakan builder API dalam kode Anda.

Variabel lingkungan

Tetapkan variabel lingkungan GOOGLE_CLOUD_PROJECT dan GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

Gunakan metode .projectId() dan .location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Mulai sesi Spark

Untuk memulai sesi Spark, tambahkan impor yang diperlukan ke aplikasi atau notebook PySpark Anda, lalu panggil DataprocSparkSession.builder.getOrCreate() API.

  1. Impor class DataprocSparkSession.

  2. Panggil metode getOrCreate() untuk memulai sesi.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Mengonfigurasi properti Spark

Untuk mengonfigurasi properti Spark, gabungkan satu atau beberapa metode .config() ke builder.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

Menggunakan konfigurasi lanjutan

Untuk konfigurasi lanjutan, gunakan class Session untuk menyesuaikan setelan seperti subnetwork atau versi runtime.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

Menggunakan kembali sesi bernama

Sesi bernama memungkinkan Anda membagikan satu sesi Spark di beberapa notebook sekaligus menghindari penundaan waktu mulai sesi yang berulang.

  1. Di notebook pertama Anda, buat sesi dengan ID kustom.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. Di notebook lain, gunakan kembali sesi dengan menentukan ID sesi yang sama.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

ID sesi harus memiliki panjang 4-63 karakter, diawali dengan huruf kecil, dan hanya berisi huruf kecil, angka, dan tanda hubung. ID tidak boleh diakhiri dengan tanda hubung. Sesi dengan ID yang berada dalam status TERMINATED tidak dapat digunakan kembali.

Menggunakan perintah magic Spark SQL

Paket ini mendukung library sparksql-magic untuk menjalankan kueri Spark SQL di notebook Jupyter. Perintah ajaib adalah fitur opsional.

  1. Instal dependensi yang diperlukan.

    pip install IPython sparksql-magic
    
  2. Muat ekstensi magic.

    %load_ext sparksql_magic
    
  3. Opsional: Konfigurasi setelan default.

    %config SparkSql.limit=20
    
  4. Menjalankan kueri SQL.

    %%sparksql
    SELECT * FROM your_table
    

Untuk menggunakan opsi lanjutan, tambahkan tanda ke perintah %%sparksql. Misalnya, untuk meng-cache hasil dan membuat tampilan, jalankan perintah berikut:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

Tersedia opsi-opsi berikut:

  • --cache atau -c: menyimpan DataFrame dalam cache.
  • --eager atau -e: cache dengan pemuatan cepat.
  • --view VIEW atau -v VIEW: membuat tampilan sementara.
  • --limit N atau -l N: menggantikan batas tampilan baris default.
  • variable_name: menyimpan hasilnya dalam variabel.

Langkah berikutnya