リバース ETL を使用して BigQuery から Spanner Graph にデータを読み込む

このドキュメントでは、リバース ETL パイプラインを使用して、BigQuery から Spanner Graph にグラフデータを移動し、継続的に同期する方法について説明します。ここでは、次の重要な側面について説明します。

リバース ETL を使用して BigQuery から Spanner にデータをエクスポートするには、Spanner にデータをエクスポートするをご覧ください。

BigQuery は、分析処理プラットフォームとして複雑なデータ操作を大規模に実行します。一方、Spanner は、高い QPS と低レイテンシのサービス提供を必要とするユースケース向けに最適化されています。Spanner Graph と BigQuery は、BigQuery 分析パイプラインでグラフデータを準備し、Spanner が低レイテンシのグラフ トラバーサルを提供できるように、効果的に統合されています。

始める前に

  1. グラフデータを含むデータベースを使用して Spanner インスタンスを作成します。詳細については、Spanner Graph を設定してクエリを実行するをご覧ください。

  2. BigQuery で、Enterprise または Enterprise Plus ティアのスロット予約を作成します。Spanner Graph へのエクスポートを実行する際に、BigQuery のコンピューティング費用を削減できます。これを行うには、ベースライン スロット容量をゼロに設定し、自動スケーリングを有効にします。

  3. このドキュメントの各タスクを実行するために必要な権限をユーザーに与える Identity and Access Management(IAM)のロールを付与します。

必要なロール

BigQuery グラフデータを Spanner Graph にエクスポートするために必要な権限を取得するには、プロジェクトに対して次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

リバース ETL のユースケース

次のようなユースケースがあります。BigQuery でデータを分析して処理した後、リバース ETL を使用してデータを Spanner Graph に移動できます。

データの集計と要約 - BigQuery を使用して、粒度の細かいデータを集計し、運用するユースケースに適したデータにします。

データ変換と拡充 - BigQuery を使用して、さまざまなデータソースから受信したデータをクレンジングして標準化します。

データのフィルタリングと選択 - BigQuery を使用して、分析目的で大規模なデータセットをフィルタリングします。たとえば、リアルタイム アプリケーションに不要なデータを除外できます。

特徴の前処理とエンジニアリング - BigQuery で、ML.TRANSFORM 関数を使用してデータを変換します。あるいは、ML.FEATURE_CROSS 関数を使用して入力特徴の特徴クロスを作成します。次に、リバース ETL を使用して、結果のデータを Spanner Graph に移動します。

リバース ETL パイプラインについて

リバース ETL パイプラインでは、データは次の 2 つのステップで BigQuery から Spanner Graph に移動します。

  1. BigQuery が、パイプライン ジョブに割り当てられたスロットを使用して、ソースデータを抽出して変換します。

  2. BigQuery リバース ETL パイプラインが Spanner API を使用して、プロビジョニングされた Spanner インスタンスにデータを読み込みます。

次の図は、リバース ETL パイプラインのステップを示しています。

リバース ETL パイプラインで BigQuery から Spanner Graph にデータを移動する際の主な 3 つのステップを示す図。

図 1. BigQuery リバース ETL パイプライン プロセス

グラフデータの変更を管理する

リバース ETL を使用すると、次のことができます。

  • BigQuery から Spanner Graph にグラフ データセットを読み込みます。

  • BigQuery のデータセットから継続的に更新されるデータを使用して、Spanner Graph のデータを同期します。

SQL クエリを使用してリバース ETL パイプラインを構成し、ソースデータと適用する変換を指定します。パイプラインは、SELECT ステートメントの WHERE 句を満たすデータをすべて、upsert オペレーションで Spanner に読み込みます。upsert オペレーションは INSERT OR UPDATE ステートメントと同等です。グラフデータを格納するテーブルに新しい行を挿入し、既存の行を更新します。パイプラインは、Spanner テーブルの主キーに基づいて行の作成と更新を行います。

読み込み順序の依存関係があるテーブルのデータを挿入して更新する

Spanner Graph スキーマ設計のベスト プラクティスでは、インターリーブ テーブルと外部キーを使用することをおすすめしています。インターリーブされたテーブルまたは外部キーの適用を使用する場合は、ノードデータとエッジデータを特定の順序で読み込む必要があります。これにより、参照元の行を作成する前に、参照先の行が存在することが保証されます。詳細については、インターリーブ テーブルを作成するをご覧ください。

次のグラフ入力テーブル スキーマの例では、インターリーブ テーブルと外部キー制約を使用して、人物とそのアカウントの関係をモデリングしています。

CREATE TABLE Person (
  id    INT64 NOT NULL,
  name  STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE Account (
  id           INT64 NOT NULL,
  create_time  TIMESTAMP,
  is_blocked   BOOL,
  type        STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE PersonOwnAccount (
  id           INT64 NOT NULL,
  account_id   INT64 NOT NULL,
  create_time  TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id)
) PRIMARY KEY (id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES (
    Person,
    Account
  )
  EDGE TABLES (
    PersonOwnAccount
      SOURCE KEY (id) REFERENCES Person
      DESTINATION KEY (account_id) REFERENCES Account
      LABEL Owns
  );

このスキーマ例では、PersonOwnAccountPerson のインターリーブ テーブルです。PersonOwnAccount テーブルの要素の前に Person テーブルの要素を読み込みます。また、PersonOwnAccount の外部キー制約により、エッジ関係のターゲットである Account に一致する行が存在することが保証されます。したがって、PersonOwnAccount テーブルの前に Account テーブルを読み込みます。次のリストは、このスキーマの読み込み順序の依存関係をまとめたものです。

データの読み込み手順は次のとおりです。

  1. PersonOwnAccount の前に Person を読み込みます。
  2. PersonOwnAccount の前に Account を読み込みます。

サンプル スキーマでは、Spanner は参照整合性制約を適用しています。パイプラインが Person テーブルまたは Account テーブルに一致する行がない状態で PersonOwnAccount テーブルに行を作成しようとすると、Spanner はエラーを返します。その後、パイプラインは失敗します。

このリバース ETL パイプラインの例では、BigQuery の EXPORTDATA ステートメントを使用して、データセット内の PersonAccountPersonOwnAccount テーブルからデータをエクスポートし、読み込み順序の依存関係を満たしています。

BEGIN
EXPORT DATA OPTIONS (
    uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "Person",
      "priority": "HIGH",
      "tag" : "graph_data_load_person"
    }"""
  ) AS
  SELECT
    id,
    name
  FROM
    DATASET_NAME.Person;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "Account",
    "priority": "HIGH",
    "tag" : "graph_data_load_account"
  }"""
) AS
SELECT
  id,
  create_time,
  is_blocked,
  type
FROM
  DATASET_NAME.Account;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "PersonOwnAccount",
    "priority": "HIGH",
    "tag" : "graph_data_load_person_own_account"
  }"""
) AS
SELECT
  id,
  account_id,
  create_time
FROM
  DATASET_NAME.PersonOwnAccount;
END;

データを同期する

BigQuery と Spanner Graph を同期するには、リバース ETL パイプラインを使用します。次のいずれかを行うようにパイプラインを構成できます。

  • BigQuery ソースからの挿入と更新を Spanner Graph ターゲット テーブルに適用します。スキーマ要素をターゲット テーブルに追加して、削除を論理的に通知し、スケジュールに従ってターゲット テーブルの行を削除できます。

  • 挿入オペレーションと更新オペレーションを適用し、削除オペレーションを識別する時系列関数を使用します。

参照整合性の制約

Spanner とは異なり、BigQuery では主キー制約と外部キー制約は強制されません。BigQuery データが Spanner テーブルに作成した制約に準拠していない場合、そのデータの読み込み時にリバース ETL パイプラインが失敗する可能性があります。

リバース ETL は、commit あたりの最大ミューテーション数の上限を超えないバッチにデータを自動的にグループ化し、任意の順序でバッチを Spanner テーブルにアトミックに適用します。参照整合性チェックに失敗したデータがバッチに含まれている場合、Spanner はそのバッチを読み込みません。このような失敗の例としては、親行のないインターリーブされた子行や、参照先の列に一致する値がない外部キー列の強制などがあります。バッチがチェックに失敗すると、パイプラインがエラーで失敗し、パイプラインはバッチの読み込みを停止します。

参照整合性の制約エラーについて

次の例は、発生する可能性のある参照整合性制約エラーを示しています。

外部キーの制約エラーを解決する
  • エラー: 「テーブル PersonOwnAccount で外部キー制約 FK_Account に違反しています。Account(id) で参照された値が見つかりません」

  • 原因: FK_Account 外部キーで必要となる Account テーブルに一致する行がないため、PersonOwnAccount テーブルへの行の挿入に失敗しました。

親行が見つからないエラーを解決する
  • エラー: 「テーブル PersonOwnAccount の行 [15,1] の親行がありません」

  • 原因: Person テーブル(id: 15)の親行がないため、PersonOwnAccountid: 15account_id: 1)への行の挿入が失敗しました。

参照整合性エラーのリスクを軽減するには、次のオプションを検討してください。各オプションにはトレードオフがあります。

  • 制約を緩和して、Spanner Graph がデータを読み込めるようにします。
  • 参照整合性制約に違反する行を省略するロジックをパイプラインに追加します。

参照整合性を緩和する

データを読み込むときに参照整合性エラーを回避する方法の一つは、Spanner が参照整合性を適用しないように制約を緩和することです。

  • INTERLEAVE IN 句を使用してインターリーブ テーブルを作成します。ここでは、同じ物理行インターリーブ特性を使用できます。INTERLEAVE IN PARENT の代わりに INTERLEAVE IN を使用すると、Spanner は参照整合性を適用しませんが、クエリは関連テーブルのコロケーションのメリットを享受できます。

  • NOT ENFORCED オプションを使用すると、情報用の外部キーを作成できます。NOT ENFORCED オプションは、クエリの最適化のメリットを提供します。ただし、Spanner は参照整合性を強制しません。

たとえば、参照整合性チェックなしでエッジ入力テーブルを作成するには、次の DDL を使用します。

CREATE TABLE PersonOwnAccount (
  id          INT64 NOT NULL,
  account_id  INT64 NOT NULL,
  create_time TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id) NOT ENFORCED
) PRIMARY KEY (id, account_id),
INTERLEAVE IN Person;

リバース ETL パイプラインで参照整合性を維持する

パイプラインで参照整合性チェックを満たす行のみが読み込まれるようにするには、Person テーブルと Account テーブルに一致する行がある PersonOwnAccount 行のみを含めます。次に、読み込み順序を保持して、Spanner が Person 行と Account 行を、それらを参照する PersonOwnAccount 行の前に読み込むようにします。

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_load_person_own_account"
    }"""
  ) AS
  SELECT
    poa.id,
    poa.account_id,
    poa.create_time
  FROM `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
    JOIN `PROJECT_ID.DATASET_NAME.Person` p ON (poa.id = p.id)
    JOIN `PROJECT_ID.DATASET_NAME.Account` a ON (poa.account_id = a.id)
  WHERE poa.id = p.id
    AND poa.account_id = a.id;

グラフ要素を削除する

リバース ETL パイプラインは、upsert オペレーションを使用します。upsert オペレーションは INSERT OR UPDATE ステートメントと同等であるため、パイプラインは実行時にソースデータに存在する行のみを同期できます。つまり、パイプラインは削除された行を除外します。BigQuery からデータを削除した場合、リバース ETL パイプラインは Spanner Graph から同じデータを直接削除できません。

次のいずれかのオプションを使用して、BigQuery ソーステーブルからの削除を処理できます。

ソースで論理削除または削除(復元可能)を実行する

行を削除対象として論理的にマークするには、BigQuery で削除済みフラグを使用します。次に、フラグを伝播できるターゲット Spanner テーブルに列を作成します。リバース ETL がパイプラインの更新を適用するときに、Spanner でこのフラグが設定されている行を削除します。このような行は、パーティション化 DML を使用して明示的に検索して削除できます。または、削除フラグ列に依存する日付で TTL(有効期間)列を構成して、行を暗黙的に削除します。これらの論理的に削除された行を除外する Spanner クエリを作成します。これにより、Spanner はスケジュールされた削除の前にこれらの行を結果から除外します。リバース ETL パイプラインが完了すると、Spanner は論理削除をその行に反映します。その後、BigQuery から行を削除できます。

この例では、Spanner の PersonOwnAccount テーブルに is_deleted 列を追加します。次に、is_deleted の値に依存する expired_ts_generated 列を追加します。生成された列の日付が DELETION POLICY しきい値より前であるため、TTL ポリシーは削除対象の行をスケジュールします。

ALTER TABLE PersonOwnAccount
  ADD COLUMN is_deleted BOOL DEFAULT (FALSE);

ALTER TABLE PersonOwnAccount ADD COLUMN
  expired_ts_generated TIMESTAMP AS (IF(is_deleted,
    TIMESTAMP("1970-01-01 00:00:00+00"),
    TIMESTAMP("9999-01-01 00:00:00+00"))) STORED HIDDEN;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts_generated, INTERVAL 0 DAY));

INSERT、UPDATE、論理削除に BigQuery の変更履歴を使用する

BigQuery テーブルの変更を追跡するには、変更履歴を使用します。GoogleSQL の CHANGES 関数を使用して、特定の時間間隔内で変更された行を検索します。次に、削除された行情報をリバース ETL パイプラインで使用します。パイプラインを設定して、Spanner テーブルに削除フラグや有効期限などのインジケーターを設定できます。このインジケーターは、Spanner テーブルで削除する行をマークします。

CHANGES 時系列関数の結果を使用して、リバース ETL パイプラインの読み込みに含めるソーステーブルの行を決定します。

_CHANGE_TYPEINSERT または UPDATE の行がソーステーブルに存在する場合は upsert として含まれます。ソーステーブルの現在の行には、最新のデータが格納されています。

ソーステーブルに既存の行がない DELETE として _CHANGE_TYPE を含む行を使用して、Spanner テーブルにインジケーター(削除フラグや行の有効期限など)を設定します。

エクスポート クエリでは、BigQuery での挿入と削除の順序を考慮する必要があります。たとえば、時刻 T1 で削除された行と、後で時刻 T2 で挿入された新しい行について考えてみましょう。両方が同じ Spanner テーブル行にマッピングされている場合、エクスポートでは、これらのイベントの効果を元の順序で保持する必要があります。

設定されている場合、削除インジケーターは、Spanner テーブルで削除対象の行をマークします。

たとえば、Spanner 入力テーブルに列を追加して、各行の有効期限を保存できます。次に、これらの有効期限を使用する削除ポリシーを作成します。

次の例は、テーブルの行の有効期限を保存する列を追加する方法を示しています。

ALTER TABLE PersonOwnAccount ADD COLUMN expired_ts TIMESTAMP;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts, INTERVAL 1 DAY));

BigQuery のテーブルで CHANGES 関数を使用するには、テーブルの enable_change_history オプションTRUE に設定します。

ALTER TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`
  SET OPTIONS (enable_change_history=TRUE);

次の例は、リバース ETL を使用して、新規または変更された行を更新し、削除対象としてマークされた行の有効期限を設定する方法を示しています。PersonOwnAccount テーブルとの左結合により、クエリは各行の現在のステータスに関する情報を取得します。

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_delete_via_reverse_etl"
    }"""
  ) AS
SELECT
  DISTINCT
   IF (changes._CHANGE_TYPE = 'DELETE', changes.id, poa.id) AS id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.account_id, poa.account_id) AS account_id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.create_time, poa.create_time) AS create_time,
   IF (changes._CHANGE_TYPE = 'DELETE', changes._CHANGE_TIMESTAMP, NULL) AS expired_ts
FROM
  CHANGES(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
    TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), DAY),
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)) changes
LEFT JOIN `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
  ON (poa.id = changes.id
  AND poa.account_id = changes.account_id)
WHERE (changes._CHANGE_TYPE = 'DELETE'
   AND poa.id IS NULL)
   OR (changes._CHANGE_TYPE IN ( 'UPDATE', 'INSERT')
   AND poa.id IS NOT NULL );

クエリの例では、順序を保持するためにソーステーブルで LEFT JOIN を使用しています。この結合により、クエリ変更履歴の間隔内で削除されてから再作成された行の DELETE 変更レコードが無視されます。パイプラインは有効な新しい行を保持します。

行を削除すると、パイプラインは _CHANGE_TIMESTAMP 列の DELETE タイムスタンプを使用して、対応する Spanner Graph 行の expired_ts 列に入力します。Spanner の行削除ポリシー(TTL ポリシー)は、expired_ts 値が過去 1 日以上前の行を削除します。

システムの信頼性を確保するには、パイプラインのスケジュール、変更ルックバック ウィンドウ、Spanner TTL ポリシーを調整します。パイプラインを毎日実行するようにスケジュールします。Spanner TTL ポリシーの期間は、この実行間隔より長くする必要があります。これにより、Spanner TTL ポリシーによってすでに削除された行に対して、パイプラインが以前の DELETE イベントを再処理することを防ぐことができます。

この例では、前日の UTC の BigQuery テーブルのすべての変更をキャプチャする毎日のクエリの start_timestampend_timestamp の間隔を示します。これはバッチクエリであり、CHANGES 関数には制限があるため、end_timestamp は現在時刻の 10 分以上前である必要があります。したがって、このクエリは午前 0 時(UTC)から 10 分以上あとに実行されるようにスケジュールしてください。詳細については、CHANGES のドキュメントをご覧ください。

最終確認タイムスタンプを含む TTL 列を使用する

リバース ETL パイプラインは、Spanner テーブルの各行の last_seen_ts 列を現在のタイムスタンプに設定します。BigQuery の行を削除しても、Spanner の対応する行は更新されず、last_seen_ts 列は変更されません。その後、Spanner は、定義されたしきい値に基づいて、TTL ポリシーまたはパーティション化 DML を使用して、古い last_seen_ts を含む行を削除します。スケジュールされた削除の前に、Spanner クエリでこのしきい値より古い last_seen_ts を持つ行を除外できます。このアプローチは、グラフデータが定期的に更新され、更新がない場合は削除対象の古いデータであることを示す場合に効果的です。

完全更新を実行する

BigQuery から読み込む前に、Spanner テーブルを削除して、ソーステーブルの削除を反映できます。これにより、次のパイプライン実行時に、ソースの BigQuery テーブルから削除された行が Spanner に読み込まれないようになります。これは、実装が最も簡単なオプションかもしれません。ただし、グラフデータの完全な再読み込みに必要な時間を考慮してください。

スケジュールされたバッチリバース ETL パイプラインを維持する

リバース ETL パイプラインの初回実行で、BigQuery から Spanner Graph にデータが一括読み込みされた後も、実際のデータは変化し続けます。データセットが変更されると、パイプラインはグラフ要素を追加または削除します。パイプラインが新しいノードを検出し、新しいエッジ関係を追加するか、AI 推論が新しいエッジ関係を生成します。

Spanner Graph データベースを最新の状態に保つには、次のいずれかのオプションを使用して BigQuery のパイプライン オーケストレーションをスケジュールして順序付けます。

BigQuery Pipelines を使用すると、BigQuery で複雑な SQL データ変換ワークフローを開発、テスト、バージョン管理、デプロイできます。パイプライン内のクエリ間の関係を定義することで、順序の依存関係をネイティブに処理します。Dataform は依存関係ツリーを構築し、クエリを正しい順序で実行します。これにより、ダウンストリーム タスクが開始される前にアップストリームの依存関係が完了します。

Cloud Scheduler によって呼び出される Workflows は、BigQuery クエリなどのGoogle Cloud サービスのシーケンスをオーケストレートするための有用で柔軟なソリューションを提供します。ワークフローは、それぞれが BigQuery ジョブを実行する一連のステップとして定義します。Cloud Scheduler を使用すると、定義されたスケジュールでこれらのワークフローを呼び出すことができます。ワークフロー定義を使用して依存関係を管理し、実行順序の指定、条件付きロジックの実装、エラーの処理、あるクエリから別のクエリへの出力の受け渡しを行います。

BigQuery のスケジュールされたクエリ(BigQuery 転送ジョブとも呼ばれます)を使用すると、SQL ステートメントを定期的に実行できます。スケジュールされたクエリには、堅牢なエラー処理や動的な依存関係管理の機能はありません。

BigQuery 継続的クエリを使用したリバース ETL

BigQuery 継続的クエリ機能を使用すると、BigQuery オペレーションをほぼリアルタイムで実行できます。EXPORT DATA と継続的クエリを組み合わせると、スケジュールされたバッチジョブを回避するリバース ETL パイプラインを実行する別の方法が提供されます。

継続的クエリは、ソース BigQuery テーブルで新しい行をモニタリングする長時間実行クエリです。BigQuery は、テーブルに追加された新しい行を検出すると、クエリ結果を EXPORT DATA オペレーションにストリーミングします。

このアプローチには次のような利点があります。

  • ほぼリアルタイムのデータ同期: BigQuery の新しい行が、最小限の遅延で Spanner に反映されます。

  • バッチ処理のオーバーヘッドの削減: 継続的クエリにより、定期的なバッチジョブが不要になり、コンピューティングのオーバーヘッドが削減されます。

  • イベント ドリブン更新: BigQuery の実際の変更に応じて Spanner データを更新します。

継続的クエリ パイプラインには、job_typeCONTINUOUS のスロット予約割り当てが必要です。これは、プロジェクトまたはフォルダ レベル、または組織レベルで割り当てます。

BigQuery から Spanner へのリバース ETL を使用して継続的クエリを作成する

APPENDS 関数の start_timestamp パラメータを構成して、バッチ読み込みが終了した場所からデータの処理を開始します。この関数は、特定の時間枠で作成されたすべての行をキャプチャします。次の例では、パイプラインが開始点を CURRENT_TIME の 10 分前に任意に設定しています。このタイムスタンプは、BigQuery のタイムトラベル ウィンドウ内に存在する必要があります。

継続的クエリ パイプラインを開始するには、いくつかの方法があります。たとえば、次の方法があります。

  1. BigQuery Studio で、[その他] を選択し、[クエリモードを選択] で [継続的クエリ] を選択します。

  2. bq CLI を使用して、--continuous=true オプションを指定します。

EXPORT DATA OPTIONS ( uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format="CLOUD_SPANNER",
  spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag": "reverse-etl-continuous",
      "change_timestamp_column": "create_time"
   }"""
)
AS SELECT id, account_id, _CHANGE_TIMESTAMP as create_time
  FROM
APPENDS(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
  CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE )

読み込み順序は保証されない

Spanner Graph データは複数の入力テーブルで構成されます。テーブルに参照整合性制約がある場合は、厳密な読み込み順序に従う必要があります。ただし、同時実行の継続的クエリでは、Spanner が行を追加する順序を制御できません。そのため、継続的クエリを使用した Spanner Graph データの読み込みは、参照整合性制約が緩和されたグラフスキーマでのみ使用できます。

既存のパイプラインと統合する

継続的クエリは、既存のスケジュールされたバッチジョブを補完します。たとえば、ほぼリアルタイムの更新には継続的クエリを使用し、完全なデータ同期または調整にはスケジュールされたジョブを使用します。

BigQuery 継続的クエリを使用して、BigQuery と Spanner Graph 間でデータを同期するための、応答性の高い最新のリバース ETL パイプラインを構築します。

継続的クエリに関する考慮事項

  • 費用: 継続的クエリでは、継続的なクエリ実行とデータ ストリーミングで費用が発生します。

  • エラー処理: 連続クエリ パイプラインで、重複する主キーや参照整合性違反などのデータベース エラーが発生すると、そのパイプラインはキャンセルされます。パイプラインが失敗した場合は、クエリを再開する前に、ソースの BigQuery テーブルのデータを手動で修正する必要があります。

  • 削除と更新が処理されない: APPENDS 関数は挿入のみをキャプチャします。削除や更新はキャプチャされません。

