Gemini を使用して Spark アプリケーションを開発する

このページでは、Gemini CLI を使用して Apache Spark アプリケーションを開発し、Managed Service for Apache Spark サービスに送信する方法について説明します。

始める前に

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Managed Service for Apache Spark, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

必要なロール

このページの例を実行するには、特定の IAM ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロール付与を確認するには、ロールを付与する必要がありますか?をご覧ください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

ユーザーロール

Managed Service for Apache Spark サービスの使用に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

サービス アカウントのロール

Compute Engine のデフォルト サービス アカウントに、Managed Service for Apache Spark ジョブを送信してシークレットにアクセスするために必要な権限を付与するには、プロジェクトに対する次の IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼してください。

Gemini のベスト プラクティス

次のベスト プラクティスに従ってください。

  • Gemini を起動する前に、環境変数や Python 仮想環境など、プロジェクトでコード開発環境を設定します。
  • プロジェクトのガイドライン、コーディング スタイル、アーキテクチャを Gemini.md ファイルに保存します。
  • Gemini に相談して計画を作成します。
  • チェックポイントを有効にして、前の状態にロールバックします。
  • コード言語、フレームワーク、ライブラリ、結果を定義する特定のプロンプトを使用します。コードベース ファイルを参照するには、@ 記号を使用します。
  • ! 記号を使用して、Gemini CLI 内からシェル コマンドを実行します。

Hive テーブルを BigQuery に移行する

このセクションでは、Gemini に Hive テーブルを BigQuery に移行する PySpark スクリプトを生成するようにプロンプトを表示する方法について説明します。

  1. Gemini CLI に次のプロンプトを入力します。

    Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive
    table, adding an insertion_time column using the add_insertion_time_column
    function in @data_transformer.py. Save this table to BigQuery,
    providing detailed instructions to run this script against a dataproc cluster.
    Save a summary of this session to hive_to_BQReadme.md.
    

    Gemini CLI は、次のような transform_hive_to_bigquery.py ファイルを生成します。

      import argparse
      from pyspark.sql import SparkSession
      from data_transformer import add_insertion_time_column
    
      def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str):
          """
          Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table.
    
          Args:
              spark: The SparkSession object.
              hive_database: The name of the source Hive database.
              hive_table: The name of the source Hive table.
              bq_table: The destination BigQuery table (e.g., 'dataset.table').
              bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery
              connector data.
          """
          # Read data from Hive table.
          input_df = spark.table(f'{hive_database}.{hive_table}')
    
          # Add the insertion time column.
          transformed_df = add_insertion_time_column(input_df)
    
          # Write the transformed data to BigQuery.
          transformed_df.write \
              .format('bigquery') \
              .option('table', bq_table) \
              .option('temporaryGcsBucket', bq_temp_gcs_bucket) \
              .mode('append') \
              .save()
    
          print(f"Data successfully written to BigQuery table: {bq_table}")
    
      if __name__ == '__main__':
          parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script')
          parser.add_argument('--hive_database', required=True, help='Source Hive database')
          parser.add_argument('--hive_table', required=True, help='Source Hive table')
          parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)')
          parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data')
          args = parser.parse_args()
    
          # Create a SparkSession with Hive support.
          spark = SparkSession.builder \
              .appName('Hive to BigQuery Transformation') \
              .enableHiveSupport() \
              .getOrCreate()
    
          transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket)
    
          spark.stop()
    
  2. transform_hive_to_bigquery.py ファイルを Cloud Storage バケットにアップロードします(手順については、バケットにオブジェクトをアップロードするをご覧ください)。

  3. 次のコマンドを実行して、ジョブを Managed Service for Apache Spark クラスタに送信します。

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \
        --cluster=CLUSTER_NAME
        --py-files=gs://BUCKET_NAME/data_transformer.py \
        --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \
        -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \
        --bq_temp_gcs_bucket=BUCKET_NAME
    

Postgres から MySQL にテーブルを移行する

このセクションでは、Postgres データベースから MySQL データベースにテーブルを移行する Java Spark アプリケーションを生成するように Gemini にプロンプトを表示する方法について説明します。

  1. Gemini CLI に次のプロンプトを入力します。

    Create a Spark `PostgresToMySql.java` job to migrate data from a table in a
    Postgres database to a table in MySQL, both accessible via JDBC. The JDBC
    URL strings are stored in Secret Manager. The URL string includes
    the username and password. Read and write data in parallel based on
    user-provided partitioning information. Write data in batches for efficiency.
    Use the addInsertionTimeColumn to add a column to the data before writing it
    to the MySQL destination table. Provide instructions to run this job on
    Managed Service for Apache Spark in migrateJdbcToJdbc.md. Provide a
    summary of the job in migration-README.md
    

    Gemini はディレクトリを調べて、アプリケーションをビルドするための pom.xml ファイルとともに、適切なパッケージに Java アプリケーションを作成します。PostgresToMySql.java ファイルは次のようになります。

    package com.customer.app;
    
    import com.customer.util.DataframeUtils;
    import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
    import com.google.cloud.secretmanager.v1.SecretVersionName;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    
    public class PostgresToMySql {
    
        public static void main(String[] args) {
            if (args.length != 6) {
                System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>");
                System.exit(1);
            }
    
            String postgresTable = args[0];
            String mysqlTable = args[1];
            String postgresSecretId = args[2];
            String mysqlSecretId = args[3];
            String partitionColumn = args[4];
            String batchSize = args[5];
            String projectId = args[6];
    
            SparkSession spark = SparkSession.builder()
                    .appName("Postgres to MySQL JDBC Migration")
                    .getOrCreate();
    
            String postgresUrl = getSecret(projectId, postgresSecretId);
            String mysqlUrl = getSecret(projectId, mysqlSecretId);
    
            // Read data from Postgres in parallel.
            Dataset<Row> df = spark.read()
                    .format("jdbc")
                    .option("url", postgresUrl)
                    .option("dbtable", postgresTable)
                    .option("partitionColumn", partitionColumn)
                    .option("lowerBound", "2012-01-03")
                    .option("upperBound", "2025-10-31") // These bounds should be configured based on data
                    .option("numPartitions", "1")
                    .option("driver", "org.postgresql.Driver")
                    .load();
    
            // Add insertion time column.
            Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df);
    
            // Write data to MySQL in batches.
            transformedDf.write()
                    .format("jdbc")
                    .option("url", mysqlUrl)
                    .option("dbtable", mysqlTable)
                    .option("batchsize", Integer.parseInt(batchSize))
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .mode(SaveMode.Overwrite)
                    .save();
    
            spark.stop();
        }
    
        private static String getSecret(String secretId) {
            try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
                SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
                return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
            } catch (IOException e) {
                throw new RuntimeException("Failed to retrieve secret", e);
            }
        }
    }
    

    注:

    • データに基づいて lowerBoundupperBound を設定します。
    • 生成された pom.xml が正しく動作しない場合は、この GitHub pom.xml を使用します。
  2. postgres-to-mysql-migration-VERSION.jar ファイルを Cloud Storage バケットにアップロードします(手順については、バケットにオブジェクトをアップロードするをご覧ください)。

  3. 次のコマンドを実行して、ジョブを Managed Service for Apache Spark クラスタに送信します。

    gcloud dataproc jobs submit spark
        --cluster=CLUSTER_NAME
        --class=com.customer.app.PostgresToMySql \
        --jars=BUCKET/postgres-to-mysql-migration-VERSION.jar \
        -- POSTGRES_TABLE MYSQL-TABLE \
        POSTGRES_SECRET MYSQL-SECRET COLUMN BATCH_SIZE