Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。
このシナリオで Dataflow パイプラインを実行する前に、UDF を使用した Pub/Sub BigQuery サブスクリプションが要件を満たしているかどうかを検討してください。
パイプラインの要件
- 入力 Pub/Sub サブスクリプションが存在していること。
- Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
- 未処理の Pub/Sub トピックが存在していること。
- 出力 BigQuery データセットが存在していること。
テンプレートのパラメータ
必須パラメータ
- schemaPath: Avro スキーマ ファイルがある Cloud Storage の場所。例:
gs://path/to/my/schema.avsc - inputSubscription: 読み取り元の Pub/Sub 入力サブスクリプション。例:
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID> - outputTableSpec: 出力を書き込む BigQuery 出力テーブルの場所。たとえば、
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>です。指定されたcreateDispositionによっては、ユーザーが指定した Avro スキーマを使用して出力テーブルが自動的に作成される場合があります。 - outputTopic: 未処理レコードに使用する Pub/Sub トピック。例:
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
オプション パラメータ
- useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを
falseに設定します。このパラメータは、useStorageWriteApiがtrueの場合にのみ適用されます。デフォルト値はfalseです。 - writeDisposition: BigQuery WriteDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)の値。例:
WRITE_APPEND、WRITE_EMPTY、WRITE_TRUNCATE。デフォルトはWRITE_APPENDです。 - createDisposition: BigQuery CreateDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。例:
CREATE_IF_NEEDED、CREATE_NEVER。デフォルトはCREATE_IF_NEEDEDです。 - useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は
falseです。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。
useStorageWriteApiがtrueであり、useStorageWriteApiAtLeastOnceがfalseの場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。 - storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。
useStorageWriteApiがtrueであり、useStorageWriteApiAtLeastOnceがfalseの場合に、このパラメータを設定する必要があります。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Avro to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
次のように置き換えます。
JOB_NAME: 一意の任意のジョブ名REGION_NAME: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc)SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE: BigQuery 出力テーブル名DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
次のように置き換えます。
JOB_NAME: 一意の任意のジョブ名LOCATION: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例:gs://MyBucket/file.avsc)SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名BIGQUERY_TABLE: BigQuery 出力テーブル名DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。