Usar o Gemini para desenvolver aplicativos Spark

Nesta página, mostramos como usar a CLI do Gemini para desenvolver aplicativos do Apache Spark e enviá-los ao Managed Service para Apache Spark.

Antes de começar

  1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Funções exigidas

Alguns papéis do IAM são necessários para executar os exemplos nesta página. Dependendo das políticas da organização, essas funções já podem ter sido concedidas. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Papéis do usuário

Para receber as permissões necessárias para usar o Managed Service for Apache Spark, peça ao administrador para conceder a você os seguintes papéis do IAM:

Papéis da conta de serviço

Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para enviar jobs do Managed Service para Apache Spark e acessar secrets, peça ao administrador para conceder os seguintes papéis do IAM à conta de serviço padrão do Compute Engine no projeto:

Práticas recomendadas do Gemini

Siga estas práticas recomendadas:

  • Antes de iniciar o Gemini, configure o ambiente de desenvolvimento de código no seu projeto, incluindo variáveis de ambiente e ambientes virtuais do Python.
  • Salve as diretrizes, o estilo de programação e a arquitetura do projeto em um arquivo Gemini.md.
  • Peça ao Gemini para desenvolver um plano.
  • Ative os checkpoints para reverter a um estado anterior.
  • Use comandos específicos que definam a linguagem de programação, a estrutura, as bibliotecas e os resultados. Use o símbolo @ para referenciar arquivos da base de código.
  • Execute comandos do shell na CLI do Gemini usando o símbolo !.

Migrar uma tabela do Hive para o BigQuery

Nesta seção, mostramos como pedir ao Gemini para gerar um script do PySpark que migra uma tabela do Hive para o BigQuery.

  1. Insira o seguinte comando na CLI do Gemini:

    Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive
    table, adding an insertion_time column using the add_insertion_time_column
    function in @data_transformer.py. Save this table to BigQuery,
    providing detailed instructions to run this script against a dataproc cluster.
    Save a summary of this session to hive_to_BQReadme.md.
    

    A CLI do Gemini gera um arquivo transform_hive_to_bigquery.py semelhante a este:

      import argparse
      from pyspark.sql import SparkSession
      from data_transformer import add_insertion_time_column
    
      def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str):
          """
          Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table.
    
          Args:
              spark: The SparkSession object.
              hive_database: The name of the source Hive database.
              hive_table: The name of the source Hive table.
              bq_table: The destination BigQuery table (e.g., 'dataset.table').
              bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery
              connector data.
          """
          # Read data from Hive table.
          input_df = spark.table(f'{hive_database}.{hive_table}')
    
          # Add the insertion time column.
          transformed_df = add_insertion_time_column(input_df)
    
          # Write the transformed data to BigQuery.
          transformed_df.write \
              .format('bigquery') \
              .option('table', bq_table) \
              .option('temporaryGcsBucket', bq_temp_gcs_bucket) \
              .mode('append') \
              .save()
    
          print(f"Data successfully written to BigQuery table: {bq_table}")
    
      if __name__ == '__main__':
          parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script')
          parser.add_argument('--hive_database', required=True, help='Source Hive database')
          parser.add_argument('--hive_table', required=True, help='Source Hive table')
          parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)')
          parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data')
          args = parser.parse_args()
    
          # Create a SparkSession with Hive support.
          spark = SparkSession.builder \
              .appName('Hive to BigQuery Transformation') \
              .enableHiveSupport() \
              .getOrCreate()
    
          transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket)
    
          spark.stop()
    
  2. Faça upload do arquivo transform_hive_to_bigquery.py para um bucket do Cloud Storage. Consulte Fazer upload de um objeto para um bucket para instruções.

  3. Execute o comando a seguir para enviar o job a um cluster do serviço gerenciado para Apache Spark:

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \
        --cluster=CLUSTER_NAME
        --py-files=gs://BUCKET_NAME/data_transformer.py \
        --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \
        -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \
        --bq_temp_gcs_bucket=BUCKET_NAME
    

Migrar uma tabela do Postgres para o MySQL

Nesta seção, mostramos como pedir ao Gemini para gerar um aplicativo Java Spark que migra uma tabela de um banco de dados Postgres para um banco de dados MySQL.

  1. Insira o seguinte comando na CLI do Gemini:

    Create a Spark `PostgresToMySql.java` job to migrate data from a table in a
    Postgres database to a table in MySQL, both accessible via JDBC. The JDBC
    URL strings are stored in Secret Manager. The URL string includes
    the username and password. Read and write data in parallel based on
    user-provided partitioning information. Write data in batches for efficiency.
    Use the addInsertionTimeColumn to add a column to the data before writing it
    to the MySQL destination table. Provide instructions to run this job on
    Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a
    summary of the job in migration-README.md
    

    O Gemini examina seu diretório e cria o aplicativo Java no pacote adequado junto com um arquivo pom.xml para criar o aplicativo. O arquivo PostgresToMySql.java será semelhante a este:

    package com.customer.app;
    
    import com.customer.util.DataframeUtils;
    import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
    import com.google.cloud.secretmanager.v1.SecretVersionName;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    
    public class PostgresToMySql {
    
        public static void main(String[] args) {
            if (args.length != 6) {
                System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>");
                System.exit(1);
            }
    
            String postgresTable = args[0];
            String mysqlTable = args[1];
            String postgresSecretId = args[2];
            String mysqlSecretId = args[3];
            String partitionColumn = args[4];
            String batchSize = args[5];
            String projectId = args[6];
    
            SparkSession spark = SparkSession.builder()
                    .appName("Postgres to MySQL JDBC Migration")
                    .getOrCreate();
    
            String postgresUrl = getSecret(projectId, postgresSecretId);
            String mysqlUrl = getSecret(projectId, mysqlSecretId);
    
            // Read data from Postgres in parallel.
            Dataset<Row> df = spark.read()
                    .format("jdbc")
                    .option("url", postgresUrl)
                    .option("dbtable", postgresTable)
                    .option("partitionColumn", partitionColumn)
                    .option("lowerBound", "2012-01-03")
                    .option("upperBound", "2025-10-31") // These bounds should be configured based on data
                    .option("numPartitions", "1")
                    .option("driver", "org.postgresql.Driver")
                    .load();
    
            // Add insertion time column.
            Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df);
    
            // Write data to MySQL in batches.
            transformedDf.write()
                    .format("jdbc")
                    .option("url", mysqlUrl)
                    .option("dbtable", mysqlTable)
                    .option("batchsize", Integer.parseInt(batchSize))
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .mode(SaveMode.Overwrite)
                    .save();
    
            spark.stop();
        }
    
        private static String getSecret(String secretId) {
            try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
                SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
                return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
            } catch (IOException e) {
                throw new RuntimeException("Failed to retrieve secret", e);
            }
        }
    }
    

    Observações:

    • Defina o lowerBound e o upperBound com base nos seus dados.
    • Se o pom.xml gerado não funcionar corretamente, use este pom.xml do GitHub.
  2. Faça o upload do arquivo postgres-to-mysql-migration-VERSION.jar para um bucket do Cloud Storage. Consulte Fazer upload de um objeto para um bucket para instruções.

  3. Execute o comando a seguir para enviar o job ao cluster do Managed Service for Apache Spark:

    gcloud dataproc jobs submit spark
        --cluster=CLUSTER_NAME
        --class=com.customer.app.PostgresToMySql \
        --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \
        -- POSTGRES_TABLE MYSQL-TABLE \
        POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE