Cargar datos en BigQuery

En este documento, se muestra cómo usar el Servicio administrado para Apache Spark para ejecutar un trabajo de Spark que carga datos procesados de Cloud Storage en una tabla de BigQuery. El Servicio administrado para Apache Spark optimiza este proceso mediante la administración del entorno de Spark y los conectores necesarios.

Antes de comenzar

  1. Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Crea un bucket de Cloud Storage.
  9. Crea un clúster del Servicio administrado para Apache Spark que use la versión de imagen 2.1 o posterior.
  10. Crea un conjunto de datos de BigQuery.

Prepara la secuencia de comandos de PySpark

  1. Crea un archivo Python local llamado load_analytics_data.py.

  2. Agrega el siguiente código al archivo. Esta secuencia de comandos lee datos de una ruta de acceso de Cloud Storage, realiza una agregación y escribe el resultado en BigQuery.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. Reemplaza los marcadores de posición que se indican más abajo:

    • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
    • PROJECT_ID: Es el ID de tu Google Cloud proyecto.
  4. Sube la secuencia de comandos load_analytics_data.py a tu bucket de Cloud Storage.

Envía el trabajo del Servicio administrado para Apache Spark

Envía la secuencia de comandos de PySpark como un trabajo a tu clúster del Servicio administrado para Apache Spark.

  1. En una terminal, ejecuta el comando gcloud dataproc jobs submit pyspark:

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. Reemplaza los marcadores de posición que se indican más abajo:

    • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
    • CLUSTER_NAME: Es el nombre de tu clúster del Servicio administrado para Apache Spark.
    • REGION: Es la región en la que se encuentra el clúster.

    El comando envía un trabajo de PySpark al Servicio administrado para Apache Spark. Los trabajadores del Servicio administrado para Apache Spark recuperan la secuencia de comandos de la ruta de acceso especificada de Cloud Storage y la ejecutan en el clúster.

Verifica la carga de datos

  1. En la Google Cloud consola de, ve a la página Trabajos del Servicio administrado para Apache Spark para supervisar la ejecución del trabajo y ver los registros de salida del controlador.

  2. Cuando se complete el trabajo, ve a la página BigQuery.

  3. En el panel Explorador, busca tu proyecto y conjunto de datos y, luego, selecciona la tabla corpus_word_counts.

  4. Haz clic en la pestaña Vista previa para inspeccionar los datos cargados.

Cómo funciona el conector de Spark-BigQuery

El conector de Spark-BigQuery permite que las aplicaciones de Spark lean y escriban en BigQuery. En los clústeres del Servicio administrado para Apache Spark con versiones de imagen 2.1 o posteriores, el conector está preinstalado.

El conector usa un método de escritura indirecta para cargar datos. Este método aprovecha el Servicio administrado para Apache Spark y BigQuery para obtener un alto rendimiento.

  1. El trabajo de Spark en el clúster del Servicio administrado para Apache Spark escribe el DataFrame final en una ubicación temporal en un bucket de Cloud Storage.

  2. Una vez que se completa la escritura en Cloud Storage, el conector activa un trabajo de carga de BigQuery.

  3. BigQuery transfiere los datos de la ubicación temporal de Cloud Storage a la tabla de destino.

Este enfoque indirecto desacopla el cálculo de Spark de la transferencia de BigQuery. Este desacoplamiento permite que cada servicio opere de manera eficiente y garantiza un alto rendimiento para cargas de datos grandes.

¿Qué sigue?