Usa Gemini para desarrollar aplicaciones de Spark

En esta página, se muestra cómo usar la CLI de Gemini para desarrollar aplicaciones de Apache Spark y, luego, enviarlas al servicio de Managed Service for Apache Spark.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Roles obligatorios

Se requieren ciertos roles de IAM para ejecutar los ejemplos de esta página. Según las políticas de la organización, es posible que estos roles ya se hayan otorgado. Para verificar las asignaciones de roles, consulta ¿Necesitas otorgar roles?.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Funciones de usuario

Para obtener los permisos que necesitas para usar el servicio de Managed Service for Apache Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Roles de la cuenta de servicio

Para asegurarte de que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para enviar trabajos de Managed Service for Apache Spark y acceder a secretos, pídele a tu administrador que otorgue los siguientes roles de IAM a la cuenta de servicio predeterminada de Compute Engine en el proyecto:

Prácticas recomendadas de Gemini

Sigue las prácticas recomendadas a continuación:

  • Antes de lanzar Gemini, configura tu entorno de desarrollo de código en tu proyecto, incluidas las variables de entorno y los entornos virtuales de Python.
  • Guarda las instrucciones, el estilo de codificación y la arquitectura del proyecto en un archivo Gemini.md.
  • Pídele a Gemini que desarrolle un plan.
  • Habilita los puntos de control para revertir el notebook a un estado anterior.
  • Usa instrucciones específicas que definan el lenguaje de código, el framework, las bibliotecas y los resultados. Usa el símbolo @ para hacer referencia a los archivos de la base de código.
  • Ejecuta comandos de shell desde Gemini CLI con el símbolo !.

Migra una tabla de Hive a BigQuery

En esta sección, se muestra cómo solicitarle a Gemini que genere un script de PySpark que migre una tabla de Hive a BigQuery.

  1. Ingresa la siguiente instrucción en Gemini CLI:

    Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive
    table, adding an insertion_time column using the add_insertion_time_column
    function in @data_transformer.py. Save this table to BigQuery,
    providing detailed instructions to run this script against a dataproc cluster.
    Save a summary of this session to hive_to_BQReadme.md.
    

    Gemini CLI genera un archivo transform_hive_to_bigquery.py similar al siguiente:

      import argparse
      from pyspark.sql import SparkSession
      from data_transformer import add_insertion_time_column
    
      def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str):
          """
          Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table.
    
          Args:
              spark: The SparkSession object.
              hive_database: The name of the source Hive database.
              hive_table: The name of the source Hive table.
              bq_table: The destination BigQuery table (e.g., 'dataset.table').
              bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery
              connector data.
          """
          # Read data from Hive table.
          input_df = spark.table(f'{hive_database}.{hive_table}')
    
          # Add the insertion time column.
          transformed_df = add_insertion_time_column(input_df)
    
          # Write the transformed data to BigQuery.
          transformed_df.write \
              .format('bigquery') \
              .option('table', bq_table) \
              .option('temporaryGcsBucket', bq_temp_gcs_bucket) \
              .mode('append') \
              .save()
    
          print(f"Data successfully written to BigQuery table: {bq_table}")
    
      if __name__ == '__main__':
          parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script')
          parser.add_argument('--hive_database', required=True, help='Source Hive database')
          parser.add_argument('--hive_table', required=True, help='Source Hive table')
          parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)')
          parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data')
          args = parser.parse_args()
    
          # Create a SparkSession with Hive support.
          spark = SparkSession.builder \
              .appName('Hive to BigQuery Transformation') \
              .enableHiveSupport() \
              .getOrCreate()
    
          transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket)
    
          spark.stop()
    
  2. Sube el archivo transform_hive_to_bigquery.py a un bucket de Cloud Storage (consulta Cómo subir un objeto a un bucket para obtener instrucciones).

  3. Ejecuta el siguiente comando para enviar el trabajo a un clúster de Managed Service for Apache Spark:

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \
        --cluster=CLUSTER_NAME
        --py-files=gs://BUCKET_NAME/data_transformer.py \
        --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \
        -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \
        --bq_temp_gcs_bucket=BUCKET_NAME
    

Migra una tabla de PostgreSQL a MySQL

En esta sección, se muestra cómo solicitarle a Gemini que genere una aplicación de Java Spark que migre una tabla de una base de datos de Postgres a una base de datos de MySQL.

  1. Ingresa la siguiente instrucción en Gemini CLI:

    Create a Spark `PostgresToMySql.java` job to migrate data from a table in a
    Postgres database to a table in MySQL, both accessible via JDBC. The JDBC
    URL strings are stored in Secret Manager. The URL string includes
    the username and password. Read and write data in parallel based on
    user-provided partitioning information. Write data in batches for efficiency.
    Use the addInsertionTimeColumn to add a column to the data before writing it
    to the MySQL destination table. Provide instructions to run this job on
    Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a
    summary of the job in migration-README.md
    

    Gemini examina tu directorio y crea la aplicación de Java en el paquete adecuado junto con un archivo pom.xml para compilar la aplicación. El archivo PostgresToMySql.java será similar al siguiente:

    package com.customer.app;
    
    import com.customer.util.DataframeUtils;
    import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
    import com.google.cloud.secretmanager.v1.SecretVersionName;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    
    public class PostgresToMySql {
    
        public static void main(String[] args) {
            if (args.length != 6) {
                System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>");
                System.exit(1);
            }
    
            String postgresTable = args[0];
            String mysqlTable = args[1];
            String postgresSecretId = args[2];
            String mysqlSecretId = args[3];
            String partitionColumn = args[4];
            String batchSize = args[5];
            String projectId = args[6];
    
            SparkSession spark = SparkSession.builder()
                    .appName("Postgres to MySQL JDBC Migration")
                    .getOrCreate();
    
            String postgresUrl = getSecret(projectId, postgresSecretId);
            String mysqlUrl = getSecret(projectId, mysqlSecretId);
    
            // Read data from Postgres in parallel.
            Dataset<Row> df = spark.read()
                    .format("jdbc")
                    .option("url", postgresUrl)
                    .option("dbtable", postgresTable)
                    .option("partitionColumn", partitionColumn)
                    .option("lowerBound", "2012-01-03")
                    .option("upperBound", "2025-10-31") // These bounds should be configured based on data
                    .option("numPartitions", "1")
                    .option("driver", "org.postgresql.Driver")
                    .load();
    
            // Add insertion time column.
            Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df);
    
            // Write data to MySQL in batches.
            transformedDf.write()
                    .format("jdbc")
                    .option("url", mysqlUrl)
                    .option("dbtable", mysqlTable)
                    .option("batchsize", Integer.parseInt(batchSize))
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .mode(SaveMode.Overwrite)
                    .save();
    
            spark.stop();
        }
    
        private static String getSecret(String secretId) {
            try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
                SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
                return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
            } catch (IOException e) {
                throw new RuntimeException("Failed to retrieve secret", e);
            }
        }
    }
    

    Notas:

    • Establece lowerBound y upperBound según tus datos.
    • Si el pom.xml generado no funciona correctamente, usa este pom.xml de GitHub.
  2. Sube el archivo postgres-to-mysql-migration-VERSION.jar a un bucket de Cloud Storage (consulta Sube un objeto a un bucket para obtener instrucciones).

  3. Ejecuta el siguiente comando para enviar el trabajo a tu clúster del servicio administrado para Apache Spark:

    gcloud dataproc jobs submit spark
        --cluster=CLUSTER_NAME
        --class=com.customer.app.PostgresToMySql \
        --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \
        -- POSTGRES_TABLE MYSQL-TABLE \
        POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE