Usar o Gemini para desenvolver aplicativos Spark
Nesta página, mostramos como usar a CLI do Gemini para desenvolver aplicativos do Apache Spark e enviá-los ao Managed Service para Apache Spark.
Antes de começar
- Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Funções exigidas
Alguns papéis do IAM são necessários para executar os exemplos nesta página. Dependendo das políticas da organização, essas funções já podem ter sido concedidas. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Papéis do usuário
Para receber as permissões necessárias para usar o Managed Service for Apache Spark, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Editor do Dataproc (
roles/dataproc.editor) no projeto -
Usuário da conta de serviço (
roles/iam.serviceAccountUser) na conta de serviço padrão do Compute Engine
Papéis da conta de serviço
Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para enviar jobs do Managed Service para Apache Spark e acessar secrets, peça ao administrador para conceder os seguintes papéis do IAM à conta de serviço padrão do Compute Engine no projeto:
-
Envie um job:
Worker do Dataproc (
roles/dataproc.worker) -
Acessar secrets:
Acessador de secrets do Secret Manager (
roles/secretmanager.secretAccessor)
Práticas recomendadas do Gemini
Siga estas práticas recomendadas:
- Antes de iniciar o Gemini, configure o ambiente de desenvolvimento de código no seu projeto, incluindo variáveis de ambiente e ambientes virtuais do Python.
- Salve as diretrizes, o estilo de programação e a arquitetura do projeto em um arquivo
Gemini.md. - Peça ao Gemini para desenvolver um plano.
- Ative os checkpoints para reverter a um estado anterior.
- Use comandos específicos que definam a linguagem de programação, a estrutura, as bibliotecas e os resultados. Use o símbolo
@para referenciar arquivos da base de código. - Execute comandos do shell na CLI do Gemini usando o símbolo
!.
Migrar uma tabela do Hive para o BigQuery
Nesta seção, mostramos como pedir ao Gemini para gerar um script do PySpark que migra uma tabela do Hive para o BigQuery.
Insira o seguinte comando na CLI do Gemini:
Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive table, adding an insertion_time column using the add_insertion_time_column function in @data_transformer.py. Save this table to BigQuery, providing detailed instructions to run this script against a dataproc cluster. Save a summary of this session to hive_to_BQReadme.md.A CLI do Gemini gera um arquivo
transform_hive_to_bigquery.pysemelhante a este:import argparse from pyspark.sql import SparkSession from data_transformer import add_insertion_time_column def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str): """ Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table. Args: spark: The SparkSession object. hive_database: The name of the source Hive database. hive_table: The name of the source Hive table. bq_table: The destination BigQuery table (e.g., 'dataset.table'). bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery connector data. """ # Read data from Hive table. input_df = spark.table(f'{hive_database}.{hive_table}') # Add the insertion time column. transformed_df = add_insertion_time_column(input_df) # Write the transformed data to BigQuery. transformed_df.write \ .format('bigquery') \ .option('table', bq_table) \ .option('temporaryGcsBucket', bq_temp_gcs_bucket) \ .mode('append') \ .save() print(f"Data successfully written to BigQuery table: {bq_table}") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script') parser.add_argument('--hive_database', required=True, help='Source Hive database') parser.add_argument('--hive_table', required=True, help='Source Hive table') parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)') parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data') args = parser.parse_args() # Create a SparkSession with Hive support. spark = SparkSession.builder \ .appName('Hive to BigQuery Transformation') \ .enableHiveSupport() \ .getOrCreate() transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket) spark.stop()Faça upload do arquivo
transform_hive_to_bigquery.pypara um bucket do Cloud Storage. Consulte Fazer upload de um objeto para um bucket para instruções.Execute o comando a seguir para enviar o job a um cluster do serviço gerenciado para Apache Spark:
gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \ --cluster=CLUSTER_NAME --py-files=gs://BUCKET_NAME/data_transformer.py \ --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \ -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \ --bq_temp_gcs_bucket=BUCKET_NAME
Migrar uma tabela do Postgres para o MySQL
Nesta seção, mostramos como pedir ao Gemini para gerar um aplicativo Java Spark que migra uma tabela de um banco de dados Postgres para um banco de dados MySQL.
Insira o seguinte comando na CLI do Gemini:
Create a Spark `PostgresToMySql.java` job to migrate data from a table in a Postgres database to a table in MySQL, both accessible via JDBC. The JDBC URL strings are stored in Secret Manager. The URL string includes the username and password. Read and write data in parallel based on user-provided partitioning information. Write data in batches for efficiency. Use the addInsertionTimeColumn to add a column to the data before writing it to the MySQL destination table. Provide instructions to run this job on Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a summary of the job in migration-README.mdO Gemini examina seu diretório e cria o aplicativo Java no pacote adequado junto com um arquivo
pom.xmlpara criar o aplicativo. O arquivoPostgresToMySql.javaserá semelhante a este:package com.customer.app; import com.customer.util.DataframeUtils; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import java.io.IOException; public class PostgresToMySql { public static void main(String[] args) { if (args.length != 6) { System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>"); System.exit(1); } String postgresTable = args[0]; String mysqlTable = args[1]; String postgresSecretId = args[2]; String mysqlSecretId = args[3]; String partitionColumn = args[4]; String batchSize = args[5]; String projectId = args[6]; SparkSession spark = SparkSession.builder() .appName("Postgres to MySQL JDBC Migration") .getOrCreate(); String postgresUrl = getSecret(projectId, postgresSecretId); String mysqlUrl = getSecret(projectId, mysqlSecretId); // Read data from Postgres in parallel. Dataset<Row> df = spark.read() .format("jdbc") .option("url", postgresUrl) .option("dbtable", postgresTable) .option("partitionColumn", partitionColumn) .option("lowerBound", "2012-01-03") .option("upperBound", "2025-10-31") // These bounds should be configured based on data .option("numPartitions", "1") .option("driver", "org.postgresql.Driver") .load(); // Add insertion time column. Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df); // Write data to MySQL in batches. transformedDf.write() .format("jdbc") .option("url", mysqlUrl) .option("dbtable", mysqlTable) .option("batchsize", Integer.parseInt(batchSize)) .option("driver", "com.mysql.cj.jdbc.Driver") .mode(SaveMode.Overwrite) .save(); spark.stop(); } private static String getSecret(String secretId) { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest"); return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8(); } catch (IOException e) { throw new RuntimeException("Failed to retrieve secret", e); } } }Observações:
- Defina o
lowerBounde oupperBoundcom base nos seus dados. - Se o
pom.xmlgerado não funcionar corretamente, use este pom.xml do GitHub.
- Defina o
Faça o upload do arquivo
postgres-to-mysql-migration-VERSION.jarpara um bucket do Cloud Storage. Consulte Fazer upload de um objeto para um bucket para instruções.Execute o comando a seguir para enviar o job ao cluster do Managed Service for Apache Spark:
gcloud dataproc jobs submit spark --cluster=CLUSTER_NAME --class=com.customer.app.PostgresToMySql \ --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \ -- POSTGRES_TABLE MYSQL-TABLE \ POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE