このチュートリアルでは、Cloud Storage からデータを読み取り、データ品質検査を実行して、Cloud Storage に書き込む再利用可能なパイプラインを構築する方法を説明します。
再利用可能なパイプラインには正規のパイプライン構造がありますが、各パイプライン ノードの構成は、HTTP サーバーによって提供される構成に基づいて変更できます。たとえば、静的パイプラインはデータを Cloud Storage から読み取り、変換を適用して、BigQuery 出力テーブルに書き込みます。パイプラインが読み取る Cloud Storage ファイルに基づいて変換と BigQuery 出力テーブルを変更する場合は、再利用可能なパイプラインを作成します。
目標
- Cloud Storage Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる入力を読み取ることができるようにします。
- Cloud Storage Argument Setter プラグインを使用して、パイプラインが実行ごとに異なる品質検査を実行できるようにします。
- 実行ごとの出力データを Cloud Storage に書き込みます。
費用
このドキュメントでは、課金対象である次のコンポーネントを使用します。 Google Cloud
- Cloud Data Fusion
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- アカウントにログインします Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Cloud Data Fusion API、Cloud Storage API、BigQuery API、Dataproc API を有効にします。
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。- Cloud Data Fusion インスタンスを作成します。
Cloud Data Fusion ウェブ インターフェースに移動する
Cloud Data Fusion を使用する際は、 Google Cloud コンソール と個別の Cloud Data Fusion ウェブ インターフェースの両方を使用します。コンソールでは、コンソール プロジェクトを作成し、Cloud Data Fusion インスタンスを作成および削除できます。 Google Cloud Google Cloud Cloud Data Fusion ウェブ インターフェースでは、 Pipeline Studio や Wrangler などのさまざまなページで Cloud Data Fusion の機能を使用できます。
コンソールで、[インスタンス] ページを開きます。 Google Cloud
インスタンスの [操作] 列で、[インスタンスの表示] リンクをクリックします。Cloud Data Fusion のウェブ インターフェースが新しいブラウザタブで開きます。
Cloud Storage Argument Setter プラグインをデプロイする
Cloud Data Fusion ウェブ インターフェースで、[Studio] ページに移動します。
[操作] メニューで [GCS Argument Setter] をクリックします。
Cloud Storage からの読み取り
- Cloud Data Fusion ウェブ インターフェースで、[Studio] ページに移動します。
- arrow_drop_down [Source] をクリックし、[Cloud Storage] を選択します。Cloud Storage ソースのノードがパイプラインに表示されます。
[Cloud Storage] ノードで、[プロパティ] をクリックします。
[Reference name] フィールドに名前を入力します。
[Path] フィールドに「
${input.path}」と入力します。このマクロは、異なるパイプラインの実行で使用する Cloud Storage 入力パスを制御します。右側の [Output Schema] パネルで、オフセット フィールド行のゴミ箱アイコンをクリックして、出力スキーマの [offset] フィールドを削除します。
[Validate] をクリックして、エラーに対処します。
[] をクリックして、[Properties] ダイアログを終了します。
データを変換する
- Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
- [Transform] プルダウン メニュー arrow_drop_down で、[Wrangler] を選択します。
- Pipeline Studio キャンバスで、Cloud Storage ノードから Wrangler ノードに矢印をドラッグします。
- パイプラインの Wrangler ノードに移動し、[プロパティ] をクリックします。
- [Input field name] に「
body」と入力します。 - [Recipe] フィールドに「
${directives}」と入力します。このマクロは、異なるパイプラインの実行で使用する変換ロジックを制御します。
- [Validate] をクリックして、エラーに対処します。
- [] をクリックして、[Properties] ダイアログを終了します。
Cloud Storage への書き込み
- Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
- [Sink] プルダウン メニュー arrow_drop_down で、[Cloud Storage] を選択します。
- Pipeline Studio キャンバスで、Wrangler ノードから先ほど追加した Cloud Storage ノードに矢印をドラッグします。
- パイプラインの Cloud Storage シンクノードに移動し、[Properties] をクリックします。
- [Reference name] フィールドに名前を入力します。
- [Path] フィールドに、パイプラインが出力ファイルを書き込むことができる、プロジェクトの Cloud Storage バケットのパスを入力します。Cloud Storage バケットがない場合は、1 つ作成します。
- [Validate] をクリックして、エラーに対処します。
- [] をクリックして、[Properties] ダイアログを終了します。
マクロ引数を設定する
- Cloud Data Fusion ウェブ インターフェースで、[Studio] ページのデータ パイプラインに移動します。
- In the arrow_drop_down [Conditions and Actions] drop-down menu, click [GCS Argument Setter].
- Pipeline Studio キャンバスで、Cloud Storage Argument Setter ノードから Cloud Storage ソースノードに矢印をドラッグします。
- パイプラインの Cloud Storage Argument Setter ノードに移動し、[プロパティ] をクリックします。
[URL] フィールドに、次のように入力します。
gs://reusable-pipeline-tutorial/args.jsonこの URL は、Cloud Storage 内の一般公開オブジェクトを示しており、次のコンテンツを含みます。
{ "arguments" : [ { "name": "input.path", "value": "gs://reusable-pipeline-tutorial/user-emails.txt" }, { "name": "directives", "value": "send-to-error !dq:isEmail(body)" } ] }2 つある引数のうち最初の引数は
input.pathの値です。パスgs://reusable-pipeline-tutorial/user-emails.txtは、Cloud Storage の一般公開オブジェクトで、これには次のテストデータが含まれています。alice@example.com bob@example.com craig@invalid@example.com2 番目の引数は
directivesの値です。値send-to-error !dq:isEmail(body)は、有効なメールアドレスではない行を除外するように Wrangler を設定します。たとえば、craig@invalid@example.comは除外されます。[検証] をクリックして、エラーがないことを確認します。
[] をクリックして、[Properties] ダイアログを終了します。
パイプラインをデプロイして実行する
[Pipeline Studio] ページの上部バーから [Name your pipeline] をクリックします。 パイプラインに名前を付け、[保存] をクリックします。
[デプロイ] をクリックします。
ランタイム引数マクロ(ランタイム)を表示する
input.pathおよびdirectives引数を開くには、arrow_drop_downの横にあるプルダウン実行をクリックします。値フィールドを空白のままにすると、パイプライン内の Cloud Storage Argument Sette ノードがランタイム中にこれらの引数の値を設定することを Cloud Data Fusion に通知します。
[実行] をクリックします。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
チュートリアルが終了したら、 Google Cloud で作成したリソースをクリーンアップして、割り当てを消費せず、今後料金が発生しないようにします。次のセクションで、このようなリソースを削除または無効にする方法を説明します。
Cloud Data Fusion インスタンスを削除する
Cloud Data Fusion インスタンスを削除する手順に従います。
プロジェクトを削除する
課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- コンソールで [**リソースの管理**] ページに移動します。 Google Cloud
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、 [Shut down] をクリックしてプロジェクトを削除します。