データを探索して抽出する
マルチステージ データ アーキテクチャでは、データがブロンズ、シルバー、ゴールドの 3 つの品質階層に整理されます。このアーキテクチャは、元データをクリーンで信頼性の高いアセットに変換するための構造化された方法を提供します。
このドキュメントでは、外部ソースシステムからデータを取得するブロンズ階層を作成する方法について説明します。この階層は、信頼できる唯一の情報源を提供し、完全なデータリネージを保証し、パイプラインの再処理を可能にします。
始める前に
- アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart. - Click Create and continue.
-
Grant the following roles to the service account: Storage Object Admin, Dataproc Worker.
To grant a role, find the Select a role list, then select the role.
To grant additional roles, click Add another role and add each additional role.
- Click Continue.
-
Click Done to finish creating the service account.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart. - Click Create and continue.
-
Grant the following roles to the service account: Storage Object Admin, Dataproc Worker.
To grant a role, find the Select a role list, then select the role.
To grant additional roles, click Add another role and add each additional role.
- Click Continue.
-
Click Done to finish creating the service account.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init
Cloud Storage に元データを取得する
元データを Cloud Storage の初期ストレージ レイヤに取り込みます。データ取り込みをシミュレートするには、一般公開データセットを BigQuery から Cloud Storage バケットにエクスポートします。このプロセスは、外部システムが元データファイルをランディング ゾーンに配信する様子を模倣しています。
データレイクとして機能する Cloud Storage バケットを作成します。パフォーマンスを最適化するには、Managed Service for Apache Spark クラスタと同じリージョンに作成します。
gsutil mb -l REGION gs://BUCKET_NAME/bigquery-public-data:samples.shakespeareテーブルを CSV 形式で Cloud Storage バケットにエクスポートします。bq extract \ --destination_format CSV \ "bigquery-public-data:samples.shakespeare" \ "gs://BUCKET_NAME/raw/shakespeare/shakespeare.csv"このコマンドは、テーブルの内容を指定された Cloud Storage パスに書き込むエクスポート ジョブを開始します。
PySpark で元データを読み取る
Cloud Storage にデータを取得したら、Managed Service for Apache Spark クラスタで PySpark ジョブを使用してデータを読み取り、探索します。Apache Spark は Cloud Storage コネクタを介して Cloud Storage と連携します。これにより、gs:// URI スキームを使用してデータの読み取りと書き込みを行うことができます。
次の PySpark スクリプトを使用して、SparkSession を作成し、Cloud Storage アクセス用に構成して、元の CSV ファイルを DataFrame に読み込みます。
from pyspark.sql import SparkSession # --- Configuration --- gcs_bucket = "BUCKET_NAME" raw_path = f"gs://{gcs_bucket}/raw/shakespeare/shakespeare.csv" # For local development only. service_account_key_path = "/path/to/your/service-account-key.json" # --- Spark Session Initialization --- spark = SparkSession.builder \ .appName("DataprocETL-RawIngestion") \ .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \ .getOrCreate() # --- Authentication for local development --- # This step is not necessary when running on a Dataproc cluster # with the service account attached to the cluster VMs. spark.conf.set("google.cloud.auth.service.account.json.keyfile", service_account_key_path) # --- Read Raw Data from Cloud Storage --- # Read the raw CSV data into a DataFrame. # inferSchema=True scans the data to determine column types. raw_df = spark.read.csv(raw_path, header=True, inferSchema=True) # --- Initial Exploration --- print("Raw data count:", raw_df.count()) print("Schema:") raw_df.printSchema() print("Sample of raw data:") raw_df.show(10, truncate=False) # --- Stop Spark Session --- spark.stop()スクリプトを Managed Service for Apache Spark ジョブとして実行して、元データを取り込み、探索します。
データの取り込みパターン
ブロンズ階層は、バッチファイル アップロード以外のさまざまな取り込みパターンをサポートできます。Managed Service for Apache Spark は、多様な取り込みシナリオに対応できる汎用性の高いエンジンです。
ストリーミング取り込み
IoT センサーデータやアプリケーション ログなどの継続的なデータソースには、ストリーミング パイプラインを使用します。Managed Service for Apache Spark を使用すると、Pub/Sub や Apache Kafka などのサービスから大量のストリームを処理し、ブロンズ階層にデータを取得できます。
データベースの取り込み
データレイクを運用データベースと同期するには、変更データ キャプチャ(CDC)を使用します。Managed Service for Apache Spark Spark ジョブは、変更イベントを受信する Pub/Sub トピックをサブスクライブし、ストリームを処理して、Cloud Storage の元のデータストアに変更を適用できます。
次のステップ
- Managed Service for Apache Spark の詳細を確認する。
- Cloud Storage コネクタの詳細を確認する。
- BigQuery の一般公開データセットを確認する。