Gemini를 사용하여 Spark 애플리케이션 개발
이 페이지에서는 Gemini CLI를 사용하여 Apache Spark 애플리케이션을 개발한 다음 Apache Spark용 관리 서비스에 제출하는 방법을 보여줍니다.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
필요한 역할
이 페이지의 예시를 실행하려면 특정 IAM 역할이 필요합니다. 조직 정책에 따라 이러한 역할이 이미 부여되었을 수 있습니다. 역할 부여를 확인하려면 역할을 부여해야 하나요?를 참고하세요.
역할 부여에 대한 상세 설명은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
사용자 역할
Managed Service for Apache Spark 서비스를 사용하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.
-
프로젝트에 대한 Dataproc 편집자 (
roles/dataproc.editor) -
Compute Engine 기본 서비스 계정의 서비스 계정 사용자 (
roles/iam.serviceAccountUser)
서비스 계정 역할
Compute Engine 기본 서비스 계정에 Apache Spark용 관리 서비스 작업을 제출하고 보안 비밀에 액세스하는 데 필요한 권한이 있는지 확인하려면 관리자에게 프로젝트의 Compute Engine 기본 서비스 계정에 다음 IAM 역할을 부여해 달라고 요청하세요.
-
작업 제출:
Dataproc 작업자 (
roles/dataproc.worker) -
보안 비밀에 액세스:
Secret Manager 보안 비밀 접근자 (
roles/secretmanager.secretAccessor)
Gemini 권장사항
다음 권장사항을 따르세요.
- Gemini를 실행하기 전에 환경 변수와 Python 가상 환경을 비롯한 프로젝트의 코드 개발 환경을 설정하세요.
- 프로젝트 가이드라인, 코딩 스타일, 아키텍처를
Gemini.md파일에 저장합니다. - Gemini에게 계획을 수립해 달라고 요청합니다.
- 이전 상태로 롤백할 수 있도록 체크포인트를 사용 설정합니다.
- 코드 언어, 프레임워크, 라이브러리, 결과를 정의하는 구체적인 프롬프트를 사용합니다.
@기호를 사용하여 코드베이스 파일을 참조합니다. !기호를 사용하여 Gemini CLI 내에서 셸 명령어를 실행합니다.
Hive 테이블을 BigQuery로 이전
이 섹션에서는 Gemini에 Hive 테이블을 BigQuery로 이전하는 PySpark 스크립트를 생성하라는 메시지를 표시하는 방법을 보여줍니다.
Gemini CLI에 다음 프롬프트를 입력합니다.
Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive table, adding an insertion_time column using the add_insertion_time_column function in @data_transformer.py. Save this table to BigQuery, providing detailed instructions to run this script against a dataproc cluster. Save a summary of this session to hive_to_BQReadme.md.Gemini CLI는 다음과 유사한
transform_hive_to_bigquery.py파일을 생성합니다.import argparse from pyspark.sql import SparkSession from data_transformer import add_insertion_time_column def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str): """ Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table. Args: spark: The SparkSession object. hive_database: The name of the source Hive database. hive_table: The name of the source Hive table. bq_table: The destination BigQuery table (e.g., 'dataset.table'). bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery connector data. """ # Read data from Hive table. input_df = spark.table(f'{hive_database}.{hive_table}') # Add the insertion time column. transformed_df = add_insertion_time_column(input_df) # Write the transformed data to BigQuery. transformed_df.write \ .format('bigquery') \ .option('table', bq_table) \ .option('temporaryGcsBucket', bq_temp_gcs_bucket) \ .mode('append') \ .save() print(f"Data successfully written to BigQuery table: {bq_table}") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script') parser.add_argument('--hive_database', required=True, help='Source Hive database') parser.add_argument('--hive_table', required=True, help='Source Hive table') parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)') parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data') args = parser.parse_args() # Create a SparkSession with Hive support. spark = SparkSession.builder \ .appName('Hive to BigQuery Transformation') \ .enableHiveSupport() \ .getOrCreate() transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket) spark.stop()Cloud Storage 버킷에
transform_hive_to_bigquery.py파일을 업로드합니다(안내는 버킷에 객체 업로드 참고).다음 명령어를 실행하여 Apache Spark용 관리 서비스 클러스터에 작업을 제출합니다.
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \ --cluster=CLUSTER_NAME --py-files=gs://BUCKET_NAME/data_transformer.py \ --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \ -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \ --bq_temp_gcs_bucket=BUCKET_NAME
Postgres에서 MySQL로 테이블 마이그레이션
이 섹션에서는 Gemini에게 Postgres 데이터베이스의 테이블을 MySQL 데이터베이스로 이전하는 Java Spark 애플리케이션을 생성하도록 프롬프트를 표시하는 방법을 보여줍니다.
Gemini CLI에 다음 프롬프트를 입력합니다.
Create a Spark `PostgresToMySql.java` job to migrate data from a table in a Postgres database to a table in MySQL, both accessible via JDBC. The JDBC URL strings are stored in Secret Manager. The URL string includes the username and password. Read and write data in parallel based on user-provided partitioning information. Write data in batches for efficiency. Use the addInsertionTimeColumn to add a column to the data before writing it to the MySQL destination table. Provide instructions to run this job on Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a summary of the job in migration-README.mdGemini가 디렉터리를 검사하고 애플리케이션을 빌드하기 위한
pom.xml파일과 함께 적절한 패키지에 Java 애플리케이션을 만듭니다.PostgresToMySql.java파일은 다음과 유사합니다.package com.customer.app; import com.customer.util.DataframeUtils; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import java.io.IOException; public class PostgresToMySql { public static void main(String[] args) { if (args.length != 6) { System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>"); System.exit(1); } String postgresTable = args[0]; String mysqlTable = args[1]; String postgresSecretId = args[2]; String mysqlSecretId = args[3]; String partitionColumn = args[4]; String batchSize = args[5]; String projectId = args[6]; SparkSession spark = SparkSession.builder() .appName("Postgres to MySQL JDBC Migration") .getOrCreate(); String postgresUrl = getSecret(projectId, postgresSecretId); String mysqlUrl = getSecret(projectId, mysqlSecretId); // Read data from Postgres in parallel. Dataset<Row> df = spark.read() .format("jdbc") .option("url", postgresUrl) .option("dbtable", postgresTable) .option("partitionColumn", partitionColumn) .option("lowerBound", "2012-01-03") .option("upperBound", "2025-10-31") // These bounds should be configured based on data .option("numPartitions", "1") .option("driver", "org.postgresql.Driver") .load(); // Add insertion time column. Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df); // Write data to MySQL in batches. transformedDf.write() .format("jdbc") .option("url", mysqlUrl) .option("dbtable", mysqlTable) .option("batchsize", Integer.parseInt(batchSize)) .option("driver", "com.mysql.cj.jdbc.Driver") .mode(SaveMode.Overwrite) .save(); spark.stop(); } private static String getSecret(String secretId) { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); } catch (IOException e) { throw new RuntimeException("Failed to retrieve secret", e); } } }참고:
- 데이터를 기반으로
lowerBound및upperBound을 설정합니다. - 생성된
pom.xml이 올바르게 작동하지 않으면 이 GitHub pom.xml을 사용하세요.
- 데이터를 기반으로
postgres-to-mysql-migration-VERSION.jar파일을 Cloud Storage 버킷에 업로드합니다 (자세한 내용은 버킷에 객체 업로드 참고).다음 명령어를 실행하여 Apache Spark용 관리형 서비스 클러스터에 작업을 제출합니다.
gcloud dataproc jobs submit spark --cluster=CLUSTER_NAME --class=com.customer.app.PostgresToMySql \ --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \ -- POSTGRES_TABLE MYSQL-TABLE \ POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE