Dataproc Spark Connect 클라이언트는 Apache Spark Connect 클라이언트의 래퍼입니다. 이를 통해 애플리케이션은 Spark Connect 프로토콜을 사용하여 원격 Apache Spark용 서버리스 세션과 통신할 수 있습니다. 이 문서에서는 클라이언트를 설치, 구성, 사용하는 방법을 보여줍니다.
시작하기 전에
대화형 세션과 세션 템플릿을 관리하는 데 필요한 권한이 포함된 Identity and Access Management 역할이 있는지 확인합니다.
Google Cloud외부에서 클라이언트를 실행하는 경우 인증 사용자 인증 정보를 제공합니다.
GOOGLE_APPLICATION_CREDENTIALS환경 변수를 서비스 계정 키 파일의 경로로 설정합니다.
클라이언트 설치 또는 제거
pip를 사용하여 dataproc-spark-connect 패키지를 설치하거나 제거할 수 있습니다.
설치
최신 버전의 클라이언트를 설치하려면 다음 명령어를 실행합니다.
pip install -U dataproc-spark-connect
제거
클라이언트를 제거하려면 다음 명령어를 실행합니다.
pip uninstall dataproc-spark-connect
클라이언트 구성
세션의 프로젝트와 리전을 지정합니다. 환경 변수를 사용하거나 코드에서 빌더 API를 사용하여 이러한 값을 설정할 수 있습니다.
환경 변수
GOOGLE_CLOUD_PROJECT 및 GOOGLE_CLOUD_REGION 환경 변수를 설정합니다.
Builder API
.projectId() 및 .location() 메서드를 사용합니다.
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
Spark 세션 시작
Spark 세션을 시작하려면 PySpark 애플리케이션 또는 노트북에 필요한 가져오기를 추가한 다음 DataprocSparkSession.builder.getOrCreate() API를 호출합니다.
DataprocSparkSession클래스를 가져옵니다.getOrCreate()메서드를 호출하여 세션을 시작합니다.from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
Spark 속성 구성
Spark 속성을 구성하려면 하나 이상의 .config() 메서드를 빌더에 연결합니다.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
고급 구성 사용
고급 구성의 경우 Session 클래스를 사용하여 서브넷 또는 런타임 버전과 같은 설정을 맞춤설정합니다.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
명명된 세션 재사용
명명된 세션을 사용하면 세션 시작 시간 지연을 반복하지 않으면서 여러 노트북에서 단일 Spark 세션을 공유할 수 있습니다.
첫 번째 노트북에서 맞춤 ID로 세션을 만듭니다.
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()다른 노트북에서 동일한 세션 ID를 지정하여 세션을 재사용합니다.
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
세션 ID는 4~63자(영문 기준)여야 하고 소문자로 시작해야 하며 소문자, 숫자, 하이픈만 포함해야 합니다. ID는 하이픈으로 끝날 수 없습니다. ID가 TERMINATED 상태인 세션은 재사용할 수 없습니다.
Spark SQL 매직 명령어 사용
이 패키지는 sparksql-magic 라이브러리를 지원하여 Jupyter 노트북에서 Spark SQL 쿼리를 실행합니다. 매직 명령어는 선택적 기능입니다.
필수 종속 항목을 설치합니다.
pip install IPython sparksql-magic매직 확장 프로그램을 로드합니다.
%load_ext sparksql_magic선택사항: 기본 설정을 구성합니다.
%config SparkSql.limit=20SQL 쿼리를 실행합니다.
%%sparksql SELECT * FROM your_table
고급 옵션을 사용하려면 %%sparksql 명령어에 플래그를 추가하세요. 예를 들어 결과를 캐시하고 뷰를 만들려면 다음 명령어를 실행합니다.
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
사용할 수 있는 옵션은 다음과 같습니다.
--cache또는-c: DataFrame을 캐시합니다.--eager또는-e: 즉시 로드되는 캐시입니다.--view VIEW또는-v VIEW: 임시 보기를 만듭니다.--limit N또는-l N: 기본 행 표시 한도를 재정의합니다.variable_name: 결과를 변수에 저장합니다.
다음 단계
Dataproc 세션에 대해 자세히 알아보세요.