데이터 소스에 연결
Apache Spark용 관리형 서비스는 데이터베이스, 데이터 웨어하우스, 스트리밍 서비스와 통합되어 여러 시스템의 데이터를 처리합니다. 이 문서에서는 Apache Spark용 관리형 서비스 클러스터를 내부 및 외부의 데이터 소스에 연결하는 방법을 보여줍니다. 내부 및 외부의 Google Cloud
시작하기 전에
- 계정에 로그인합니다. Google Cloud 를 처음 사용하는 경우 Google Cloud 계정을 만들어 실제 시나리오에서 제품이 어떻게 작동하는지 평가하세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Apache Spark용 관리형 서비스 클러스터를 만듭니다.
- Cloud Storage 버킷을 만듭니다.
- 데이터 소스에 액세스하는 데 필요한 권한이 있는지 확인합니다.
필요한 역할
이 페이지의 예시를 실행하려면 특정 IAM 역할이 필요합니다. 조직 정책에 따라 이러한 역할이 이미 부여되었을 수 있습니다. 역할 부여를 확인하려면 역할을 부여해야 하나요?를 참고하세요.
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참고하세요.
사용자 역할
Apache Spark용 관리형 서비스 클러스터를 만드는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.
-
Dataproc 편집자 (
roles/dataproc.editor) 프로젝트에 대한 -
Compute Engine 기본 서비스 계정의 서비스 계정 사용자 역할 (
roles/iam.serviceAccountUser)
서비스 계정 역할
Compute Engine 기본 서비스 계정에 Apache Spark용 관리형 서비스 클러스터를 만드는 데 필요한 권한이 있는지 확인하려면 관리자에게 Compute Engine 기본 서비스 계정에 프로젝트에 대한 Dataproc 작업자 (roles/dataproc.worker) IAM 역할을 부여해 달라고 요청하세요.
서비스에 연결 Google Cloud
이 섹션에서는 Apache Spark용 관리형 서비스를 Google Cloud 및 외부 데이터 소스에 연결하는 방법을 보여줍니다.
Cloud Storage에 연결
Apache Spark용 관리형 서비스는 Cloud Storage를 기본 분산 파일 시스템으로 사용합니다. Cloud Storage 버킷에서 데이터를 읽고 쓰려면 gs:// URI 접두어를 사용하세요. 추가 구성은 필요하지 않습니다.
다음 예시에서는 Cloud Storage 버킷에서 CSV 파일을 읽은 다음 Parquet 파일을 버킷에 쓰는 방법을 보여줍니다.
# Read a CSV file from a Cloud Storage bucket into a Spark DataFrame
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("gs://BUCKET_NAME/path/to/data.csv")
df.show()
# Write a Spark DataFrame to a Cloud Storage bucket
df.write.format("parquet") \
.mode("overwrite") \
.save("gs://BUCKET_NAME/path/to/output.parquet")
BigQuery에 연결
BigQuery 커넥터는 Apache Spark용 관리형 서비스 2.1 이상 이미지 버전에 사전 설치됩니다. 이 커넥터를 사용하면 Apache Spark용 관리형 서비스와 BigQuery 간에 대규모 데이터 전송이 가능합니다.
다음 예시에서는 BigQuery 테이블을 Spark DataFrame으로 읽은 다음 쿼리를 실행하는 방법을 보여줍니다.
# Set up the BigQuery connector.
spark.conf.set("spark.sql.sources.provider", "bigquery")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "YOUR_BQ_DATASET")
# Read a BigQuery table into a Spark DataFrame,
df = spark.read.format("bigquery") \
.option("table", "bigquery-public-data.samples.shakespeare") \
.load()
df.createOrReplaceTempView("shakespeare")
# Perform a query on the data,
word_counts = spark.sql("SELECT word, SUM(word_count) as count FROM shakespeare GROUP BY word ORDER BY count DESC")
word_counts.show()
JDBC를 사용하여 외부 데이터베이스에 연결
Spark 기본 제공 JDBC 데이터 소스를 사용하여 Apache Spark용 관리형 서비스를 Java Database Connectivity(JDBC) 드라이버를 제공하는 데이터베이스(예: PostgreSQL 또는 MySQL)에 연결할 수 있습니다.
JDBC 드라이버를 사용할 수 있도록 만들기
JDBC를 사용하여 데이터베이스에 연결하려면 드라이버의 JAR 파일을 Spark 클래스 경로에서 사용할 수 있도록 만드세요.
작업별
드라이버 JAR을 Cloud Storage 버킷에 저장하고 작업을 제출할 때 --jars 플래그로 참조하세요. Spark는 JAR 파일을 작업에 필요한 노드에 배포합니다. 작업별 종속 항목에 권장되는 접근 방식입니다.
클러스터 전체
클러스터의 모든 작업에 특정 드라이버가 필요한 경우 클러스터를 만들 때 초기화 작업을 사용하여 Cloud Storage에서 모든 노드의 Spark JAR 디렉터리로 드라이버 JAR을 복사하세요.
초기화 작업 스크립트를 만듭니다.
#!/bin/bash set -e -x gsutil cp gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jar /usr/lib/spark/jars/클러스터를 만들고 스크립트를 참조합니다.
gcloud dataproc clusters create my-jdbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://YOUR_BUCKET_NAME/init-actions/install-jdbc-driver.sh
PostgreSQL 데이터베이스에서 읽기
이 예시에서는 작업별 메서드를 사용하여 PostgreSQL 데이터베이스에 연결합니다.
PostgreSQL JDBC 드라이버 JAR을 다운로드하고 Cloud Storage 버킷에 업로드합니다.
PySpark 작업을 제출하고
--jars플래그로 드라이버의 Cloud Storage 경로를 참조합니다.gcloud dataproc jobs submit pyspark my_job.py \ --cluster=MY_CLUSTER \ --region=REGION \ --jars=gs://YOUR_BUCKET_NAME/drivers/postgresql-42.7.3.jarjob.py파일에서 다음 코드를 사용하여 데이터베이스에서 읽고 씁니다.from pyspark.sql import SparkSession def main(): # WARNING: Do not hardcode credentials. Use a service like Secret Manager # to handle sensitive credentials. db_properties = { "user": "USERNAME", "password": "PASSWORD", "driver": "org.postgresql.Driver" } jdbc_url = "jdbc:postgresql://DB_HOST:DB_PORT/DATABASE" spark = SparkSession.builder.appName("Postgres JDBC Example").getOrCreate() # Read data from a PostgreSQL table df = spark.read.jdbc(url=jdbc_url, table="public.my_table", properties=db_properties) df.printSchema() df.show() # Write data to a new PostgreSQL table (df.write .jdbc(url=jdbc_url, table="public.new_table", mode="overwrite", properties=db_properties)) if __name__ == "__main__": main()
ODBC를 사용하여 외부 데이터베이스에 연결
JDBC 드라이버를 사용할 수 없는 데이터 소스에 연결하려면 Open Database Connectivity (ODBC) 드라이버를 사용하세요. 클러스터를 만들 때 초기화 작업을 사용하여 각 노드에 드라이버와 종속 항목을 설치합니다.
이 예시에서는 Microsoft SQL Server 인스턴스에 연결하는 방법을 보여줍니다.
초기화 작업 스크립트를 만듭니다.
- Debian용 Microsoft SQL Server ODBC 드라이버를 설치하는 스크립트를 만듭니다.
#!/bin/bash # Initialization action for installing MS SQL ODBC driver on the cluster. set -e -x # Install dependencies for the driver. apt-get update apt-get install -y --no-install-recommends curl gnupg unixodbc-dev # Add Microsoft's official repository. curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list # Install the driver. apt-get update ACCEPT_EULA=Y apt-get install -y msodbcsql18 # Install the pyodbc library for Python. pip install pyodbc ``` 1. Upload the script to a Cloud Storage bucket. ```shell gsutil cp install-odbc-mssql.sh gs://YOUR_BUCKET_NAME/init-actions/ ```초기화 작업을 실행하는 Apache Spark용 관리형 서비스 클러스터를 만듭니다.
클러스터를 만들 때 Cloud Storage의 초기화 작업 스크립트를 가리킵니다.
gcloud dataproc clusters create my-odbc-cluster \ --region=REGION \ --image-version=2.2-debian11 \ --initialization-actions=gs://BUCKET_NAME/init-actions/install-odbc-mssql.shPySpark를 사용하여 연결 및 쿼리
클러스터가 실행되면 ODBC 드라이버와
pyodbc라이브러리를 사용할 수 있습니다. 다음 코드는 드라이버 노드에서pyodbc를 사용하여 데이터를 Pandas DataFrame으로 가져온 다음 분산 처리를 위해 Spark DataFrame으로 변환합니다.import pyodbc import pandas as pd from pyspark.sql import SparkSession def get_sql_data(connection_string, query): """ Connects to the database using pyodbc, executes a query, and returns the result as a Pandas DataFrame. """ cnxn = pyodbc.connect(connection_string) pdf = pd.read_sql(query, cnxn) cnxn.close() return pdf def main(): spark = SparkSession.builder.appName("ODBC Example").getOrCreate() # WARNING: Do not hardcode credentials. Use Secret Manager. server = 'SERVER.database.windows.net' database = 'DATABASE' username = 'USERNAME' password = 'PASSWORD' driver = '{ODBC Driver 18 for SQL Server}' # The driver name must match the installed driver. # Create the connection string. connection_string = f'DRIVER={driver};SERVER=tcp:{server},1433;DATABASE={database};UID={username};PWD={password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' sql_query = "SELECT * FROM Sales.Customer" # Fetch data as a Pandas DataFrame. pandas_df = get_sql_data(connection_string, sql_query) # Convert the Pandas DataFrame to a Spark DataFrame for distributed processing. spark_df = spark.createDataFrame(pandas_df) print("Successfully read data from SQL Server:") spark_df.printSchema() spark_df.show(5) # You can now perform distributed operations on the Spark DataFrame. print(f"Total number of customers: {spark_df.count()}") if __name__ == "__main__": main()
권장사항
이 섹션에서는 사용자 인증 정보 및 종속 항목 관리에 관한 권장사항을 설명합니다.
사용자 인증 정보 관리
코드에 비밀번호 또는 기타 보안 비밀을 하드코딩하지 마세요. Secret Manager를 사용하여 사용자 인증 정보를 안전하게 저장하고 Apache Spark용 관리형 서비스 작업에서 액세스하세요.
종속 항목 관리
클러스터를 만들 때 초기화 작업을 사용하여 드라이버와 라이브러리를 설치하세요. 복잡한 환경의 경우 모든 종속 항목이 사전 설치된 커스텀 Apache Spark용 관리형 서비스 이미지를 빌드하는 것이 좋습니다. 커스텀 이미지를 사용하면 클러스터 시작 시간이 단축되고 일관성이 보장됩니다.
프로덕션에서 사용하기 전에 공개 소스의 초기화 작업 스크립트를 버전 관리된 자체 Cloud Storage 버킷에 복사하세요. 공개 스크립트는 예고 없이 변경될 수 있으므로 직접 참조하지 마세요.
다음 단계
- Secret Manager 자세히 알아보기
- 커스텀 이미지를 만드는 방법 알아보기.
- 초기화 작업에 대해 자세히 알아보기