リバース ETL のベスト プラクティスに従う

最良の結果を得るには、次の操作を行います。

  • エッジデータを読み込むときに参照整合性エラーを防ぐための戦略を選択します。

  • ダングリング エッジを防ぐようにデータ パイプライン全体を設計します。ダングリング エッジは、Spanner Graph クエリの効率とグラフ構造の完全性を損なう可能性があります。詳細については、ダングリング エッジエッジを防ぐをご覧ください。

  • Spanner のエクスポートの最適化に関する推奨事項に従います。

  • 大量のデータを読み込む場合は、パイプラインを複数の小さなパイプラインに分割して、デフォルトの BigQuery クエリ実行時間の割り当て(6 時間)を超えないようにすることを検討してください。詳細については、BigQuery クエリジョブの制限をご覧ください。

  • 大量のデータを読み込む場合は、初期の一括データ読み込みが完了した後に、インデックスと外部キー制約を追加します。外部キー制約では、検証のための追加の読み取りが必要になり、インデックスでは追加の書き込みが必要になるため、この方法によりデータ読み込みのパフォーマンスが向上します。これらのオペレーションはトランザクション参加者の数を増やすため、データ読み込みプロセスが遅くなる可能性があります。

  • Spanner で自動スケーリングを有効にして、インスタンスへのデータ読み込み時間を短縮します。次に、BigQuery の EXPORT DATA コマンドの spanner_options セクションで、Spanner の priority パラメータを HIGH に構成します。詳細については、Spanner の自動スケーリングの概要spanner_options オプションを使用してエクスポートを構成するRequestOptions.priority をご覧ください。

  • 大規模なデータ読み込みの場合は、分割ポイントを作成して、データベースを事前に分割します。これにより、Spanner はスループットの増加に対応できるようになります。

  • パイプライン定義で、Spanner のデータ読み込みのリクエストの優先度を構成します。

次のステップ