Develop Spark applications with Gemini

This page shows you how to use the Gemini CLI to develop Apache Spark applications and then submit them to Managed Service for Apache Spark.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs


Required roles

You must have certain IAM roles to run the examples on this page. Depending on your organization's policies, these roles might already be granted to you. To check whether they are, see Do you need to grant roles?

For more information about granting roles, see Manage access to projects, folders, and organizations.

User roles

To get the permissions that you need to use Managed Service for Apache Spark, ask your administrator to grant you the following IAM roles:

Service account roles

To ensure that the Compute Engine default service account has the permissions needed to submit Managed Service for Apache Spark jobs and access secrets, ask your administrator to grant it the following IAM roles on the project:

Gemini best practices

Follow these best practices:

  • Before you start Gemini, set up your code development environment in the project, including environment variables and a Python virtual environment.
  • Keep your project guidelines, coding style, and architecture in a GEMINI.md file.
  • Ask Gemini to create a plan.
  • Enable checkpointing so that you can roll back to a previous state.
  • Use specific prompts that define the coding language, frameworks, libraries, and expected results. Use the @ symbol to reference files in your codebase.
  • Use the ! symbol to run shell commands from within the Gemini CLI.

Migrate a Hive table to BigQuery

This section shows how to prompt Gemini to generate a PySpark script that migrates a Hive table to BigQuery.

  1. Enter the following prompt in the Gemini CLI:

    Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive
    table, adding an insertion_time column using the add_insertion_time_column
    function in @data_transformer.py. Save this table to BigQuery,
    providing detailed instructions to run this script against a dataproc cluster.
    Save a summary of this session to hive_to_BQReadme.md.
    

    The Gemini CLI generates a transform_hive_to_bigquery.py file similar to the following (a possible data_transformer.py helper is sketched after these steps):

      import argparse
      from pyspark.sql import SparkSession
      from data_transformer import add_insertion_time_column
    
      def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str):
          """
          Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table.
    
          Args:
              spark: The SparkSession object.
              hive_database: The name of the source Hive database.
              hive_table: The name of the source Hive table.
              bq_table: The destination BigQuery table (e.g., 'dataset.table').
              bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery
              connector data.
          """
          # Read data from Hive table.
          input_df = spark.table(f'{hive_database}.{hive_table}')
    
          # Add the insertion time column.
          transformed_df = add_insertion_time_column(input_df)
    
          # Write the transformed data to BigQuery.
          transformed_df.write \
              .format('bigquery') \
              .option('table', bq_table) \
              .option('temporaryGcsBucket', bq_temp_gcs_bucket) \
              .mode('append') \
              .save()
    
          print(f"Data successfully written to BigQuery table: {bq_table}")
    
      if __name__ == '__main__':
          parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script')
          parser.add_argument('--hive_database', required=True, help='Source Hive database')
          parser.add_argument('--hive_table', required=True, help='Source Hive table')
          parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)')
          parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data')
          args = parser.parse_args()
    
          # Create a SparkSession with Hive support.
          spark = SparkSession.builder \
              .appName('Hive to BigQuery Transformation') \
              .enableHiveSupport() \
              .getOrCreate()
    
          transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket)
    
          spark.stop()
    
  2. Upload the transform_hive_to_bigquery.py file, along with the data_transformer.py file that it imports, to a Cloud Storage bucket (for instructions, see Upload objects to a bucket).

  3. Run the following command to submit the job to your Managed Service for Apache Spark cluster:

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \
        --cluster=CLUSTER_NAME \
        --py-files=gs://BUCKET_NAME/data_transformer.py \
        --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \
        -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \
        --bq_temp_gcs_bucket=BUCKET_NAME
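
The prompt references the add_insertion_time_column function in your data_transformer.py file. A minimal sketch of what such a helper might look like, assuming it simply appends the current timestamp as an insertion_time column (an assumption based on the prompt, not on generated output):

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import current_timestamp

    def add_insertion_time_column(df: DataFrame) -> DataFrame:
        """Returns df with an added insertion_time column set to the current timestamp.

        Illustrative sketch: the column semantics are an assumption based on
        the prompt, not generated output.
        """
        return df.withColumn('insertion_time', current_timestamp())

Because the submit command passes data_transformer.py through --py-files, the generated script can import this module on the cluster.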
    

Migrate a table from Postgres to MySQL

This section shows how to prompt Gemini to generate a Java Spark application that migrates a table from a Postgres database to a MySQL database.

  1. Enter the following prompt in the Gemini CLI:

    Create a Spark `PostgresToMySql.java` job to migrate data from a table in a
    Postgres database to a table in MySQL, both accessible via JDBC. The JDBC
    URL strings are stored in Secret Manager. The URL string includes
    the username and password. Read and write data in parallel based on
    user-provided partitioning information. Write data in batches for efficiency.
    Use the addInsertionTimeColumn to add a column to the data before writing it
    to the MySQL destination table. Provide instructions to run this job on
    Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a
    summary of the job in migration-README.md
    

    Gemini examines your directory, creates the Java application in the appropriate package, and creates a pom.xml file to build the application. The PostgresToMySql.java file will look similar to the following (a sketch of storing the JDBC URL secrets appears after these steps):

    package com.customer.app;
    
    import com.customer.util.DataframeUtils;
    import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
    import com.google.cloud.secretmanager.v1.SecretVersionName;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    
    public class PostgresToMySql {
    
        public static void main(String[] args) {
            if (args.length != 7) {
                System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>");
                System.exit(1);
            }
    
            String postgresTable = args[0];
            String mysqlTable = args[1];
            String postgresSecretId = args[2];
            String mysqlSecretId = args[3];
            String partitionColumn = args[4];
            String batchSize = args[5];
            String projectId = args[6];
    
            SparkSession spark = SparkSession.builder()
                    .appName("Postgres to MySQL JDBC Migration")
                    .getOrCreate();
    
            String postgresUrl = getSecret(projectId, postgresSecretId);
            String mysqlUrl = getSecret(projectId, mysqlSecretId);
    
            // Read data from Postgres in parallel.
            Dataset<Row> df = spark.read()
                    .format("jdbc")
                    .option("url", postgresUrl)
                    .option("dbtable", postgresTable)
                    .option("partitionColumn", partitionColumn)
                    .option("lowerBound", "2012-01-03")
                    .option("upperBound", "2025-10-31") // These bounds should be configured based on data
                    .option("numPartitions", "1")
                    .option("driver", "org.postgresql.Driver")
                    .load();
    
            // Add insertion time column.
            Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df);
    
            // Write data to MySQL in batches.
            transformedDf.write()
                    .format("jdbc")
                    .option("url", mysqlUrl)
                    .option("dbtable", mysqlTable)
                    .option("batchsize", Integer.parseInt(batchSize))
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .mode(SaveMode.Overwrite)
                    .save();
    
            spark.stop();
        }
    
        private static String getSecret(String projectId, String secretId) {
            try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
                SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
                return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
            } catch (IOException e) {
                throw new RuntimeException("Failed to retrieve secret", e);
            }
        }
    }
    

    Note:

    • Set lowerBound and upperBound based on your data.
    • If the generated pom.xml doesn't build correctly, use this GitHub pom.xml.
  2. Upload the postgres-to-mysql-migration-VERSION.jar file to a Cloud Storage bucket (for instructions, see Upload objects to a bucket).

  3. Run the following command to submit the job to your Managed Service for Apache Spark cluster:

    gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --class=com.customer.app.PostgresToMySql \
        --jars=gs://BUCKET_NAME/postgres-to-mysql-migration-VERSION.jar \
        -- POSTGRES_TABLE MYSQL_TABLE \
        POSTGRES_SECRET MYSQL_SECRET PARTITION_COLUMN BATCH_SIZE PROJECT_ID
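
The prompt assumes that the Postgres and MySQL JDBC URL strings, including username and password, are already stored in Secret Manager. A minimal Python sketch of how you might store one of them using the google-cloud-secret-manager client; the project ID, secret ID, and URL below are illustrative, not values from this guide:

    from google.cloud import secretmanager

    def store_jdbc_url(project_id: str, secret_id: str, jdbc_url: str) -> None:
        """Creates a secret and stores a JDBC URL string as its first version."""
        client = secretmanager.SecretManagerServiceClient()

        # Create the secret container with automatic replication.
        secret = client.create_secret(
            request={
                "parent": f"projects/{project_id}",
                "secret_id": secret_id,
                "secret": {"replication": {"automatic": {}}},
            }
        )

        # Add the JDBC URL string as the first secret version.
        client.add_secret_version(
            request={
                "parent": secret.name,
                "payload": {"data": jdbc_url.encode("utf-8")},
            }
        )

    # Illustrative usage:
    # store_jdbc_url("my-project", "postgres-jdbc-url",
    #                "jdbc:postgresql://HOST:5432/DB?user=USER&password=PASSWORD")

At runtime, the job's getSecret method reads the latest version of each secret to obtain the JDBC URLs.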