Menggunakan Gemini untuk mengembangkan aplikasi Spark
Halaman ini menunjukkan cara menggunakan Gemini CLI untuk mengembangkan aplikasi Apache Spark, lalu mengirimkannya ke layanan Managed Service for Apache Spark.
Sebelum memulai
- Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Peran yang diperlukan
Peran IAM tertentu diperlukan untuk menjalankan contoh di halaman ini. Bergantung pada kebijakan organisasi, peran ini mungkin sudah diberikan. Untuk memeriksa pemberian peran, lihat Apakah Anda perlu memberikan peran?.
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran pengguna
Untuk mendapatkan izin yang Anda perlukan untuk menggunakan layanan Managed Service for Apache Spark, minta administrator untuk memberi Anda peran IAM berikut:
-
Dataproc Editor (
roles/dataproc.editor) di project -
Pengguna Akun Layanan (
roles/iam.serviceAccountUser) di akun layanan default Compute Engine
Peran akun layanan
Untuk memastikan bahwa akun layanan default Compute Engine memiliki izin yang diperlukan untuk mengirimkan tugas Managed Service for Apache Spark dan mengakses secret, minta administrator Anda untuk memberikan peran IAM berikut kepada akun layanan default Compute Engine di project:
-
Kirimkan tugas:
Dataproc Worker (
roles/dataproc.worker) -
Mengakses secret:
Secret Manager Secret Accessor (
roles/secretmanager.secretAccessor)
Praktik terbaik Gemini
Ikuti praktik terbaik berikut:
- Sebelum meluncurkan Gemini, siapkan lingkungan pengembangan kode di project Anda, termasuk variabel lingkungan dan lingkungan virtual Python.
- Simpan pedoman project, gaya coding, dan arsitektur, dalam file
Gemini.md. - Minta Gemini membuat rencana.
- Aktifkan titik pemeriksaan untuk melakukan roll back ke status sebelumnya.
- Gunakan perintah spesifik yang menentukan bahasa kode, framework, library, dan hasil. Gunakan simbol
@untuk mereferensikan file codebase. - Jalankan perintah shell dari dalam Gemini CLI menggunakan simbol
!.
Memigrasikan tabel Hive ke BigQuery
Bagian ini menunjukkan cara meminta Gemini untuk membuat skrip PySpark yang memigrasikan tabel Hive ke BigQuery.
Masukkan perintah berikut ke Gemini CLI:
Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive table, adding an insertion_time column using the add_insertion_time_column function in @data_transformer.py. Save this table to BigQuery, providing detailed instructions to run this script against a dataproc cluster. Save a summary of this session to hive_to_BQReadme.md.Gemini CLI membuat file
transform_hive_to_bigquery.pyyang mirip dengan berikut ini:import argparse from pyspark.sql import SparkSession from data_transformer import add_insertion_time_column def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str): """ Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table. Args: spark: The SparkSession object. hive_database: The name of the source Hive database. hive_table: The name of the source Hive table. bq_table: The destination BigQuery table (e.g., 'dataset.table'). bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery connector data. """ # Read data from Hive table. input_df = spark.table(f'{hive_database}.{hive_table}') # Add the insertion time column. transformed_df = add_insertion_time_column(input_df) # Write the transformed data to BigQuery. transformed_df.write \ .format('bigquery') \ .option('table', bq_table) \ .option('temporaryGcsBucket', bq_temp_gcs_bucket) \ .mode('append') \ .save() print(f"Data successfully written to BigQuery table: {bq_table}") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script') parser.add_argument('--hive_database', required=True, help='Source Hive database') parser.add_argument('--hive_table', required=True, help='Source Hive table') parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)') parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data') args = parser.parse_args() # Create a SparkSession with Hive support. spark = SparkSession.builder \ .appName('Hive to BigQuery Transformation') \ .enableHiveSupport() \ .getOrCreate() transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket) spark.stop()Upload file
transform_hive_to_bigquery.pyke bucket Cloud Storage (lihat Mengupload objek ke bucket untuk mengetahui petunjuknya).Jalankan perintah berikut untuk mengirimkan tugas ke cluster Managed Service for Apache Spark:
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \ --cluster=CLUSTER_NAME --py-files=gs://BUCKET_NAME/data_transformer.py \ --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \ -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \ --bq_temp_gcs_bucket=BUCKET_NAME
Memigrasikan tabel dari Postgres ke MySQL
Bagian ini menunjukkan cara meminta Gemini untuk membuat aplikasi Java Spark yang memigrasikan tabel dari database Postgres ke database MySQL.
Masukkan perintah berikut ke Gemini CLI:
Create a Spark `PostgresToMySql.java` job to migrate data from a table in a Postgres database to a table in MySQL, both accessible via JDBC. The JDBC URL strings are stored in Secret Manager. The URL string includes the username and password. Read and write data in parallel based on user-provided partitioning information. Write data in batches for efficiency. Use the addInsertionTimeColumn to add a column to the data before writing it to the MySQL destination table. Provide instructions to run this job on Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a summary of the job in migration-README.mdGemini akan memeriksa direktori Anda dan membuat aplikasi Java dalam paket yang sesuai beserta file
pom.xmluntuk membangun aplikasi. FilePostgresToMySql.javaakan mirip dengan berikut ini:package com.customer.app; import com.customer.util.DataframeUtils; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import java.io.IOException; public class PostgresToMySql { public static void main(String[] args) { if (args.length != 6) { System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>"); System.exit(1); } String postgresTable = args[0]; String mysqlTable = args[1]; String postgresSecretId = args[2]; String mysqlSecretId = args[3]; String partitionColumn = args[4]; String batchSize = args[5]; String projectId = args[6]; SparkSession spark = SparkSession.builder() .appName("Postgres to MySQL JDBC Migration") .getOrCreate(); String postgresUrl = getSecret(projectId, postgresSecretId); String mysqlUrl = getSecret(projectId, mysqlSecretId); // Read data from Postgres in parallel. Dataset<Row> df = spark.read() .format("jdbc") .option("url", postgresUrl) .option("dbtable", postgresTable) .option("partitionColumn", partitionColumn) .option("lowerBound", "2012-01-03") .option("upperBound", "2025-10-31") // These bounds should be configured based on data .option("numPartitions", "1") .option("driver", "org.postgresql.Driver") .load(); // Add insertion time column. Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df); // Write data to MySQL in batches. transformedDf.write() .format("jdbc") .option("url", mysqlUrl) .option("dbtable", mysqlTable) .option("batchsize", Integer.parseInt(batchSize)) .option("driver", "com.mysql.cj.jdbc.Driver") .mode(SaveMode.Overwrite) .save(); spark.stop(); } private static String getSecret(String secretId) { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); } catch (IOException e) { throw new RuntimeException("Failed to retrieve secret", e); } } }Catatan:
- Tetapkan
lowerBounddanupperBoundberdasarkan data Anda. - Jika
pom.xmlyang dihasilkan tidak berfungsi dengan benar, gunakan pom.xml GitHub ini.
- Tetapkan
Upload file
postgres-to-mysql-migration-VERSION.jarke bucket Cloud Storage (lihat Mengupload objek ke bucket untuk mengetahui petunjuknya).Jalankan perintah berikut untuk mengirimkan tugas ke cluster Managed Service for Apache Spark:
gcloud dataproc jobs submit spark --cluster=CLUSTER_NAME --class=com.customer.app.PostgresToMySql \ --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \ -- POSTGRES_TABLE MYSQL-TABLE \ POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE