Pub/Sub Proto to BigQuery テンプレート

Pub/Sub proto to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに proto データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

データを変換するための JavaScript ユーザー定義関数(UDF)を指定できます。UDF の実行中のエラーは、個別の Pub/Sub トピック、または BigQuery エラーと同じ未処理のトピックに送信できます。

このシナリオで Dataflow パイプラインを実行する前に、UDF を使用した Pub/Sub BigQuery サブスクリプションが要件を満たしているかどうかを検討してください。

パイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Proto レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 出力 Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。
  • BigQuery テーブルが存在する場合は、createDisposition 値にかかわらず、proto データに一致するスキーマが必要です。

テンプレートのパラメータ

必須パラメータ

  • protoSchemaPath(Proto スキーマ ファイルの Cloud Storage パス): 自己完結型の記述子セットファイルの Cloud Storage パス(例: gs://MyBucket/schema.pb)。schema.pb を生成するには、proto をコンパイルする protoc コマンドに --descriptor_set_out=schema.pb を追加します。--include_imports フラグを使用すると、ファイルが自己完結型であることを保証できます。
  • fullMessageName(完全な Proto メッセージ名): 完全なメッセージ名(例: package.name.MessageName)。メッセージが別のメッセージ内にネストされている場合は、区切り文字「.」を使用してすべてのメッセージを含めます(例: package.name.OuterMessage.InnerMessage)。「package.name」は java_package ステートメントではなく、package ステートメントから取得する必要があります。
  • inputSubscription(Pub/Sub 入力サブスクリプション): 入力の読み取り元の Pub/Sub サブスクリプション。形式は「projects/<プロジェクト ID>/subscriptions/<サブスクリプション名>」です(例: projects/your-project-id/subscriptions/your-subscription-name)。
  • outputTableSpec(BigQuery 出力テーブル): 出力を書き込む BigQuery テーブルの場所。名前は <project>:<dataset>.<table_name> の形式にする必要があります。テーブルのスキーマは、入力オブジェクトと一致する必要があります。
  • outputTopic(出力 Pub/Sub トピック): データを公開するトピックの名前。形式は「projects/<プロジェクト ID>/topics/<トピック名>」です(例: projects/your-project-id/topics/your-topic-name)。

オプション パラメータ

  • preserveProtoFieldNames(Proto フィールド名を保持): proto フィールド名を保持するか、lowerCamelCase に変換するかを制御するフラグ。テーブルがすでに存在する場合は、テーブルのスキーマと一致するように選択する必要があります。それ以外の場合は、作成されるテーブルの列名が決定されます。true の場合、proto の snake_case が保持されます。false の場合、フィールドが lowerCamelCase に変換されます(デフォルト: false)。
  • bigQueryTableSchemaPath(BigQuery テーブルのスキーマパス): BigQuery スキーマ JSON ファイルの Cloud Storage パス。このパラメータを設定しない場合は、Proto スキーマからスキーマが推測されます(例: gs://MyBucket/bq_schema.json)。
  • udfOutputTopic(UDF の失敗の Pub/Sub 出力トピック): UDF の失敗を送信する出力トピック(省略可)。このオプションを設定しない場合、失敗は BigQuery の失敗と同じトピックに書き込まれます(例: projects/your-project-id/topics/your-topic-name)。
  • writeDisposition(BigQuery に使用する書き込み処理): BigQuery の WriteDisposition。たとえば、WRITE_APPEND、WRITE_EMPTY、WRITE_TRUNCATE などです。デフォルト値は WRITE_APPEND です。
  • createDisposition(BigQuery に使用する作成処理): BigQuery の CreateDisposition。たとえば、CREATE_IF_NEEDED、CREATE_NEVER などです。デフォルト値は CREATE_IF_NEEDED です。
  • javascriptTextTransformGcsPath(JavaScript UDF ソースの Cloud Storage パス): ユーザー定義関数を含む JavaScript コードの Cloud Storage パスパターン(例: gs://your-bucket/your-function.js)。
  • javascriptTextTransformFunctionName(UDF JavaScript 関数名): JavaScript ファイルから呼び出す関数の名前。英字、数字、アンダースコアのみを使用できます(例: transform、transform_udf1)。
  • javascriptTextTransformReloadIntervalMinutes(JavaScript UDF の自動再読み込み間隔(分単位)): ワーカーが JavaScript UDF の変更を確認してファイルを再読み込みする間隔を定義します。デフォルト値は 0 です。
  • useStorageWriteApi(BigQuery Storage Write API を使用): true の場合、パイプラインは BigQuery にデータを書き込むときに Storage Write API を使用します(https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api を参照)。デフォルト値は false です。Storage Write API を 1 回限りモードで使用する場合は、BigQuery Storage Write API のストリーム数と BigQuery Storage Write API のトリガー頻度(秒単位)のパラメータを設定する必要があります。Dataflow の 1 回以上モードを有効にするか、useStorageWriteApiAtLeastOnce パラメータを true に設定した場合は、ストリーム数やトリガー頻度を設定する必要はありません。
  • useStorageWriteApiAtLeastOnce(BigQuery Storage Write API で「1 回以上」のセマンティクスを使用): このパラメータは、「BigQuery Storage Write API の使用」が有効になっている場合にのみ有効になります。有効にすると、Storage Write API で「1 回以上」のセマンティクスが使用されます。有効にしないと、「正確に 1 回」のセマンティクスが使用されます。デフォルトは false です。
  • numStorageWriteApiStreams(BigQuery Storage Write API のストリーム数): ストリーム数は、BigQueryIO の Write 変換の並列処理を定義し、パイプラインで使用される Storage Write API のストリーム数におおむね一致します。推奨値については、https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api をご覧ください。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec(BigQuery Storage Write API のトリガー頻度(秒単位): トリガー頻度は、BigQuery でクエリするデータが表示されるまでの時間を決定します。推奨値については、https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api をご覧ください。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された Pub/Sub メッセージ データ フィールド。
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Proto to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    次のように置き換えます。

    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • SCHEMA_PATH: Proto スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME: Proto メッセージ名(例: package.name.MessageName
    • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
    • BIGQUERY_TABLE: BigQuery 出力テーブル名
    • UNPROCESSED_TOPIC: 未処理のキューに使用する Pub/Sub トピック

    次のステップ