Use Gemini to develop Spark applications

This page shows you how to use Gemini CLI to develop Apache Spark applications and then submit them to the Dataproc service.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, Cloud Storage, and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Required roles

Certain IAM roles are required to run the examples on this page. Depending on organization policies, these roles may have already been granted. To check role grants, see Do you need to grant roles?.

For more information about granting roles, see Manage access to projects, folders, and organizations.

User roles

To get the permissions that you need to use the Dataproc service, ask your administrator to grant you the following IAM roles:

Service account roles

To ensure that the Compute Engine default service account has the necessary permissions to submit Dataproc jobs and access secrets, ask your administrator to grant the following IAM roles to the Compute Engine default service account on the project:

Gemini best practices

Follow these best practices:

  • Before you launch Gemini, set up your code development environment in your project, including environment variables and Python virtual environments.
  • Save project guidelines, coding style, and architecture in a GEMINI.md file.
  • Ask Gemini to develop a plan.
  • Enable checkpoints to roll back to a previous state.
  • Use specific prompts that define code language, framework, libraries, and results. Use the @ symbol to reference codebase files.
  • Run shell commands from within the Gemini CLI using the ! symbol.

Migrate a Hive table to BigQuery

This section shows how to prompt Gemini to generate a PySpark script that migrates a Hive table to BigQuery.

  1. Input the following prompt to Gemini CLI:

    Create a PySpark transform_hive_to_bigquery.py script to extract and transform a Hive
    table, adding an insertion_time column using the add_insertion_time_column
    function in @data_transformer.py. Save this table to BigQuery,
    providing detailed instructions to run this script against a dataproc cluster.
    Save a summary of this session to hive_to_BQReadme.md.
    

    Gemini CLI generates a transform_hive_to_bigquery.py file that is similar to the following:

      import argparse
      from pyspark.sql import SparkSession
      from data_transformer import add_insertion_time_column
    
      def transform_hive_to_bigquery(spark: SparkSession, hive_database: str, hive_table: str, bq_table: str, bq_temp_gcs_bucket: str):
          """
          Reads a Hive table, adds an insertion_time column, and writes to a BigQuery table.
    
          Args:
              spark: The SparkSession object.
              hive_database: The name of the source Hive database.
              hive_table: The name of the source Hive table.
              bq_table: The destination BigQuery table (e.g., 'dataset.table').
              bq_temp_gcs_bucket: The Cloud Storage bucket for temporary BigQuery
              connector data.
          """
          # Read data from Hive table.
          input_df = spark.table(f'{hive_database}.{hive_table}')
    
          # Add the insertion time column.
          transformed_df = add_insertion_time_column(input_df)
    
          # Write the transformed data to BigQuery.
          transformed_df.write \
              .format('bigquery') \
              .option('table', bq_table) \
              .option('temporaryGcsBucket', bq_temp_gcs_bucket) \
              .mode('append') \
              .save()
    
          print(f"Data successfully written to BigQuery table: {bq_table}")
    
      if __name__ == '__main__':
          parser = argparse.ArgumentParser(description='PySpark Hive to BigQuery transformation script')
          parser.add_argument('--hive_database', required=True, help='Source Hive database')
          parser.add_argument('--hive_table', required=True, help='Source Hive table')
          parser.add_argument('--bq_table', required=True, help='Destination BigQuery table (dataset.table)')
          parser.add_argument('--bq_temp_gcs_bucket', required=True, help='Cloud Storage bucket for temporary BigQuery connector data')
          args = parser.parse_args()
    
          # Create a SparkSession with Hive support.
          spark = SparkSession.builder \
              .appName('Hive to BigQuery Transformation') \
              .enableHiveSupport() \
              .getOrCreate()
    
          transform_hive_to_bigquery(spark, args.hive_database, args.hive_table, args.bq_table, args.bq_temp_gcs_bucket)
    
          spark.stop()
    
  2. Upload the transform_hive_to_bigquery.py file to a Cloud Storage bucket (see Upload an object to a bucket for instructions).

  3. Run the following command to submit the job to a Dataproc cluster:

    gcloud dataproc jobs submit pyspark gs://BUCKET_NAME/transform_hive_to_bigquery.py \
        --cluster=CLUSTER_NAME \
        --py-files=gs://BUCKET_NAME/data_transformer.py \
        --properties=spark.hadoop.hive.metastore.uris=METASTORE_URI \
        -- --hive_database=HIVE_DATABASE --hive_table=HIVE_TABLE --bq_table=BQ_DATASET.BQ_TABLE \
        --bq_temp_gcs_bucket=BUCKET_NAME
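
The submit command mixes two kinds of arguments: flags that gcloud consumes, and, after the standalone `--` separator, arguments that are passed through to the script's argparse parser. The following Python sketch assembles the same command as an argument list so that the split is explicit. The function name and the sample values (bucket, cluster, metastore URI, and table names) are hypothetical; the flags are only those shown in the command above.

```python
import shlex


def build_pyspark_submit_command(bucket, cluster, metastore_uri,
                                 hive_database, hive_table, bq_table):
    """Return the gcloud argument list for the Hive-to-BigQuery job."""
    # Flags interpreted by gcloud itself.
    gcloud_args = [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        f"gs://{bucket}/transform_hive_to_bigquery.py",
        f"--cluster={cluster}",
        f"--py-files=gs://{bucket}/data_transformer.py",
        f"--properties=spark.hadoop.hive.metastore.uris={metastore_uri}",
    ]
    # Arguments forwarded to transform_hive_to_bigquery.py (parsed by argparse).
    script_args = [
        f"--hive_database={hive_database}",
        f"--hive_table={hive_table}",
        f"--bq_table={bq_table}",
        f"--bq_temp_gcs_bucket={bucket}",
    ]
    # Everything after the standalone "--" goes to the script, not to gcloud.
    return gcloud_args + ["--"] + script_args


if __name__ == "__main__":
    # Hypothetical sample values, for illustration only.
    command = build_pyspark_submit_command(
        bucket="example-bucket",
        cluster="example-cluster",
        metastore_uri="thrift://example-metastore:9083",
        hive_database="sales",
        hive_table="orders",
        bq_table="analytics.orders",
    )
    print(shlex.join(command))
```

Building the command as a list avoids line-continuation mistakes in a hand-typed shell command, and the list can be passed directly to `subprocess.run` if you want to submit the job from a script.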
    

Migrate a table from Postgres to MySQL

This section shows how to prompt Gemini to generate a Java Spark application that migrates a table from a Postgres database to a MySQL database.

  1. Input the following prompt to Gemini CLI:

    Create a Spark `PostgresToMySql.java` job to migrate data from a table in a
    Postgres database to a table in MySQL, both accessible via JDBC. The JDBC
    URL strings are stored in Secret Manager. The URL string includes
    the username and password. Read and write data in parallel based on
    user-provided partitioning information. Write data in batches for efficiency.
    Use the addInsertionTimeColumn to add a column to the data before writing it
    to the MySQL destination table. Provide instructions to run this job on
    Dataproc on Compute Engine in migrateJdbcToJdbc.md. Provide a
    summary of the job in migration-README.md
    

    Gemini examines your directory and creates the Java application in the appropriate package along with a pom.xml file to build the application. The PostgresToMySql.java file will be similar to the following:

    package com.customer.app;
    
    import com.customer.util.DataframeUtils;
    import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
    import com.google.cloud.secretmanager.v1.SecretVersionName;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    
    public class PostgresToMySql {
    
        public static void main(String[] args) {
            if (args.length != 7) {
                System.err.println("Usage: PostgresToMySql <postgres.table> <mysql.table> <postgres.secret.id> <mysql.secret.id> <partition.column> <batch.size> <project.id>");
                System.exit(1);
            }
    
            String postgresTable = args[0];
            String mysqlTable = args[1];
            String postgresSecretId = args[2];
            String mysqlSecretId = args[3];
            String partitionColumn = args[4];
            String batchSize = args[5];
            String projectId = args[6];
    
            SparkSession spark = SparkSession.builder()
                    .appName("Postgres to MySQL JDBC Migration")
                    .getOrCreate();
    
            String postgresUrl = getSecret(projectId, postgresSecretId);
            String mysqlUrl = getSecret(projectId, mysqlSecretId);
    
            // Read data from Postgres in parallel.
            Dataset<Row> df = spark.read()
                    .format("jdbc")
                    .option("url", postgresUrl)
                    .option("dbtable", postgresTable)
                    .option("partitionColumn", partitionColumn)
                    .option("lowerBound", "2012-01-03")
                    .option("upperBound", "2025-10-31") // These bounds should be configured based on data
                    .option("numPartitions", "1") // Increase this value to read over more than one connection
                    .option("driver", "org.postgresql.Driver")
                    .load();
    
            // Add insertion time column.
            Dataset<Row> transformedDf = DataframeUtils.addInsertionTimeColumn(df);
    
            // Write data to MySQL in batches.
            transformedDf.write()
                    .format("jdbc")
                    .option("url", mysqlUrl)
                    .option("dbtable", mysqlTable)
                    .option("batchsize", Integer.parseInt(batchSize))
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .mode(SaveMode.Overwrite)
                    .save();
    
            spark.stop();
        }
    
        private static String getSecret(String projectId, String secretId) {
            try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
                SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
                return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
            } catch (IOException e) {
                throw new RuntimeException("Failed to retrieve secret", e);
            }
        }
    }
    

    Notes:

    • Set the lowerBound and upperBound based on your data.
    • If the generated pom.xml does not perform correctly, use this GitHub pom.xml.

  2. Upload the postgres-to-mysql-migration-VERSION.jar file to a Cloud Storage bucket (see Upload an object to a bucket for instructions).

  3. Run the following command to submit the job to your Dataproc cluster:

    gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --class=com.customer.app.PostgresToMySql \
        --jars=gs://BUCKET_NAME/postgres-to-mysql-migration-VERSION.jar \
        -- POSTGRES_TABLE MYSQL_TABLE \
        POSTGRES_SECRET MYSQL_SECRET COLUMN BATCH_SIZE PROJECT_ID
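
The lowerBound, upperBound, and numPartitions options in the generated Java code work together: Spark uses the bounds only to compute the partition stride, not to filter rows, so rows outside the bounds still land in the first or last partition. The following Python sketch is a simplified illustration of how evenly spaced date ranges could be derived from those three values. It is not Spark's internal implementation, and the function name is hypothetical.

```python
from datetime import date, timedelta


def date_partition_boundaries(lower, upper, num_partitions):
    """Split the range from lower to upper into num_partitions date ranges.

    Returns a list of (start, end) tuples, where each range covers
    start <= value < end.
    """
    total_days = (upper - lower).days
    if total_days <= 0:
        raise ValueError("upper must be later than lower")
    if not 1 <= num_partitions <= total_days:
        raise ValueError("num_partitions must be between 1 and the number of days")

    # Stride in days between consecutive partition edges.
    stride = total_days / num_partitions
    edges = [lower + timedelta(days=round(i * stride))
             for i in range(num_partitions)]
    edges.append(upper)
    return list(zip(edges[:-1], edges[1:]))


if __name__ == "__main__":
    # Bounds taken from the generated Java example; 4 partitions is an
    # assumed value chosen for illustration.
    for start, end in date_partition_boundaries(
            date(2012, 1, 3), date(2025, 10, 31), 4):
        print(f"{start} <= partition_column < {end}")
```

If the data is heavily skewed toward recent dates, evenly spaced ranges like these produce uneven partitions, which is one reason to set the bounds from the actual minimum and maximum values of the partition column.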