このドキュメントでは、Dataflow から BigQuery にデータを書き込む方法について説明します。
概要
ほとんどのユースケースでは、マネージド I/O を使用して BigQuery に書き込むことを検討してください。マネージド I/O には、自動アップグレードや一貫した構成 API などの機能があります。BigQuery に書き込む場合、マネージド I/O はバッチジョブまたはストリーミング ジョブに最適な書き込み方法を自動的に選択します。
より高度なパフォーマンスの調整が必要な場合は、BigQueryIO コネクタの使用を検討してください。詳細については、このドキュメントの BigQueryIO コネクタを使用するをご覧ください。
パフォーマンス
次の表に、さまざまなワークロードのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。
| 1 億件のレコード | 1 KB | 1 列 | スループット(バイト) | スループット(要素) |
|---|---|---|
| ストレージ書き込み | 55 MBps | 54,000 要素/秒 |
| Avro の読み込み | 78 MBps | 77,000 要素/秒 |
| Json の読み込み | 54 MBps | 53,000 要素/秒 |
これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。
BigQueryIO コネクタを使用する
BigQuery I/O コネクタでは、BigQuery への書き込みで次のメソッドがサポートされています。
STORAGE_WRITE_API。このモードでは、コネクタは BigQuery Storage Write API を使用して BigQuery ストレージに直接書き込みを行います。Storage Write API は、ストリーミングの取り込みとバッチ読み込みを 1 つの高性能 API にまとめたものです。このモードでは exactly-once セマンティクスが保証されます。STORAGE_API_AT_LEAST_ONCE。このモードでは Storage Write API も使用しますが、at-least-once セマンティクスが提供されます。このモードを使用すると、ほとんどのパイプラインでレイテンシが短縮されます。ただし、重複書き込みが発生する可能性があります。FILE_LOADS。このモードでは、入力データを Cloud Storage のステージング ファイルに書き込みます。その後、BigQuery の読み込みジョブを実行して、データを BigQuery に読み込みます。このモードは、バッチ パイプラインで最もよく見られる制限付きPCollectionsのデフォルトです。STREAMING_INSERTS。このモードでは、コネクタは以前のストリーミング API を使用します。このモードは、制限なしPCollectionsのデフォルトですが、新しいプロジェクトでは推奨されません。
書き込み方法を選択する際は、次の点を考慮してください。
- ストリーミング ジョブの場合は、
STORAGE_WRITE_APIまたはSTORAGE_API_AT_LEAST_ONCEの使用を検討してください。これらのモードでは、中間ステージング ファイルを使用せずに、BigQuery ストレージに直接書き込みます。 - 1 回以上のストリーミング モードを使用してパイプラインを実行する場合は、書き込みモードを
STORAGE_API_AT_LEAST_ONCEに設定します。この設定はより効率的で、1 回以上のストリーミング モードのセマンティクスと一致します。 - ファイルの読み込みと Storage Write API では割り当てと上限が異なります。
- 読み込みジョブは、共有 BigQuery スロットプールまたは予約済みスロットのいずれかを使用します。予約済みスロットを使用するには、
PIPELINEタイプの予約割り当てのプロジェクトで読み込みジョブを実行します。共有 BigQuery スロットプールを使用する場合、読み込みジョブは無料です。ただし、BigQuery は共有プールで使用可能な容量について保証しません。詳細については、予約の概要をご覧ください。
並列処理
ストリーミング パイプラインでの
FILE_LOADSとSTORAGE_WRITE_APIの場合、コネクタはデータを多数のファイルまたはストリームにシャーディングします。通常は、withAutoShardingを呼び出して自動シャーディングを有効にすることをおすすめします。バッチ パイプラインの
FILE_LOADSの場合、コネクタはパーティション分割ファイルにデータを書き込み、その後、BigQuery に並列に読み込まれます。バッチ パイプライン内の
STORAGE_WRITE_APIの場合、各ワーカーはシャードの合計数によって BigQuery に書き込む 1 つ以上のストリームを作成します。STORAGE_API_AT_LEAST_ONCEの場合、デフォルトの書き込みストリームが 1 つあります。このストリームには複数のワーカーが追加されます。
ベスト プラクティス
Storage Write API には割り当て上限があります。コネクタは、ほとんどのパイプラインでこれらの上限を処理します。ただし、一部のシナリオでは使用可能な Storage Write API ストリームが使い果たされる可能性があります。たとえば、特に負荷の高いワークロードが長時間実行されるジョブでは、多数の宛先を持つ自動シャーディングと自動スケーリングを使用するパイプラインで、この問題が発生することがあります。この問題が発生した場合は、
STORAGE_WRITE_API_AT_LEAST_ONCEの使用を検討してください。これにより、問題を回避できます。Google Cloud Platform の指標を使用して、Storage Write API の割り当て使用量をモニタリングします。
ファイル読み込みを使用する場合、Avro は通常 JSON よりパフォーマンスが優れています。Avro を使用するには、
withAvroFormatFunctionを呼び出します。デフォルトでは、読み込みジョブは Dataflow ジョブと同じプロジェクトで実行されます。別のプロジェクトを指定するには、
withLoadJobProjectIdを呼び出します。Java SDK を使用する場合は、BigQuery テーブルのスキーマを表すクラスを作成することを検討してください。次に、パイプラインで
useBeamSchemaを呼び出して、Apache BeamRow型と BigQuery のTableRow型を自動的に変換します。スキーマクラスの例については、ExampleModel.javaをご覧ください。何千ものフィールドを含む複雑なスキーマを持つテーブルを読み込む場合は、
withMaxBytesPerPartitionを呼び出して、読み込みジョブごとの最大サイズを小さく設定することを検討してください。デフォルトでは、
BigQueryIOはほとんどのパイプラインにとって適切な Storage Write API 設定を使用します。ただし、パフォーマンスの問題が発生した場合は、パイプライン オプションを設定してこれらの設定を調整できます。詳細については、Apache Beam のドキュメントの Storage Write API を調整するをご覧ください。
ストリーミング パイプライン
ストリーミング パイプラインには、次の推奨事項が適用されます。
ストリーミング パイプラインには、Storage Write API(
STORAGE_WRITE_APIまたはSTORAGE_API_AT_LEAST_ONCE)を使用することをおすすめします。ストリーミング パイプラインはファイル読み込みを使用できますが、次のような欠点があります。
可能であれば、
STORAGE_WRITE_API_AT_LEAST_ONCEの使用を検討してください。その結果、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_APIよりも低コストで、スケーラビリティも高くなります。通常は
STREAMING_INSERTSは使用しないでください。ストリーミング挿入には Storage Write API よりもコストがかかり、パフォーマンスも低くなります。データ シャーディングにより、ストリーミング パイプラインのパフォーマンスが向上します。ほとんどのパイプラインで、自動シャーディングが出発点として適しています。ただし、次のようにシャーディングを調整できます。
STORAGE_WRITE_APIの場合は、withNumStorageWriteApiStreamsを呼び出して、書き込みストリームの数を設定します。FILE_LOADSの場合は、withNumFileShardsを呼び出してファイル シャードの数を設定します。
ストリーミング挿入を使用する場合は、
retryTransientErrorsを再試行ポリシーとして設定することをおすすめします。
バッチ パイプライン
バッチ パイプラインには、次の推奨事項が適用されます。
ほとんどの大規模なバッチ パイプラインでは、まず
FILE_LOADSを試すことをおすすめします。バッチ パイプラインはSTORAGE_WRITE_APIを使用できますが、大規模(1,000 個以上の vCPU)な場合や同時実行のパイプラインが実行されている場合は、割り当ての上限を超える可能性があります。Apache Beam は、バッチSTORAGE_WRITE_APIジョブの書き込みストリームの最大数をスロットリングしないため、ジョブは最終的に BigQuery Storage API の上限に達します。FILE_LOADSを使用すると、共有 BigQuery スロットプールまたは予約済みスロットプールのいずれかが使い果たされる可能性があります。この種の障害が発生した場合は、次の方法をお試しください。- ジョブのワーカーの最大数またはワーカーのサイズを減らす。
- 予約スロットを追加購入する。
STORAGE_WRITE_APIの使用を検討する。
小規模から中規模のパイプライン(1,000 vCPU 未満)では、
STORAGE_WRITE_APIを使用することが効果的な場合があります。このような小規模なジョブで、デッドレター キューが必要な場合や、FILE_LOADS共有スロットプールが不十分な場合は、STORAGE_WRITE_APIの使用を検討してください。重複データを許容できる場合は、
STORAGE_WRITE_API_AT_LEAST_ONCEの使用を検討してください。このモードでは、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_APIオプションよりも低コストです。書き込みモードは、パイプラインの特性に応じて異なるパフォーマンスを発揮する場合があります。ワークロードに最適な書き込みモードをテストして見つけてください。
行レベルのエラーを処理する
このセクションでは、不適切な形式の入力データやスキーマの不一致が原因で発生する、行レベルのエラーの処理方法について説明します。
Storage Write API の場合、書き込みできない行は別の PCollection に配置されます。このコレクションを取得するには、WriteResult オブジェクトで getFailedStorageApiInserts を呼び出します。このアプローチの例については、BigQuery にデータをストリーミングするをご覧ください。
後で処理できるように、デッドレター キューまたはテーブルにエラーを送信することをおすすめします。このパターンの詳細については、BigQueryIO デッドレター パターンをご覧ください。
FILE_LOADS でデータの読み込み中にエラーが発生した場合、読み込みジョブが失敗し、パイプラインがランタイム例外をスローします。エラーは Dataflow ログまたは BigQuery ジョブ履歴で確認できます。I/O コネクタは、個々の失敗した行に関する情報を返しません。
エラーのトラブルシューティングの詳細については、BigQuery コネクタエラーをご覧ください。
例
次の例では、Dataflow を使用して BigQuery に書き込む方法を示します。これらの例では、BigQueryIO コネクタを使用します。
既存のテーブルに書き込む
次の例では、PCollection<MyData> を BigQuery に書き込むバッチ パイプラインを作成します(MyData はカスタムデータ型です)。
BigQueryIO.write() メソッドは、書き込みオペレーションの構成に使用される BigQueryIO.Write<T> タイプを返します。詳細については、Apache Beam ドキュメントのテーブルへの書き込みをご覧ください。このコードサンプルは、既存のテーブル(CREATE_NEVER)に書き込み、新しい行をテーブル(WRITE_APPEND)に追加します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
新規または既存のテーブルに書き込む
次の例では、create disposition を CREATE_IF_NEEDED に設定することで、宛先テーブルが存在しない場合に新しいテーブルを作成します。このオプションを使用する場合は、テーブル スキーマを指定する必要があります。コネクタは、新しいテーブルを作成するときにこのスキーマを使用します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
BigQuery にデータをストリーミングする
次の例は、書き込みモードを STORAGE_WRITE_API に設定して、exactly-once セマンティクスを使用してデータをストリーミングする方法を示しています。
すべてのストリーミング パイプラインが exactly-once セマンティクスを必要とするわけではありません。たとえば、宛先テーブルから重複を手動で削除できる場合があります。シナリオでレコードの重複が許容される場合は、書き込みメソッドを STORAGE_API_AT_LEAST_ONCE に設定して、exactly-once セマンティクスを使用することを検討してください。通常、この方法は効率的で、ほとんどのパイプラインでレイテンシが短縮されます。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
次のステップ
- マネージド I/O の詳細を確認する。
- Pub/Sub から BigQuery への書き込みのベスト プラクティスについて確認する。