ジョブビルダー UI のビルダー フォームを使用してパイプラインを作成する

このチュートリアルでは、Apache Beam YAML 構文を使用して Dataflow データ処理パイプラインを作成する方法について説明します。 Google Cloud コンソールのジョブビルダー UI を使用して、ファイルからデータを読み取り、変換を適用して、結果を別のファイルに書き込む方法を学習します。このチュートリアルは、Apache Beam を初めて使用するデベロッパーや、YAML API を使用してパイプラインを構築する方法を学習したいデベロッパーを対象としています。

次の表は、 Google Cloud コンソールのパイプライン グラフと、対応する YAML 仕様を示しています。

Dataflow ジョブのグラフ。
pipeline:
  transforms:
    - name: ReadFromCsv
      type: ReadFromCsv
      config:
        path: 'gs://[...]/restaurant-data.csv'
    - name: MapToFields
      type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          Lowercase_menu_item: Item.lower()
          Total_price: Price + Tax
        append: true
    - name: WriteToJson
      type: WriteToJson
      input: MapToFields
      config:
        path: 'gs://[...]/restaurant-data_map-fields.json'

目標

このチュートリアルでは、次の方法について学びます。

  • データの読み取り、書き込み、変換を行う Beam YAML パイプラインを作成します。
  • コンテンツに基づいてデータをフィルタします。
  • Python 式を使用してフィールドをマッピングします。
  • SQL を使用してデータをクエリし、集計する。
  • Google Cloud コンソールのジョブビルダー UI のビルダー フォームを使用して、Beam YAML パイプラインを構築して実行します。

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

パイプラインを実行する前に、次の手順を完了します。

プロジェクトを設定する

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Cloud Storage バケットを作成する

パイプラインを実行する前に、Cloud Storage バケットを作成する必要があります。

  1. Cloud Storage バケットを作成します。

    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] に移動

    2. [ 作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      1. [バケットに名前を付ける] に、一意のバケット名を入力します。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
      2. [データの保存場所の選択] セクションで、次の操作を行います。
        1. ロケーション タイプを選択してください。
        2. [ロケーション タイプ] プルダウン メニューから、バケットのデータが永続的に保存されるロケーションを選択します。
        3. クロスバケット レプリケーションを設定するには、[Storage Transfer Service 経由でクロスバケット レプリケーションを追加する] を選択し、次の手順を実施します。

          クロスバケット レプリケーションを設定する

          1. [バケット] メニューで、バケットを選択します。
          2. [レプリケーション設定] セクションで、[構成] をクリックして、レプリケーション ジョブの設定を構成します。

            [クロスバケット レプリケーションを構成する] ペインが表示されます。

            • オブジェクト名の接頭辞で複製するオブジェクトをフィルタするには、オブジェクトを追加または除外する接頭辞を入力し、 [接頭辞を追加] をクリックします。
            • 複製されたオブジェクトのストレージ クラスを設定するには、[ストレージ クラス] メニューからストレージ クラスを選択します。この手順をスキップすると、複製されたオブジェクトはデフォルトで宛先バケットのストレージ クラスを使用します。
            • [完了] をクリックします。
      3. [データの保存場所を選択する] セクションで、次の操作を行います。
        1. [デフォルトのクラスを設定する] セクションで、[標準] を選択します。
        2. 階層名前空間を有効にするには、[データ量が多いワークロード向けにストレージを最適化] セクションで、[このバケットで階層的な名前空間を有効にする] を選択します。
      4. [オブジェクトへのアクセスを制御する方法を選択する] セクションで、バケットに公開アクセスの防止を適用するかどうかを選択し、バケットのオブジェクトに使用するアクセス制御方法を選択します。
      5. [オブジェクト データを保護する方法を選択する] セクションで、次の操作を行います。
        • [データ保護] で、バケットに設定するオプションを選択します。
          • 削除(復元可能)を有効にするには、[削除(復元可能)ポリシー(データ復元用)] チェックボックスをオンにして、削除後にオブジェクトを保持する日数を指定します。
          • オブジェクトのバージョニングを設定するには、[オブジェクトのバージョニング(バージョン管理用)] チェックボックスをオンにして、オブジェクトあたりの最大バージョン数と、非現行バージョンの有効期限が切れるまでの日数を指定します。
          • オブジェクトとバケットで保持ポリシーを有効にするには、[保持(コンプライアンス用)] チェックボックスをオンにして、次の操作を行います。
            • オブジェクト保持ロックを有効にするには、[オブジェクト保持を有効にする] チェックボックスをオンにします。
            • バケットロックを有効にするには、[バケット保持ポリシーを設定] チェックボックスをオンにして、保持期間の単位と期間を選択します。
        • オブジェクト データの暗号化方法を選択するには、[データ暗号化] セクション()を開き、データ暗号化方法を選択します。
    4. [作成] をクリックします。
  2. 以下をコピーしておきます。これらは以後のセクションで使用されます。

    • Cloud Storage バケット名。
    • 実際の Google Cloud のプロジェクト ID。

    ID を調べる方法については、プロジェクトの識別をご覧ください。

VPC ネットワーク

デフォルトでは、新しいプロジェクトはデフォルト ネットワークで開始されます。プロジェクトのデフォルト ネットワークが無効になっているか削除されている場合、Compute ネットワーク ユーザーのロールroles/compute.networkUser)が付与されているユーザー アカウントのプロジェクト内にネットワークが必要です。

