Spark-Anwendungen mit Gemini entwickeln
Auf dieser Seite wird beschrieben, wie Sie mit der Gemini CLI Apache Spark-Anwendungen entwickeln und sie dann an den Managed Service for Apache Spark senden.
Hinweis
- Melden Sie sich in Ihrem Google Cloud Konto an. Wenn Sie noch kein Konto bei Google Cloudhaben, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Erforderliche Rollen
Bestimmte IAM-Rollen sind erforderlich, um die Beispiele auf dieser Seite auszuführen. Je nach Organisationsrichtlinien wurden diese Rollen möglicherweise bereits gewährt. Informationen zum Prüfen von Rollenzuweisungen finden Sie unter Müssen Sie Rollen zuweisen?.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Nutzerrollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung des Managed Service for Apache Spark benötigen:
-
Dataproc-Bearbeiter (
roles/dataproc.editor) für das Projekt -
Dienstkontonutzer (
roles/iam.serviceAccountUser) für das Compute Engine-Standarddienstkonto
Dienstkontorollen
Damit das Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen zum Senden von Managed Service for Apache Spark-Jobs und zum Zugriff auf Secrets hat, bitten Sie Ihren Administrator, dem Compute Engine-Standarddienstkonto die folgenden IAM-Rollen für das Projekt zu gewähren:
-
Job senden:
Dataproc-Worker (
roles/dataproc.worker) -
Auf Secrets zugreifen:
Secret Manager Secret Accessor (
roles/secretmanager.secretAccessor)
Best Practices für Gemini
Wir empfehlen folgende Best Practices:
- Bevor Sie Gemini starten, richten Sie Ihre Codeentwicklungsumgebung in Ihrem Projekt ein, einschließlich Umgebungsvariablen und virtueller Python-Umgebungen.
- Speichern Sie Projektrichtlinien, Codierungsstil und Architektur in einer
Gemini.md-Datei. - Bitten Sie Gemini, einen Plan zu entwickeln.
- Aktivieren Sie Prüfpunkte, um zu einem vorherigen Zustand zurückzukehren.
- Verwenden Sie spezifische Prompts, die Codesprache, Framework, Bibliotheken und Ergebnisse definieren. Verwenden Sie das Symbol
@, um auf Codebasisdateien zu verweisen. - Führen Sie Shell-Befehle über die Gemini CLI mit dem Symbol
!aus.
Hive-Tabelle zu BigQuery migrieren
In diesem Abschnitt wird beschrieben, wie Sie Gemini auffordern, ein PySpark-Skript zu generieren, mit dem eine Hive-Tabelle zu BigQuery migriert wird.
Geben Sie den folgenden Prompt in der Gemini CLI ein:
Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive table, adding an insertion_time column using the add_insertion_time_column function in @data_transformer.py. Save this table to BigQuery, providing detailed instructions to run this script against a dataproc cluster. Save a summary of this session to hive_to_BQReadme.md.Die Gemini CLI generiert eine
transform_hive_to_bigquery.py-Datei, die in etwa so aussieht:import argparse from pyspark.sql import SparkSession from data_transformer import add_insertion_time_column def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str): """ Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table. Args: spark: The SparkSession object. hive_database: The name of the source Hive database. hive_table: The name of the source Hive table. bq_table: The destination BigQuery table (e.g., 'dataset.table'). bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery connector data. """ # Read data from Hive table. input_df = spark.table(f'{hive_database}.{hive_table}') # Add the insertion time column. transformed_df = add_insertion_time_column(input_df) # Write the transformed data to BigQuery. transformed_df.write \ .format('bigquery') \ .option('table', bq_table) \ .option('temporaryGcsBucket', bq_temp_gcs_bucket) \ .mode('append') \ .save() print(f"Data successfully written to BigQuery table: {bq_table}") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script') parser.add_argument('--hive_database', required=True, help='Source Hive database') parser.add_argument('--hive_table', required=True, help='Source Hive table') parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)') parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data') args = parser.parse_args() # Create a SparkSession with Hive support. spark = SparkSession.builder \ .appName('Hive to BigQuery Transformation') \ .enableHiveSupport() \ .getOrCreate() transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket) spark.stop()Laden Sie die Datei
transform_hive_to_bigquery.pyin einen Cloud Storage-Bucket hoch . Eine Anleitung finden Sie unter Objekt in einen Bucket hochladen .Führen Sie den folgenden Befehl aus, um den Job an einen Managed Service for Apache Spark-Cluster zu senden:
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \ --cluster=CLUSTER_NAME --py-files=gs://BUCKET_NAME/data_transformer.py \ --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \ -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \ --bq_temp_gcs_bucket=BUCKET_NAME
Tabelle von PostgreSQL zu MySQL migrieren
In diesem Abschnitt wird beschrieben, wie Sie Gemini auffordern, eine Java Spark-Anwendung zu generieren, mit der eine Tabelle aus einer PostgreSQL-Datenbank zu einer MySQL-Datenbank migriert wird.
Geben Sie den folgenden Prompt in der Gemini CLI ein:
Create a Spark `PostgresToMySql.java` job to migrate data from a table in a Postgres database to a table in MySQL, both accessible via JDBC. The JDBC URL strings are stored in Secret Manager. The URL string includes the username and password. Read and write data in parallel based on user-provided partitioning information. Write data in batches for efficiency. Use the addInsertionTimeColumn to add a column to the data before writing it to the MySQL destination table. Provide instructions to run this job on Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a summary of the job in migration-README.mdGemini untersucht Ihr Verzeichnis und erstellt die Java-Anwendung im entsprechenden Paket zusammen mit einer
pom.xml-Datei, um die Anwendung zu erstellen. Die DateiPostgresToMySql.javasieht in etwa so aus:package com.customer.app; import com.customer.util.DataframeUtils; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import java.io.IOException; public class PostgresToMySql { public static void main(String[] args) { if (args.length != 6) { System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>"); System.exit(1); } String postgresTable = args[0]; String mysqlTable = args[1]; String postgresSecretId = args[2]; String mysqlSecretId = args[3]; String partitionColumn = args[4]; String batchSize = args[5]; String projectId = args[6]; SparkSession spark = SparkSession.builder() .appName("Postgres to MySQL JDBC Migration") .getOrCreate(); String postgresUrl = getSecret(projectId, postgresSecretId); String mysqlUrl = getSecret(projectId, mysqlSecretId); // Read data from Postgres in parallel. Dataset<Row> df = spark.read() .format("jdbc") .option("url", postgresUrl) .option("dbtable", postgresTable) .option("partitionColumn", partitionColumn) .option("lowerBound", "2012-01-03") .option("upperBound", "2025-10-31") // These bounds should be configured based on data .option("numPartitions", "1") .option("driver", "org.postgresql.Driver") .load(); // Add insertion time column. Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df); // Write data to MySQL in batches. transformedDf.write() .format("jdbc") .option("url", mysqlUrl) .option("dbtable", mysqlTable) .option("batchsize", Integer.parseInt(batchSize)) .option("driver", "com.mysql.cj.jdbc.Driver") .mode(SaveMode.Overwrite) .save(); spark.stop(); } private static String getSecret(String secretId) { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); } catch (IOException e) { throw new RuntimeException("Failed to retrieve secret", e); } } }Hinweise:
- Legen Sie
lowerBoundundupperBoundbasierend auf Ihren Daten fest. - Wenn die generierte
pom.xmlnicht ordnungsgemäß funktioniert, verwenden Sie diese GitHub pom.xml.
- Legen Sie
Laden Sie die Datei
postgres-to-mysql-migration-VERSION.jarin einen Cloud Storage-Bucket hoch. Eine Anleitung finden Sie unter Objekt in einen Bucket hochladen .Führen Sie den folgenden Befehl aus, um den Job an Ihren Managed Service for Apache Spark-Cluster zu senden:
gcloud dataproc jobs submit spark --cluster=CLUSTER_NAME --class=com.customer.app.PostgresToMySql \ --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \ -- POSTGRES_TABLE MYSQL-TABLE \ POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE