טעינת נתונים ל-BigQuery

במאמר הזה נסביר איך להשתמש ב-Managed Service for Apache Spark כדי להריץ משימת Spark שמעלה נתונים מעובדים מ-Cloud Storage לטבלה ב-BigQuery. ‫Managed Service for Apache Spark מייעל את התהליך הזה על ידי ניהול סביבת Spark והמחברים הנדרשים.

לפני שמתחילים

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. יוצרים קטגוריה של Cloud Storage.
  9. יוצרים אשכול Managed Service for Apache Spark שמשתמש בגרסה 2.1 ואילך של אימג'ים.
  10. יצירת מערך נתונים ב-BigQuery

הכנת סקריפט PySpark

  1. יוצרים קובץ Python מקומי בשם load_analytics_data.py.

  2. מוסיפים את הקוד הבא לקובץ. הסקריפט הזה קורא נתונים מנתיב ב-Cloud Storage, מבצע צבירה וכותב את התוצאה ל-BigQuery.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. מחליפים את ה-placeholders הבאים:

    • BUCKET_NAME: שם הקטגוריה של Cloud Storage.
    • PROJECT_ID: מזהה הפרויקט ב- Google Cloud.
  4. מעלים את סקריפט load_analytics_data.py לקטגוריה של Cloud Storage.

שליחת העבודה של Managed Service for Apache Spark

שליחת סקריפט PySpark כמשימה לאשכול Managed Service for Apache Spark.

  1. בטרמינל, מריצים את הפקודה gcloud dataproc jobs submit pyspark:

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. מחליפים את ה-placeholders הבאים:

    • BUCKET_NAME: שם הקטגוריה של Cloud Storage.
    • CLUSTER_NAME: השם של אשכול Managed Service for Apache Spark.
    • REGION: האזור שבו נמצא האשכול.

    הפקודה שולחת משימת PySpark אל Managed Service for Apache Spark. תהליך העבודה של Managed Service for Apache Spark הוא כזה: הוא מאחזר את הסקריפט מהנתיב שצוין ב-Cloud Storage ומבצע אותו באשכול.

אימות טעינת הנתונים

  1. במסוף Google Cloud , נכנסים לדף Jobs של Managed Service for Apache Spark כדי לעקוב אחרי ההרצה של העבודה ולראות את יומני הפלט של מנהל ההתקן.

  2. אחרי שהעבודה מסתיימת, עוברים לדף BigQuery.

  3. בחלונית Explorer, מאתרים את הפרויקט ואת מערך הנתונים, ואז בוחרים את הטבלה corpus_word_counts.

  4. לוחצים על הכרטיסייה תצוגה מקדימה כדי לבדוק את הנתונים שנטענו.

איך פועל המחבר Spark-BigQuery

המחבר Spark-BigQuery מאפשר לאפליקציות Spark לקרוא מ-BigQuery ולכתוב ל-BigQuery. ב-Managed Service for Apache Spark באשכולות עם גרסאות אימג' 2.1 ומעלה, המחבר מותקן מראש.

המחבר משתמש בשיטת כתיבה עקיפה כדי לטעון נתונים. השיטה הזו משתמשת ב-Managed Service for Apache Spark וב-BigQuery כדי להשיג ביצועים גבוהים.

  1. משימת Spark באשכול Managed Service for Apache Spark כותבת את ה-DataFrame הסופי למיקום זמני בקטגוריה של Cloud Storage.

  2. אחרי שהכתיבה ל-Cloud Storage מסתיימת, המחבר מפעיל משימת טעינה של BigQuery.

  3. מערכת BigQuery קולטת את הנתונים ממיקום Cloud Storage הזמני לטבלת היעד.

הגישה העקיפה הזו מפרידה בין החישוב של Spark לבין ההטמעה של BigQuery. ההפרדה הזו מאפשרת לכל שירות לפעול ביעילות ומבטיחה תפוקה גבוהה לטעינות נתונים גדולות.

המאמרים הבאים