Explore and ingest data

A multi-stage data architecture divides data into tiers of increasing quality: bronze, silver, and gold. This architecture provides a structured approach for transforming raw data into clean, reliable assets.

This document describes how to create the bronze layer, where you land data from external source systems. This layer provides a single source of truth, ensures complete data lineage, and supports pipeline reprocessing.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Create a service account:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
    2. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    3. Select your project.
    4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    5. Click Create and continue.
    6. Grant the following roles to the service account: Storage Object Admin, Dataproc Worker.

      To grant a role, find the Select a role list, then select the role.

      To grant additional roles, click Add another role and add each additional role.

    7. Click Continue.
    8. Click Done to finish creating the service account.

  6. Install the Google Cloud CLI.

  7. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  8. To initialize the gcloud CLI, run the following command:

    gcloud init
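The console steps above can also be performed with the gcloud CLI. The sketch below is one way to do it; the service account name sa-bronze and the PROJECT_ID placeholder are assumptions for this sketch, not values from this guide.

```shell
# Create the service account (the name is an assumption for this sketch).
gcloud iam service-accounts create sa-bronze \
    --display-name="Service account for quickstart"

# Grant the two roles from the console flow.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:sa-bronze@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:sa-bronze@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/dataproc.worker"
```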

Land raw data in Cloud Storage

Ingest raw data into the initial storage layer in Cloud Storage. To simulate data ingestion, export a public dataset from BigQuery to a Cloud Storage bucket. This process mimics an external system delivering raw data files to a landing zone.

  1. Create a Cloud Storage bucket to serve as your data lake. Create it in the same region as your Managed Service for Apache Spark cluster to optimize performance.

    gsutil mb -l REGION gs://BUCKET_NAME/
    
  2. Export the bigquery-public-data:samples.shakespeare table to your Cloud Storage bucket in CSV format.

    bq extract \
        --destination_format CSV \
        "bigquery-public-data:samples.shakespeare" \
        "gs://BUCKET_NAME/raw/shakespeare/shakespeare.csv"
    

    This command starts an export job that writes the table's contents to the specified Cloud Storage path.
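The bigquery-public-data:samples.shakespeare table has four columns (word, word_count, corpus, corpus_date), so the exported file is a plain CSV with that header row. As a quick sanity check of the format before involving Spark, such a file can be parsed with the Python standard library; the two sample rows below are illustrative stand-ins, not actual export output.

```python
import csv
import io

# Illustrative sample mimicking the first lines of the exported CSV.
# The header matches the table's schema; the data rows are stand-ins.
sample = """word,word_count,corpus,corpus_date
LVII,1,sonnets,0
augurs,1,sonnets,0
"""

reader = csv.DictReader(io.StringIO(sample))
rows = list(reader)
print(rows[0]["word"], rows[0]["word_count"])
```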

Read raw data with PySpark

After the data is in Cloud Storage, read and explore it with a PySpark job on a Managed Service for Apache Spark cluster. Apache Spark interacts with Cloud Storage through the Cloud Storage connector, which lets you read and write data using the gs:// URI scheme.

  1. Use the following PySpark script to create a SparkSession, configure it with access to Cloud Storage, and read the raw CSV file into a DataFrame.

    from pyspark.sql import SparkSession
    
    # --- Configuration ---
    gcs_bucket = "BUCKET_NAME"
    raw_path = f"gs://{gcs_bucket}/raw/shakespeare/shakespeare.csv"
    # For local development only.
    service_account_key_path = "/path/to/your/service-account-key.json"
    
    # --- Spark Session Initialization ---
    spark = SparkSession.builder \
      .appName("DataprocETL-RawIngestion") \
      .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
      .getOrCreate()
    
    # --- Authentication for local development ---
    # This step is not necessary when running on a Dataproc cluster
    # with the service account attached to the cluster VMs.
    # The connector reads this key from the Hadoop configuration, so set it
    # there rather than on the Spark SQL configuration.
    spark.sparkContext._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", service_account_key_path)
    
    # --- Read Raw Data from Cloud Storage ---
    # Read the raw CSV data into a DataFrame.
    # inferSchema=True scans the data to determine column types.
    raw_df = spark.read.csv(raw_path, header=True, inferSchema=True)
    
    # --- Initial Exploration ---
    print("Raw data count:", raw_df.count())
    print("Schema:")
    raw_df.printSchema()
    print("Sample of raw data:")
    raw_df.show(10, truncate=False)
    
    # --- Stop Spark Session ---
    spark.stop()
    
  2. Run the script as a Managed Service for Apache Spark job to ingest and explore the raw data.
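One way to submit the script is as a batch workload with the gcloud CLI. In the sketch below, the filename read_raw.py and the service account name are placeholders, not values defined by this guide; when the job runs on Google Cloud with an attached service account, the local keyfile step in the script is unnecessary.

```shell
# Submit the PySpark script as a batch workload (read_raw.py is a placeholder).
gcloud dataproc batches submit pyspark read_raw.py \
    --region=REGION \
    --service-account=sa-bronze@PROJECT_ID.iam.gserviceaccount.com
```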

Data ingestion patterns

Beyond batch file uploads, the bronze layer can support a variety of ingestion patterns. Managed Service for Apache Spark is a versatile engine that can handle many different ingestion scenarios.

Streaming ingestion

For continuous data sources such as IoT sensor data or application logs, use a streaming pipeline. You can use Managed Service for Apache Spark to process high-volume data streams from services such as Pub/Sub or Apache Kafka and land the data in the bronze layer.
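As a sketch of what such a pipeline might look like with Structured Streaming, the job below reads raw events from a Kafka topic and appends them unchanged to a bronze path. The broker address, topic name, and paths are placeholders, and the Kafka source additionally requires the spark-sql-kafka connector on the classpath.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("bronze-streaming-ingest")
    .getOrCreate()
)

# Read a continuous stream of raw events from Kafka (placeholder broker/topic).
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "KAFKA_BROKER:9092")
    .option("subscribe", "sensor-events")
    .load()
)

# Land the raw payload unchanged in the bronze layer, with a checkpoint
# so the stream can resume after failures.
query = (
    events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .writeStream
    .format("parquet")
    .option("path", "gs://BUCKET_NAME/raw/sensor_events/")
    .option("checkpointLocation", "gs://BUCKET_NAME/checkpoints/sensor_events/")
    .trigger(processingTime="1 minute")
    .start()
)

query.awaitTermination()
```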

Database ingestion

To keep the data lake synchronized with operational databases, use change data capture (CDC). A Managed Service for Apache Spark job can subscribe to a Pub/Sub topic that receives change events, process the stream, and apply the changes to the raw data store in Cloud Storage.
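As a simplified, self-contained illustration of the apply step, the snippet below folds a stream of change events into a keyed view of the raw store. The event shape here is an assumption for illustration; a real CDC feed defines its own envelope.

```python
# Each change event carries an operation, a primary key, and the row data.
# This event shape is illustrative, not a specific CDC product's format.
events = [
    {"op": "INSERT", "key": 1, "row": {"id": 1, "name": "alice"}},
    {"op": "UPDATE", "key": 1, "row": {"id": 1, "name": "alicia"}},
    {"op": "INSERT", "key": 2, "row": {"id": 2, "name": "bob"}},
    {"op": "DELETE", "key": 2, "row": None},
]

def apply_changes(store, change_events):
    """Fold CDC events into a key -> row mapping (last write wins)."""
    for event in change_events:
        if event["op"] == "DELETE":
            store.pop(event["key"], None)
        else:  # INSERT and UPDATE are both upserts.
            store[event["key"]] = event["row"]
    return store

store = apply_changes({}, events)
print(store)  # {1: {'id': 1, 'name': 'alicia'}}
```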

What's next