서버리스 Spark Connect 클라이언트 사용

Dataproc Spark Connect 클라이언트는 Apache Spark Connect 클라이언트의 래퍼입니다. 이를 통해 애플리케이션은 Spark Connect 프로토콜을 사용하여 원격 Apache Spark용 서버리스 세션과 통신할 수 있습니다. 이 문서에서는 클라이언트를 설치, 구성, 사용하는 방법을 보여줍니다.

시작하기 전에

  1. 대화형 세션과 세션 템플릿을 관리하는 데 필요한 권한이 포함된 Identity and Access Management 역할이 있는지 확인합니다.

  2. Google Cloud외부에서 클라이언트를 실행하는 경우 인증 사용자 인증 정보를 제공합니다. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키 파일의 경로로 설정합니다.

클라이언트 설치 또는 제거

pip를 사용하여 dataproc-spark-connect 패키지를 설치하거나 제거할 수 있습니다.

설치

최신 버전의 클라이언트를 설치하려면 다음 명령어를 실행합니다.

pip install -U dataproc-spark-connect

제거

클라이언트를 제거하려면 다음 명령어를 실행합니다.

pip uninstall dataproc-spark-connect

클라이언트 구성

세션의 프로젝트와 리전을 지정합니다. 환경 변수를 사용하거나 코드에서 빌더 API를 사용하여 이러한 값을 설정할 수 있습니다.

환경 변수

GOOGLE_CLOUD_PROJECTGOOGLE_CLOUD_REGION 환경 변수를 설정합니다.

# Google Cloud Configuration for Dataproc Spark Connect Integration Tests
# Copy this file to .env and fill in your actual values

# ============================================================================
# REQUIRED CONFIGURATION
# ============================================================================

# Your Google Cloud Project ID
GOOGLE_CLOUD_PROJECT="your-project-id"

# Google Cloud Region where Dataproc sessions will be created
GOOGLE_CLOUD_REGION="us-central1"

# Path to service account key file (if using SERVICE_ACCOUNT auth)
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================

# Authentication type (SERVICE_ACCOUNT or END_USER_CREDENTIALS). If not set, API default is used.
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="SERVICE_ACCOUNT"
# DATAPROC_SPARK_CONNECT_AUTH_TYPE="END_USER_CREDENTIALS"

# Service account email for workload authentication (optional)
# DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT="your-service-account@your-project.iam.gserviceaccount.com"

# ============================================================================
# SESSION CONFIGURATION
# ============================================================================

# Session timeout in seconds (how long session stays active)
# DATAPROC_SPARK_CONNECT_TTL_SECONDS="3600"

# Session idle timeout in seconds (how long session stays active when idle)
# DATAPROC_SPARK_CONNECT_IDLE_TTL_SECONDS="900"

# Automatically terminate session when Python process exits (true/false)
# DATAPROC_SPARK_CONNECT_SESSION_TERMINATE_AT_EXIT="false"

# Custom file path for storing active session information
# DATAPROC_SPARK_CONNECT_ACTIVE_SESSION_FILE_PATH="/tmp/dataproc_spark_connect_session"

# ============================================================================
# DATA SOURCE CONFIGURATION
# ============================================================================

# Default data source for Spark SQL (currently only supports "bigquery")
# Only available for Dataproc runtime version 2.3
# DATAPROC_SPARK_CONNECT_DEFAULT_DATASOURCE="bigquery"

# ============================================================================
# ADVANCED CONFIGURATION
# ============================================================================

# Custom Dataproc API endpoint (uncomment if needed)
# GOOGLE_CLOUD_DATAPROC_API_ENDPOINT="your-region-dataproc.googleapis.com"

# Subnet URI for Dataproc Spark Connect (full resource name format)
# Example: projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name
# DATAPROC_SPARK_CONNECT_SUBNET="projects/your-project-id/regions/us-central1/subnetworks/your-subnet-name"


Builder API

.projectId().location() 메서드를 사용합니다.

spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()

Spark 세션 시작

Spark 세션을 시작하려면 PySpark 애플리케이션 또는 노트북에 필요한 가져오기를 추가한 다음 DataprocSparkSession.builder.getOrCreate() API를 호출합니다.

  1. DataprocSparkSession 클래스를 가져옵니다.

  2. getOrCreate() 메서드를 호출하여 세션을 시작합니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = DataprocSparkSession.builder.getOrCreate()
    

Spark 속성 구성

Spark 속성을 구성하려면 하나 이상의 .config() 메서드를 빌더에 연결합니다.

from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()

고급 구성 사용

고급 구성의 경우 Session 클래스를 사용하여 서브넷 또는 런타임 버전과 같은 설정을 맞춤설정합니다.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'

spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()

명명된 세션 재사용

명명된 세션을 사용하면 세션 시작 시간 지연을 반복하지 않으면서 여러 노트북에서 단일 Spark 세션을 공유할 수 있습니다.

  1. 첫 번째 노트북에서 맞춤 ID로 세션을 만듭니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(1, 'data')], ['id', 'value'])
    df.show()
    
  2. 다른 노트북에서 동일한 세션 ID를 지정하여 세션을 재사용합니다.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    session_id = 'my-ml-pipeline-session'
    spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate()
    df = spark.createDataFrame([(2, 'more-data')], ['id', 'value'])
    df.show()
    

세션 ID는 4~63자(영문 기준)여야 하고 소문자로 시작해야 하며 소문자, 숫자, 하이픈만 포함해야 합니다. ID는 하이픈으로 끝날 수 없습니다. ID가 TERMINATED 상태인 세션은 재사용할 수 없습니다.

Spark SQL 매직 명령어 사용

이 패키지는 sparksql-magic 라이브러리를 지원하여 Jupyter 노트북에서 Spark SQL 쿼리를 실행합니다. 매직 명령어는 선택적 기능입니다.

  1. 필수 종속 항목을 설치합니다.

    pip install IPython sparksql-magic
    
  2. 매직 확장 프로그램을 로드합니다.

    %load_ext sparksql_magic
    
  3. 선택사항: 기본 설정을 구성합니다.

    %config SparkSql.limit=20
    
  4. SQL 쿼리를 실행합니다.

    %%sparksql
    SELECT * FROM your_table
    

고급 옵션을 사용하려면 %%sparksql 명령어에 플래그를 추가하세요. 예를 들어 결과를 캐시하고 뷰를 만들려면 다음 명령어를 실행합니다.

%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true

사용할 수 있는 옵션은 다음과 같습니다.

  • --cache 또는 -c: DataFrame을 캐시합니다.
  • --eager 또는 -e: 즉시 로드되는 캐시입니다.
  • --view VIEW 또는 -v VIEW: 임시 보기를 만듭니다.
  • --limit N 또는 -l N: 기본 행 표시 한도를 재정의합니다.
  • variable_name: 결과를 변수에 저장합니다.

다음 단계