Utiliser Gemini pour développer des applications Spark
Cette page vous explique comment utiliser la Gemini CLI pour développer des applications Apache Spark, puis les envoyer au service Managed Service for Apache Spark.
Avant de commencer
- Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Rôles requis
Certains rôles IAM sont requis pour exécuter les exemples de cette page. En fonction des règles d'administration, ces rôles peuvent déjà avoir été accordés. Pour vérifier les attributions de rôles, consultez Do you need to grant roles? (Devez-vous attribuer des rôles ?).
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Rôles utilisateur
Pour obtenir les autorisations nécessaires pour utiliser le service Managed Service for Apache Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants :
-
Éditeur Dataproc (
roles/dataproc.editor) sur le projet -
Utilisateur du compte de service (
roles/iam.serviceAccountUser) sur le compte de service Compute Engine par défaut
Rôles de compte de service
Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour envoyer des jobs Managed Service for Apache Spark et accéder aux secrets, demandez à votre administrateur d'accorder les rôles IAM suivants au compte de service Compute Engine par défaut sur le projet :
-
Envoyer un job :
Nœud de calcul Dataproc (
roles/dataproc.worker) -
Accéder aux secrets :
Accesseur de secrets Secret Manager (
roles/secretmanager.secretAccessor)
Bonnes pratiques Gemini
Appliquez les bonnes pratiques suivantes :
- Avant de lancer Gemini, configurez votre environnement de développement de code dans votre projet, y compris les variables d'environnement et les environnements virtuels Python.
- Enregistrez les consignes, le style de programmation et l'architecture du projet dans un fichier
Gemini.md. - Demandez à Gemini d'élaborer un plan.
- Activez les points de contrôle pour revenir à un état antérieur.
- Utilisez des requêtes spécifiques qui définissent le langage de programmation, le framework, les bibliothèques et les résultats. Utilisez le symbole
@pour faire référence aux fichiers de la codebase. - Exécutez des commandes shell depuis Gemini CLI à l'aide du symbole
!.
Migrer une table Hive vers BigQuery
Cette section explique comment demander à Gemini de générer un script PySpark qui migre une table Hive vers BigQuery.
Saisissez le prompt suivant dans Gemini CLI :
Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive table, adding an insertion_time column using the add_insertion_time_column function in @data_transformer.py. Save this table to BigQuery, providing detailed instructions to run this script against a dataproc cluster. Save a summary of this session to hive_to_BQReadme.md.Gemini CLI génère un fichier
transform_hive_to_bigquery.pysemblable à celui-ci :import argparse from pyspark.sql import SparkSession from data_transformer import add_insertion_time_column def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str): """ Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table. Args: spark: The SparkSession object. hive_database: The name of the source Hive database. hive_table: The name of the source Hive table. bq_table: The destination BigQuery table (e.g., 'dataset.table'). bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery connector data. """ # Read data from Hive table. input_df = spark.table(f'{hive_database}.{hive_table}') # Add the insertion time column. transformed_df = add_insertion_time_column(input_df) # Write the transformed data to BigQuery. transformed_df.write \ .format('bigquery') \ .option('table', bq_table) \ .option('temporaryGcsBucket', bq_temp_gcs_bucket) \ .mode('append') \ .save() print(f"Data successfully written to BigQuery table: {bq_table}") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script') parser.add_argument('--hive_database', required=True, help='Source Hive database') parser.add_argument('--hive_table', required=True, help='Source Hive table') parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)') parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data') args = parser.parse_args() # Create a SparkSession with Hive support. spark = SparkSession.builder \ .appName('Hive to BigQuery Transformation') \ .enableHiveSupport() \ .getOrCreate() transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket) spark.stop()Importez le fichier
transform_hive_to_bigquery.pydans un bucket Cloud Storage (pour obtenir des instructions, consultez Importer un objet dans un bucket).Exécutez la commande suivante pour envoyer le job à un cluster Managed Service for Apache Spark :
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \ --cluster=CLUSTER_NAME --py-files=gs://BUCKET_NAME/data_transformer.py \ --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \ -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \ --bq_temp_gcs_bucket=BUCKET_NAME
Migrer une table de Postgres vers MySQL
Cette section explique comment demander à Gemini de générer une application Java Spark qui migre une table d'une base de données Postgres vers une base de données MySQL.
Saisissez le prompt suivant dans Gemini CLI :
Create a Spark `PostgresToMySql.java` job to migrate data from a table in a Postgres database to a table in MySQL, both accessible via JDBC. The JDBC URL strings are stored in Secret Manager. The URL string includes the username and password. Read and write data in parallel based on user-provided partitioning information. Write data in batches for efficiency. Use the addInsertionTimeColumn to add a column to the data before writing it to the MySQL destination table. Provide instructions to run this job on Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a summary of the job in migration-README.mdGemini examine votre répertoire et crée l'application Java dans le package approprié, ainsi qu'un fichier
pom.xmlpour compiler l'application. Le fichierPostgresToMySql.javadoit ressembler à ce qui suit :package com.customer.app; import com.customer.util.DataframeUtils; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import java.io.IOException; public class PostgresToMySql { public static void main(String[] args) { if (args.length != 6) { System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>"); System.exit(1); } String postgresTable = args[0]; String mysqlTable = args[1]; String postgresSecretId = args[2]; String mysqlSecretId = args[3]; String partitionColumn = args[4]; String batchSize = args[5]; String projectId = args[6]; SparkSession spark = SparkSession.builder() .appName("Postgres to MySQL JDBC Migration") .getOrCreate(); String postgresUrl = getSecret(projectId, postgresSecretId); String mysqlUrl = getSecret(projectId, mysqlSecretId); // Read data from Postgres in parallel. Dataset<Row> df = spark.read() .format("jdbc") .option("url", postgresUrl) .option("dbtable", postgresTable) .option("partitionColumn", partitionColumn) .option("lowerBound", "2012-01-03") .option("upperBound", "2025-10-31") // These bounds should be configured based on data .option("numPartitions", "1") .option("driver", "org.postgresql.Driver") .load(); // Add insertion time column. Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df); // Write data to MySQL in batches. transformedDf.write() .format("jdbc") .option("url", mysqlUrl) .option("dbtable", mysqlTable) .option("batchsize", Integer.parseInt(batchSize)) .option("driver", "com.mysql.cj.jdbc.Driver") .mode(SaveMode.Overwrite) .save(); spark.stop(); } private static String getSecret(String secretId) { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); } catch (IOException e) { throw new RuntimeException("Failed to retrieve secret", e); } } }Remarques :
- Définissez
lowerBoundetupperBounden fonction de vos données. - Si le
pom.xmlgénéré ne fonctionne pas correctement, utilisez ce pom.xml GitHub.
- Définissez
Importez le fichier
postgres-to-mysql-migration-VERSION.jardans un bucket Cloud Storage (pour obtenir des instructions, consultez Importer un objet dans un bucket).Exécutez la commande suivante pour envoyer le job à votre cluster Managed Service pour Apache Spark :
gcloud dataproc jobs submit spark --cluster=CLUSTER_NAME --class=com.customer.app.PostgresToMySql \ --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \ -- POSTGRES_TABLE MYSQL-TABLE \ POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE