BigQuery에 데이터 로드

이 문서에서는 Apache Spark용 관리형 서비스를 사용하여 Cloud Storage에서 처리된 데이터를 BigQuery 테이블로 로드하는 Spark 작업을 실행하는 방법을 보여줍니다. Apache Spark용 관리형 서비스는 Spark 환경과 필요한 커넥터를 관리하여 이 프로세스를 간소화합니다.

시작하기 전에

  1. 계정에 로그인하세요. Google Cloud 를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. Google Cloud신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Cloud Storage 버킷을 만듭니다.
  9. 이미지 버전 2.1 이상을 사용하는 Apache Spark용 관리형 서비스 클러스터를 만듭니다.
  10. BigQuery 데이터세트 만들기.

PySpark 스크립트 준비

  1. load_analytics_data.py라는 로컬 Python 파일을 만듭니다.

  2. 아래 코드를 파일에 추가합니다. 이 스크립트는 Cloud Storage 경로에서 데이터를 읽고 집계를 수행한 후 결과를 BigQuery에 씁니다.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import sum as _sum
    import sys
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    bq_project = "PROJECT_ID"
    bq_dataset = "DATASET"
    bq_table = "corpus_word_counts"
    
    # --- Paths ---
    processed_path = f"gs://{gcs_bucket}/processed/processed_data"
    temp_gcs_path = f"{gcs_bucket}"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
       .appName("Dataproc-BigQuery-Load") \
       .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \
       .getOrCreate()
    
    # --- Read Processed Data from Cloud Storage ---
    processed_df = spark.read.parquet(processed_path)
    
    # --- Final Aggregation for Analytics-Ready Stage ---
    analytics_df = processed_df.groupBy("corpus") \
       .agg(_sum("word_count_int").alias("total_word_count")) \
       .orderBy("corpus")
    
    print("Aggregated Analytics-Ready data:")
    analytics_df.show()
    
    # --- Write DataFrame to BigQuery ---
    print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}")
    
    analytics_df.write \
       .format("bigquery") \
       .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \
       .mode("append") \
       .save()
    
    print("Successfully wrote data to BigQuery.")
    
    # --- Stop Spark Session ---
    spark.stop()
    
  3. 다음 자리표시자를 바꿉니다.

    • BUCKET_NAME: Cloud Storage 버킷 이름.
    • PROJECT_ID: 프로젝트 ID Google Cloud
  4. Cloud Storage 버킷에 load_analytics_data.py 스크립트를 업로드합니다.

Apache Spark용 관리형 서비스 작업 제출

PySpark 스크립트를 Apache Spark용 관리형 서비스 클러스터에 작업으로 제출합니다.

  1. 터미널에서 gcloud dataproc jobs submit pyspark 명령어를 실행합니다.

    gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \
        --cluster=CLUSTER_NAME \
        --region=REGION
    
  2. 다음 자리표시자를 바꿉니다.

    • BUCKET_NAME: Cloud Storage 버킷 이름.
    • CLUSTER_NAME: Apache Spark용 관리형 서비스 클러스터의 이름.
    • REGION: 클러스터가 있는 리전입니다.

    이 명령어는 PySpark 작업을 Apache Spark용 관리형 서비스에 제출합니다. Apache Spark용 관리형 서비스 작업자는 지정된 Cloud Storage 경로에서 스크립트를 가져와 클러스터에서 실행합니다.

데이터 로드 확인

  1. 콘솔에서 Apache Spark용 관리형 서비스 작업 페이지로 이동하여 작업 실행을 모니터링하고 드라이버 출력 로그를 확인합니다. Google Cloud

  2. 작업이 완료되면 BigQuery 페이지로 이동합니다.

  3. 탐색기 패널에서 프로젝트와 데이터 세트를 찾은 후 corpus_word_counts 테이블을 선택합니다.

  4. 미리보기 탭을 클릭하여 로드된 데이터를 검사합니다.

Spark-BigQuery 커넥터 작동 방식

Spark-BigQuery 커넥터를 사용하면 Spark 애플리케이션이 BigQuery에서 읽고 BigQuery에 쓸 수 있습니다. 이미지 버전 2.1 이상이 있는 Apache Spark용 관리형 서비스 클러스터에는 커넥터가 사전 설치되어 있습니다.

커넥터는 간접 쓰기 메서드를 사용하여 데이터를 로드합니다. 이 메서드는 Apache Spark용 관리형 서비스와 BigQuery를 모두 활용하여 고성능을 제공합니다.

  1. Apache Spark용 관리형 서비스 클러스터의 Spark 작업은 최종 DataFrame을 Cloud Storage 버킷의 임시 위치에 씁니다.

  2. Cloud Storage에 쓰기가 완료되면 커넥터가 BigQuery 로드 작업을 트리거합니다.

  3. BigQuery는 Cloud Storage의 임시 위치에서 대상 테이블로 데이터를 수집합니다.

이 간접 접근 방식은 Spark 계산을 BigQuery 수집과 분리합니다. 이 분리를 통해 각 서비스가 효율적으로 작동하고 대규모 데이터 로드의 처리량을 높일 수 있습니다.

다음 단계