서버리스 Spark Connect 클라이언트 사용

Managed Service for Apache Spark Connect 클라이언트는 Apache Spark Connect 클라이언트의 래퍼입니다. 이를 통해 애플리케이션은 Spark Connect 프로토콜을 사용하여 원격 Managed Service for Apache Spark 세션과 통신할 수 있습니다. 이 문서에서는 클라이언트를 설치, 구성, 사용하는 방법을 보여줍니다.

시작하기 전에

  1. 대화형 세션과 세션 템플릿을 관리하는 데 필요한 권한이 포함된 Identity and Access Management 역할이 있는지 확인합니다.

  2. Google Cloud외부에서 클라이언트를 실행하는 경우 인증 사용자 인증 정보를 제공합니다. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키 파일의 경로로 설정합니다.

클라이언트 설치 또는 제거

pip를 사용하여 dataproc-spark-connect 패키지를 설치하거나 제거할 수 있습니다.

설치

최신 버전의 클라이언트를 설치하려면 다음 명령어를 실행합니다.

pip install -U dataproc-spark-connect

제거

클라이언트를 제거하려면 다음 명령어를 실행합니다.

pip uninstall dataproc-spark-connect

클라이언트 구성

세션의 프로젝트와 리전을 지정합니다. 환경 변수를 사용하거나 코드에서 빌더 API를 사용하여 이러한 값을 설정할 수 있습니다.

환경 변수

GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION 환경 변수를 설정합니다.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

.projectId().location() 메서드를 사용합니다.

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Spark 세션 시작

Spark 세션을 시작하려면 PySpark 애플리케이션 또는 노트북에 필요한 가져오기를 추가한 다음 DataprocSparkSession.builder.getOrCreate() API를 호출합니다.

  1. DataprocSparkSession 클래스를 가져옵니다.

  2. getOrCreate() 메서드를 호출하여 세션을 시작합니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Spark 속성 구성

Spark 속성을 구성하려면 하나 이상의 .config() 메서드를 빌더에 연결합니다.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

고급 구성 사용

고급 구성의 경우 Session 클래스를 사용하여 서브네트워크 또는 런타임 버전과 같은 설정을 맞춤설정합니다.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

명명된 세션 재사용

명명된 세션을 사용하면 반복되는 세션 시작 시간 지연을 방지하면서 여러 노트북에서 단일 Spark 세션을 공유할 수 있습니다.

  1. 첫 번째 노트북에서 맞춤 ID로 세션을 만듭니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 다른 노트북에서 동일한 세션 ID를 지정하여 세션을 재사용합니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

세션 ID는 4~63자(영문 기준)여야 하고 소문자로 시작해야 하며 소문자, 숫자, 하이픈만 포함해야 합니다. ID는 하이픈으로 끝날 수 없습니다. TERMINATED 상태인 ID가 있는 세션은 재사용할 수 없습니다.

Spark SQL 매직 명령어 사용

이 패키지는 Jupyter 노트북에서 Spark SQL 쿼리를 실행하는 sparksql-magic 라이브러리를 지원합니다. 매직 명령어는 선택적 기능입니다.

  1. 필수 종속 항목을 설치합니다.

    pip install IPython sparksql-magic
    
  2. 매직 확장 프로그램을 로드합니다.

    %load_ext sparksql_magic
    
  3. 선택사항: 기본 설정을 구성합니다.

    %config SparkSql.limit=20
    
  4. SQL 쿼리를 실행합니다.

    %%sparksql
    SELECT * FROM your_table
    

고급 옵션을 사용하려면 %%sparksql 명령어에 플래그를 추가하세요. 예를 들어 결과를 캐시하고 뷰를 만들려면 다음 명령어를 실행합니다.

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

사용할 수 있는 옵션은 다음과 같습니다.

  • --cache 또는 -c: DataFrame을 캐시합니다.
  • --eager 또는 -e: 즉시 로딩으로 캐시합니다.
  • --view VIEW 또는 -v VIEW: 임시 보기를 만듭니다.
  • --limit N 또는 -l N: 기본 행 표시 한도를 재정의합니다.
  • variable_name: 결과를 변수에 저장합니다.

다음 단계