שימוש בלקוח Serverless Spark Connect

לקוח Dataproc Spark Connect הוא wrapper של לקוח Apache Spark Connect. הוא מאפשר לאפליקציות לתקשר עם סשן מרוחק של Serverless for Apache Spark באמצעות פרוטוקול Spark Connect. במאמר הזה מוסבר איך להתקין את הלקוח, להגדיר אותו ולהשתמש בו.

לפני שמתחילים

  1. חשוב לוודא שיש לכם תפקידים בניהול זהויות והרשאות גישה (IAM) שכוללים את ההרשאות שנדרשות לניהול של סשנים אינטראקטיביים ותבניות של סשנים.

  2. אם מריצים את הלקוח מחוץ ל- Google Cloud, צריך לספק פרטי כניסה לאימות. מגדירים את משתנה הסביבה GOOGLE_APPLICATION_CREDENTIALS לנתיב של קובץ המפתח של חשבון השירות.

התקנה או הסרה של הלקוח

אפשר להתקין או להסיר את החבילה dataproc-spark-connect באמצעות pip.

התקנה

כדי להתקין את הגרסה האחרונה של הלקוח, מריצים את הפקודה הבאה:

pip install -U dataproc-spark-connect

הסרה

כדי להסיר את הלקוח, מריצים את הפקודה הבאה:

pip uninstall dataproc-spark-connect

הגדרת הלקוח

מציינים את הפרויקט והאזור של הסשן. אפשר להגדיר את הערכים האלה באמצעות משתני סביבה או באמצעות builder API בקוד.

משתני סביבה

מגדירים את משתני הסביבה GOOGLE_CLOUD_PROJECT ו-GOOGLE_CLOUD_REGION.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

משתמשים בשיטות .projectId() ו-.location().

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

התחלת סשן Spark

כדי להתחיל סשן Spark, מוסיפים את הייבוא הנדרש לאפליקציית PySpark או למחברת, ואז קוראים ל-API‏ DataprocSparkSession.builder.getOrCreate().

  1. מייבאים את הכיתה DataprocSparkSession.

  2. מבצעים קריאה ל-getOrCreate() כדי להתחיל את הסשן.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

הגדרת מאפייני Spark

כדי להגדיר מאפייני Spark, משרשרים שיטה אחת או יותר ל-builder..config()

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

שימוש בהגדרות מתקדמות

להגדרות מתקדמות, אפשר להשתמש במחלקת Session כדי להתאים אישית הגדרות כמו רשת המשנה או גרסת זמן הריצה.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

שימוש חוזר בסשן עם שם

סשנים עם שם מאפשרים לכם לשתף סשן Spark יחיד בין כמה מחברות, תוך הימנעות מעיכובים חוזרים בזמן ההפעלה של הסשן.

  1. ב-Notebook הראשון, יוצרים סשן עם מזהה בהתאמה אישית.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. במחברת אחרת, אפשר להשתמש שוב בסשן על ידי ציון אותו מזהה סשן.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

מזהי הסשנים צריכים להיות באורך של 4 עד 63 תווים, להתחיל באות קטנה ולכלול רק אותיות קטנות, מספרים ומקפים. המזהה לא יכול להסתיים במקף. אי אפשר לעשות שימוש חוזר בסשן עם מזהה שנמצא במצב TERMINATED.

שימוש בפקודות Magic של Spark SQL

החבילה תומכת בספרייה sparksql-magic להרצת שאילתות Spark SQL במחברות Jupyter. פקודות קסם הן תכונה אופציונלית.

  1. מתקינים את יחסי התלות הנדרשים.

    pip install IPython sparksql-magic
    
  2. טוענים את התוסף הקסום.

    %load_ext sparksql_magic
    
  3. אופציונלי: הגדרת הגדרות ברירת מחדל.

    %config SparkSql.limit=20
    
  4. הפעלת שאילתות SQL.

    %%sparksql
    SELECT * FROM your_table
    

כדי להשתמש באפשרויות מתקדמות, מוסיפים דגלים לפקודה %%sparksql. לדוגמה, כדי לשמור תוצאות במטמון וליצור תצוגה, מריצים את הפקודה הבאה:

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

אלו האפשרויות הזמינות:

  • --cache או -c: שמירת ה-DataFrame במטמון.
  • --eager או -e: מטמונים עם טעינה מראש.
  • --view VIEW או -v VIEW: יוצר תצוגה זמנית.
  • --limit N או -l N: מבטל את מגבלת ברירת המחדל של הצגת השורות.
  • variable_name: מאחסן את התוצאה במשתנה.

המאמרים הבאים