データの読み取り、書き込み、変換

このセクションでは、Dataflow で Beam YAML 構文を使用して、次の方法でデータの読み取り、書き込み、フィルタリングを行う方法について説明します。

  • ユーザー インターフェース主導の開発により、 Google Cloud コンソールのジョブビルダー UI でジョブを構築して実行します。具体的には、ジョブビルダー UI のビルダー フォームを使用するため、YAML ファイルを手動で作成する必要はありません。
  • 一般公開されている Cloud Storage バケットに保存されている CSV ファイルのデータ。このデータには、次のようなレストランのメニューのサンプルデータが含まれています。

    restaurant-data.csv

    Menu item,Category,Price,Tax
    Classic Cheeseburger,Entree,9.99,0.7
    Margherita Pizza,Entree,14.50,1.02
    Grilled Salmon with Asparagus,Entree,21.99,1.54
    Chicken Caesar Salad,Salad,12.75,0.89
    Spaghetti Carbonara,Entree,16.25,1.14
    Beef Tacos (3),Entree,10.50,0.74
    Vegetable Stir-Fry,Entree,13.00,0.91
    Shrimp Scampi,Entree,19.75,1.38
    Chicken Pot Pie,Entree,15.50,1.09
    Steak Frites,Entree,28.00,1.96
    Lobster Mac and Cheese,Entree,25.50,1.79
    Pork Belly Bao Buns (2),Appetizer/Side,11.25,0.79
    Mushroom Risotto,Entree,17.50,1.23
    Fish and Chips,Entree,14.00,0.98
    Buffalo Wings (6),Appetizer/Side,9.50,0.67
    French Onion Soup,Appetizer/Side,7.00,0.49
    Tomato Soup with Grilled Cheese,Appetizer/Side,10.00,0.7
    Avocado Toast,Appetizer/Side,8.50,0.6
    Quesadilla with Chicken,Appetizer/Side,11.75,0.82
    Pad Thai,Entree,15.00,1.05
    Chicken Tikka Masala,Entree,18.50,1.3
    Burrito Bowl,Entree,13.50,0.95
    Sushi Combo (8 pieces),Entree,22.00,1.54
    Greek Salad,Salad,11.00,0.77
    Clam Chowder,Appetizer/Side,8.00,0.56
    New York Cheesecake,Dessert,6.50,0.46
    Chocolate Lava Cake,Dessert,7.50,0.53
    Apple Pie,Dessert,5.00,0.35
    Tiramisu,Dessert,8.00,0.56
    Crème brûlée,Dessert,7.00,0.49
    Iced Coffee,Beverage,3.50,0.25
    Lemonade,Beverage,3.00,0.21
    Orange Juice,Beverage,4.00,0.28
    Soda,Beverage,2.50,0.18
    Craft Beer,Beverage,6.00,0.42
    Glass of Wine,Beverage,9.00,0.63
    Margarita,Beverage,12.00,0.84
    Moscow Mule,Beverage,11.50,0.81
    Old Fashioned,Beverage,13.00,0.91
    Espresso,Beverage,3.00,0.21
    Cappuccino,Beverage,4.50,0.32
    Latte,Beverage,5.00,0.35
    Mocha,Beverage,5.50,0.39
    Hot Chocolate,Beverage,4.00,0.28
    Breakfast Burrito,Breakfast,10.50,0.74
    Pancakes (3),Breakfast,8.00,0.56
    Waffles,Breakfast,9.00,0.63
    Eggs Benedict,Breakfast,14.00,0.98
    Omelette,Breakfast,11.00,0.77
    Fruit Salad,Salad,7.50,0.53
    Yogurt Parfait,Breakfast,6.00,0.42

データの読み取りとフィルタ

次の例は、CSV ファイルからデータを読み取り、特定の情報でフィルタして、フィルタしたデータを JSON ファイルに書き込む方法を示しています。

このサンプルでは、特定の条件を満たすデータを選択的に保持できる Filter 変換を使用します。次の例では、データセットをフィルタして、Price20.00 以上のレコードのみを保持します。

CSV データを読み取り、フィルタリングされた JSON コンテンツを出力するには、次の操作を行います。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [ビルダーからジョブを作成] をクリックします。

  3. [ジョブビルダー] タブで、[ビルダーフォーム] を選択したままにします。

  4. [ジョブ名] フィールドに「filter-python-job」と入力します。

  5. [サービスの種類] で、[バッチ] を選択したままにします。

  6. [ソース] セクションで、次の操作を行います。

    1. [新しいソース] パネルの [ソース名] フィールドで、名前を ReadCsv に変更します。

    2. [ソースタイプ] リストで、[Cloud Storage からの CSV] を選択します。

    3. [CSV の場所] フィールドに、次のように入力します。

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. [完了] をクリックします。

  7. [変換] セクションで、次の操作を行います。

    1. [変換を追加] をクリックします。

    2. [変換名] フィールドに「FilterPrice」と入力します。

    3. [変換タイプ] リストで、[フィルタ(Python)] を選択します。

    4. [Python フィルタ式] フィールドに「Price >= 20.00」と入力します。

    5. [変換の入力ステップ] リストで、ReadCsv を選択したままにします。

    6. [完了] をクリックします。

  8. [シンク] セクションで、次の操作を行います。

    1. [シンク名] フィールドで、名前を WriteJson に変更します。

    2. [シンクの種類] リストで、[Cloud Storage 上の JSON ファイル] を選択します。

    3. [JSON の場所] フィールドに、次のように入力します。

      BUCKET_NAME/output/restaurant-data_filtered.json
      

      BUCKET_NAME は Cloud Storage バケットの名前で置き換えます。

    4. [シンクの入力ステップ] リストで、FilterPrice を選択したままにします。

    5. [完了] をクリックします。

  9. [Dataflow オプション] セクションで、[ジョブを実行] をクリックします。

ジョブの出力を調べる

ジョブが完了したら、次の手順でパイプラインからの出力を確認します。

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. バケットのリストで、Cloud Storage バケットを作成するで作成したバケットの名前をクリックします。

  3. restaurant-data_filtered.json-00000-of-00001 というファイルをクリックします。

  4. [オブジェクトの詳細] ページで、認証済みの URL をクリックして、パイプライン出力を表示します。

出力は次のようになります。

{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54}
{"Item":"Steak Frites","Category":"Entree","Price":28.0,"Tax":1.96}
{"Item":"Lobster Mac and Cheese","Category":"Entree","Price":25.5,"Tax":1.79}
{"Item":"Sushi Combo (8 pieces)","Category":"Entree","Price":22.0,"Tax":1.54}

Python を使用してフィールドをマッピングする

