このドキュメントでは、BigQuery サブスクリプションの概要、ワークフロー、関連するプロパティについて説明します。
BigQuery サブスクリプションは、メッセージの受信時に既存の BigQuery テーブルにメッセージを書き込むエクスポート サブスクリプションの一種です。別のサブスクライバー クライアントを構成する必要はありません。 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用して、BigQuery サブスクリプションの作成、更新、一覧表示、接続解除、削除を行います。
BigQuery サブスクリプション タイプがない場合は、pull サブスクリプションまたは push サブスクリプションと、メッセージを読み取り、BigQuery テーブルに書き込むサブスクライバー(Dataflow など)が必要です。メッセージが BigQuery テーブルに保存される前に追加の処理が不要な場合は、Dataflow ジョブを実行するオーバーヘッドは必要ありません。代わりに BigQuery サブスクリプションを使用できます。
ただし、データを BigQuery テーブルに保存する前にデータ変換が必要な Pub/Sub システムでは、Dataflow パイプラインの使用をおすすめします。Dataflow を使用して変換で Pub/Sub から BigQuery にデータをストリーミングする方法については、Pub/Sub から BigQuery へのストリーミングをご覧ください。
Dataflow の Pub/Sub subscription to BigQuery テンプレートでは、デフォルトで 1 回限りの配信が適用されます。通常、これは Dataflow パイプライン内の重複除去メカニズムによって実現されます。ただし、BigQuery サブスクリプションでは少なくとも 1 回 の配信のみがサポートされます。ユースケースで正確な重複除去が重要な場合は、BigQuery のダウンストリーム プロセスで重複を処理することを検討してください。
始める前に
このドキュメントを読む前に、次の内容をよく理解しておいてください。
Pub/Sub の仕組みとさまざまな Pub/Sub の用語。
Pub/Sub でサポートされているさまざまなサブスクリプションの種類と、BigQuery サブスクリプションを使用する理由。
BigQuery の仕組みと、BigQuery テーブルを構成して管理する方法。
BigQuery サブスクリプション ワークフロー
次の図は、BigQuery サブスクリプションと BigQuery の間のワークフローを示しています。
図 1 を参照したワークフローの概要を次に示します。
- Pub/Sub は BigQuery ストレージ書き込み API を使用して、データを BigQuery テーブルに送信します。
- メッセージはバッチで BigQuery テーブルに送信されます。
- 書き込みオペレーションが正常に完了すると、API は OK レスポンスを返します。
- 書き込みオペレーション中にエラーが発生した場合、Pub/Sub メッセージ自体に否定応答が行われます。 その後、メッセージが再送信されます。メッセージが十分な回数失敗し、サブスクリプションにデッドレター トピックが構成されている場合、メッセージはデッドレター トピックに移動します。
BigQuery のサブスクリプションのプロパティ
BigQuery サブスクリプションに対して構成するプロパティによって、Pub/Sub がメッセージを書き込む BigQuery テーブルと、そのテーブルのスキーマのタイプが決まります。
詳細については、BigQuery のプロパティをご覧ください。
スキーマの互換性
このセクションは、BigQuery サブスクリプションを作成するときに [トピック スキーマを使用する] オプションを選択した場合にのみ適用できます。
Pub/Sub と BigQuery では、スキーマの定義方法が異なります。Pub/Sub スキーマは Apache Avro または Protocol Buffer の形式で定義されますが、BigQuery スキーマはさまざまな形式を使用して定義されます。
Pub/Sub トピックと BigQuery テーブルとの間のスキーマ互換性に関する重要な情報を次に示します。
形式が正しくないフィールドを含むメッセージは、BigQuery に書き込まれません。
BigQuery のスキーマで、
INT、SMALLINT、INTEGER、BIGINT、TINYINT、BYTEINTはINTEGERのエイリアス、DECIMALはNUMERICのエイリアス、BIGDECIMALはBIGNUMERICのエイリアスです。トピック スキーマの型が
stringで、BigQuery テーブルの型がJSON、TIMESTAMP、DATETIME、DATE、TIME、NUMERIC、またはBIGNUMERICの場合、Pub/Sub メッセージのフィールドは、BigQuery データ型に指定された形式に従う必要があります。Avro の論理型の一部がサポートされています。サポートされているものを次の表に示します。表に記載されていない論理型は、その論理型によってアノテーションを付けられる同等の Avro 型と一致します。詳細については、Avro の仕様をご覧ください。
以下に、さまざまなスキーマ形式の BigQuery データ型へのマッピングのコレクションを示します。
Avro 型
| Avro 型 | BigQuery のデータ型 |
null |
Any NULLABLE |
boolean |
BOOLEAN |
int |
INTEGER、NUMERIC、またはBIGNUMERIC |
long |
INTEGER、NUMERIC、またはBIGNUMERIC |
float |
FLOAT64、NUMERIC、またはBIGNUMERIC |
double |
FLOAT64、NUMERIC、またはBIGNUMERIC |
bytes |
BYTES、NUMERIC、またはBIGNUMERIC |
string |
STRING、JSON、TIMESTAMP、DATETIME、DATE、TIME、NUMERIC、またはBIGNUMERIC |
record |
RECORD/STRUCT |
array/Type |
REPEATED Type |
値タイプ ValueType を持つ map
|
REPEATED STRUCT <key STRING, value
ValueType> |
null と Type の 2 タイプがある union |
NULLABLE Type |
その他の union |
マッピング不可 |
fixed |
BYTES、NUMERIC、またはBIGNUMERIC |
enum |
INTEGER |
Avro の論理型
| Avro の論理型 | BigQuery のデータ型 |
timestamp-micros |
TIMESTAMP |
date |
DATE |
time-micros |
TIME |
duration |
INTERVAL |
decimal |
NUMERIC または BIGNUMERIC |
プロトコル バッファタイプ
| プロトコル バッファ型 | BigQuery のデータ型 |
double |
FLOAT64、NUMERIC、またはBIGNUMERIC |
float |
FLOAT64、NUMERIC、またはBIGNUMERIC |
int32 |
INTEGER、NUMERIC、BIGNUMERIC または DATE |
int64 |
INTEGER、NUMERIC、BIGNUMERIC、DATE、DATETIME または TIMESTAMP |
uint32 |
INTEGER、NUMERIC、BIGNUMERIC または DATE |
uint64 |
NUMERIC または BIGNUMERIC |
sint32 |
INTEGER、NUMERIC、またはBIGNUMERIC |
sint64 |
INTEGER、NUMERIC、BIGNUMERIC、DATE、DATETIME または TIMESTAMP |
fixed32 |
INTEGER、NUMERIC、BIGNUMERIC または DATE |
fixed64 |
NUMERIC または BIGNUMERIC |
sfixed32 |
INTEGER、NUMERIC、BIGNUMERIC または DATE |
sfixed64 |
INTEGER、NUMERIC、BIGNUMERIC、DATE、DATETIME または TIMESTAMP |
bool |
BOOLEAN |
string |
STRING、JSON、TIMESTAMP、DATETIME、DATE、TIME、NUMERIC、またはBIGNUMERIC |
bytes |
BYTES、NUMERIC、またはBIGNUMERIC |
enum |
INTEGER |
message |
RECORD/STRUCT |
oneof |
マッピング不可 |
map<KeyType, ValueType> |
REPEATED RECORD<key KeyType, value
ValueType> |
enum |
INTEGER |
repeated/array of Type |
REPEATED Type |
日付と時刻の整数表現
整数から日付または時刻のいずれかの型にマッピングする場合、数値は正しい値を表す必要があります。BigQuery のデータ型と、それらを表す整数とのマッピングは次のとおりです。
| BigQuery のデータ型 | 整数の表現 |
DATE |
Unix エポックである1970 年 1 月 1 日からの日数 |
DATETIME |
CivilTimeEncoder を使用して常用時として表された日時(マイクロ秒単位) |
TIME |
CivilTimeEncoder を使用して常用時として表された時間(マイクロ秒単位) |
TIMESTAMP |
Unix エポックである1970 年 1 月 1 日 00:00:00 UTC からのマイクロ秒数 |
BigQuery の変更データ キャプチャ
BigQuery サブスクリプションでは、use_topic_schema または use_table_schema がサブスクリプション プロパティで true に設定されてる場合、変更データ キャプチャ(CDC)の更新がサポートされます。use_topic_schema でこの機能を使用するには、次のフィールドを使用してトピックのスキーマを設定します。
_CHANGE_TYPE(必須):UPSERTまたはDELETEに設定されたstringフィールド。BigQuery テーブルに書き込まれた Pub/Sub メッセージの
_CHANGE_TYPEがUPSERTに設定されている場合、BigQuery は同じキーが存在する場合は対象のキーで行を更新し、同じキーが存在しない場合は新しい行を挿入します。BigQuery テーブルに書き込まれた Pub/Sub メッセージの
_CHANGE_TYPEがDELETEに設定されている場合、BigQuery は同じがキーが存在する場合は対象のキーでテーブル内の行を削除します。
_CHANGE_SEQUENCE_NUMBER(省略可): BigQuery テーブルに対する更新と削除を確実に行うために設定されたstringフィールドが順番に処理されます。同じ行キーのメッセージには、_CHANGE_SEQUENCE_NUMBERの単調に増加する値が含まれている必要があります。行に対して処理された最大のシーケンス番号よりも小さいシーケンス番号を持つメッセージは、BigQuery テーブルの行に影響しません。シーケンス番号は_CHANGE_SEQUENCE_NUMBER形式にする必要があります。
use_table_schema でこの機能を使用するには、JSON メッセージに上記のフィールドを含めます。
CDC の料金については、CDC の料金をご覧ください。
BigQuery 内の Apache Iceberg 用 BigLake テーブル
BigQuery サブスクリプションは、追加の変更なしで BigQuery の Apache Iceberg 用 BigLake テーブルで使用できます。
BigQuery 内の Apache Iceberg 用 BigLake テーブルは、 Google Cloudでオープン形式のレイクハウスを構築するための基盤になります。これらのテーブルは、標準(組み込み)の BigQuery テーブルと同じフルマネージド エクスペリエンスを提供しますが、Parquet を使用してお客様所有のストレージ バケットにデータを保存することで、オープン形式の Iceberg テーブル形式と相互運用が可能です。
BigQuery で Apache Iceberg 用 BigLake テーブルを作成する方法については、Iceberg テーブルを作成するをご覧ください。
メッセージ エラーの処理
Pub/Sub メッセージを BigQuery に書き込めない場合、メッセージの確認応答はできません。このような配信不能メッセージを転送するには、BigQuery サブスクリプションでデッドレター トピックを構成します。デッドレター トピックに転送された Pub/Sub メッセージには、Pub/Sub メッセージを BigQuery に書き込めなかった理由を示す属性 CloudPubSubDeadLetterSourceDeliveryErrorMessage が含まれています。
Pub/Sub が BigQuery にメッセージを書き込むことができない場合、Pub/Sub は push バックオフ動作と同様の方法でメッセージの配信を解除します。ただし、サブスクリプションにデッドレター トピックがアタッチされているときに、スキーマの互換性エラーが原因でメッセージ エラーが発生した場合、Pub/Sub は配信を解除しません。
割り当てと上限
リージョンごとの BigQuery サブスクライバーのスループットには割り当て上限があります。詳細については、Pub/Sub の割り当てと上限をご覧ください。
BigQuery サブスクリプションは、BigQuery Storage Write API を使用してデータを書き込みます。Storage Write API の割り当てと制限については、BigQuery Storage Write API リクエストをご覧ください。BigQuery サブスクリプションは、Storage Write API のスループット割り当てのみを使用します。このインスタンスでは、他の Storage Write API の割り当てに関する考慮事項は無視できます。
料金
BigQuery サブスクリプションの料金については、Pub/Sub の料金ページをご覧ください。
次のステップ
BigQuery サブスクリプションなどのサブスクリプションを作成します。
BigQuery サブスクリプションのトラブルシューティングを行う。
BigQuery について確認する。
BigQuery サブスクリプションを含む Pub/Sub の料金を確認する。
gcloudCLI コマンドを使用して、サブスクリプションを作成または変更する。REST API を使用してサブスクリプションを作成または変更する。