MapToFields 変換を使用すると、既存のフィールドに基づいて新しいフィールドを作成できます。次の例では、メニュー項目の小文字バージョンを作成し、合計価格を計算して、既存の値の後に値を追加します。

  1. Google Cloud コンソールの Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [ビルダーからジョブを作成] をクリックします。

  3. [ジョブビルダー] タブで、[ビルダーフォーム] を選択したままにします。

  4. [ジョブ名] フィールドに「map-python-job」と入力します。

  5. [サービスの種類] で、[バッチ] を選択したままにします。

  6. [ソース] セクションで、次の操作を行います。

    1. [新しいソース] パネルの [ソース名] フィールドで、名前を ReadFromCsvPy に変更します。

    2. [ソースタイプ] リストで、[Cloud Storage からの CSV] を選択します。

    3. [CSV の場所] フィールドに、次のように入力します。

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. [完了] をクリックします。

  7. [変換] セクションで、次の操作を行います。

    1. [変換を追加] をクリックします。

    2. [変換名] フィールドに「MapToFieldsPy」と入力します。

    3. [変換タイプ] リストで、[フィールドをマッピング(Python)] を選択します。

    4. [既存のフィールドを維持する] は選択したままにします。

    5. [マッピングされたフィールド] セクションで、[フィールドを追加] をクリックします。

    6. 開いた [新しいフィールド] パネルで、[フィールド名] に「Lowercase_menu_item」と入力します。

    7. [Python 式] フィールドに「Item.lower()」と入力します。

    8. [完了] をクリックします。

    9. 同じ [マッピングされたフィールド] セクションで、[フィールドを追加] をもう一度クリックします。

    10. 開いた [新しいフィールド] パネルで、[フィールド名] に「Total_price」と入力します。

    11. [Python 式] フィールドに「Price + Tax」と入力します。

    12. [新しいフィールド] パネルで、[完了] をクリックします。

    13. [新しい変換] パネルで、[完了] をクリックします。

  8. [シンク] セクションで、次の操作を行います。

    1. [シンク名] フィールドで、名前を WriteToJsonPy に変更します。

    2. [シンクの種類] リストで、[Cloud Storage 上の JSON ファイル] を選択します。

    3. [JSON の場所] フィールドに、次のように入力します。

      BUCKET_NAME/output/restaurant-data_map-fields.json
      

      BUCKET_NAME は Cloud Storage バケットの名前で置き換えます。

    4. [シンクの入力ステップ] リストで、MapToFieldsPy を選択したままにします。

    5. [完了] をクリックします。

  9. [Dataflow オプション] セクションで、[ジョブを実行] をクリックします。

ジョブの出力を調べる

ジョブが完了したら、次の手順でパイプラインからの出力を確認します。

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. バケットのリストで、Cloud Storage バケットを作成するで作成したバケットの名前をクリックします。

  3. restaurant-data_map-fields.json-00000-of-00001 というファイルをクリックします。

  4. [オブジェクトの詳細] ページで、認証済みの URL をクリックして、パイプライン出力を表示します。

出力は次のようになります。

{"Item":"Classic Cheeseburger","Category":"Entree","Price":9.99,"Tax":0.7,"Lowercase_menu_item":"classic cheeseburger","Total_price":10.69}
{"Item":"Margherita Pizza","Category":"Entree","Price":14.5,"Tax":1.02,"Lowercase_menu_item":"margherita pizza","Total_price":15.52}
{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54,"Lowercase_menu_item":"grilled salmon with asparagus","Total_price":23.53}
{"Item":"Chicken Caesar Salad","Category":"Salad","Price":12.75,"Tax":0.89,"Lowercase_menu_item":"chicken caesar salad","Total_price":13.64}
{"Item":"Spaghetti Carbonara","Category":"Entree","Price":16.25,"Tax":1.14,"Lowercase_menu_item":"spaghetti carbonara","Total_price":17.39}
{"Item":"Beef Tacos (3)","Category":"Entree","Price":10.5,"Tax":0.74,"Lowercase_menu_item":"beef tacos (3)","Total_price":11.24}
[...]

SQL を使用してデータを変換する

Sql 変換を使用すると、データに対して SQL クエリを実行できます。次の例では、メニュー項目をカテゴリ(EntreeBeverageDessert など)でグループ化し、各カテゴリの項目数を表す列を追加します。

ジョブビルダー UI を使用してパイプラインを作成する手順は次のとおりです。

  1. Google Cloud コンソールの Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [ビルダーからジョブを作成] をクリックします。

  3. [ジョブビルダー] タブの [ジョブ名] フィールドに「sql-transform-job」と入力します。

  4. [サービスの種類] で、[バッチ] を選択したままにします。

  5. [ソース] セクションで、次の操作を行います。

    1. [ソース名] フィールドで、名前を SqlTransformSource に変更します。

    2. [新しいソース] タブの [ソースタイプ] で、[Cloud Storage からの CSV] を選択します。[CSV の場所] フィールドが開きます。

    3. [CSV の場所] に以下のように入力します。

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. [完了] をクリックします。

  6. [変換] セクションで、次の操作を行います。

    1. [変換を追加] をクリックします。

    2. [変換名] フィールドで、名前を SqlTransform に更新します。

    3. [変換タイプ] で [SQL 変換] を選択します。[SQL 変換] オプションが開きます。

    4. [SQL 式] フィールドに、次のように入力します。

      select Category, count(*) as category_count from PCOLLECTION group by Category
      
    5. [完了] をクリックします。

  7. [シンク] セクションで、次の操作を行います。

    1. [シンク名] に「SqlTransformSink」と入力します。

    2. [シンクタイプ] で [Cloud Storage 上の JSON ファイル] を選択します。[Cloud Storage 上の JSON ファイルに書き込む] オプションが開きます。

    3. [JSON の場所] に、次のように入力します。

      BUCKET_NAME/output/restaurant-data_transform-sql.json
      

      BUCKET_NAME は Cloud Storage バケットの名前で置き換えます。

    4. [完了] をクリックします。

  8. 省略可: このパイプライン用に生成された YAML 定義を表示します。

    1. [Job builder] タブの上部に移動します。

    2. [YAML エディタ] を選択します。YAML 定義が表示されます。次のように表示されます。

      生成された YAML 仕様

      pipeline:
        transforms:
          - name: SqlTransformSource
            type: ReadFromCsv
            config:
              path: 'gs://cloud-samples-data/dataflow/tutorials/restaurant-data.csv'
          - name: SqlTransform
            type: Sql
            config:
              query: >-
                select Category, count(*) as category_count from PCOLLECTION group by
                Category
            input:
              input0: SqlTransformSource
          - name: SqlTransformSink
            type: WriteToJson
            input: SqlTransform
            config:
              path: 'gs://BUCKET_NAME/output/restaurant-data_transform-sql.json'
  9. [Dataflow オプション] セクションで、[ジョブを実行] をクリックします。

ジョブの出力を調べる

ジョブが完了したら、次の手順でパイプラインからの出力を確認します。

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. バケットのリストで、Cloud Storage バケットを作成するで作成したバケットの名前をクリックします。

  3. restaurant-data_transform-sql.json-00000-of-00001 というファイルをクリックします。

  4. [オブジェクトの詳細] ページで、認証済みの URL をクリックして、パイプライン出力を表示します。

出力は次のようになります。

{"Category":"Entree","category_count":16}
{"Category":"Beverage","category_count":14}
{"Category":"Appetizer\/Side","category_count":7}
{"Category":"Dessert","category_count":5}
{"Category":"Breakfast","category_count":6}
{"Category":"Salad","category_count":3}

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

  1. Google Cloud コンソールで [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

後でプロジェクトを再利用する場合は、プロジェクトを残したまま、チュートリアル中に作成したリソースを削除します。

Dataflow パイプラインを停止する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. 停止するジョブをクリックします。

    ジョブを停止するには、ジョブのステータスが「実行中」でなければなりません。

  3. ジョブの詳細ページで、[停止] をクリックします。

  4. [キャンセル] をクリックします。

  5. 選択を確定するには、[ジョブの停止] をクリックします。

Cloud Storage バケットを削除する

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

次のステップ