オプションの Cloud Run 関数のトリガーを使用して Pub/Sub に変更をストリーミングする

このチュートリアルでは、Bigtable change streams to Pub/Sub テンプレートの使用方法(トピックやテンプレートの構成方法など)について説明します。イベント ストリームによってトリガーされる Cloud Run 関数を、任意のプログラミング言語で作成することもできます。

このチュートリアルは、Bigtable、コードの記述、イベント ストリーミング サービスに精通している技術ユーザーを対象としています。

Pub/Sub トピックの作成

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. ID を bigtable-change-stream-topic に設定します。

  4. [スキーマを使用する] を選択します。

  5. [Pub/Sub スキーマを選択] プルダウンで、[新しいスキーマを作成] をクリックします。スキーマを定義する新しいタブが開きます。

    1. スキーマ ID を bigtable-change-stream-schema に設定します。
    2. スキーマタイプを Avro に設定します。
    3. スキーマ定義として次の内容を貼り付けます。 スキーマの詳細については、テンプレートのドキュメント ページをご覧ください。
      {
          "name" : "ChangelogEntryMessage",
          "type" : "record",
          "namespace" : "com.google.cloud.teleport.bigtable",
          "fields" : [
            { "name" : "rowKey", "type" : "bytes"},
            {
              "name" : "modType",
              "type" : {
                "name": "ModType",
                "type": "enum",
                "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
            },
            { "name": "isGC", "type": "boolean" },
            { "name": "tieBreaker", "type": "int"},
            { "name": "columnFamily", "type": "string"},
            { "name": "commitTimestamp", "type" : "long"},
            { "name" : "sourceInstance", "type" : "string"},
            { "name" : "sourceCluster", "type" : "string"},
            { "name" : "sourceTable", "type" : "string"},
            { "name": "column", "type" : ["null", "bytes"]},
            { "name": "timestamp", "type" : ["null", "long"]},
            { "name": "timestampFrom", "type" : ["null", "long"]},
            { "name": "timestampTo", "type" : ["null", "long"]},
            { "name" : "value", "type" : ["null", "bytes"]}
        ]
      }
    
    1. [作成] をクリックしてスキーマを作成します。
  6. [スキーマの作成] タブを閉じて、スキーマリストを更新し、新しく定義したスキーマを選択します。

  7. [作成] をクリックしてトピックを作成します。

省略可: Cloud Run 関数を作成する

Pub/Sub ストリームを Cloud Run 関数で処理することもできます。

  1. bigtable-change-stream-topic トピックの [詳細] ページで、[Cloud Function をトリガー] をクリックします。
  2. [関数名] フィールドに「bt-ps-tutorial-function」と入力します。
  3. [ソースコード] セクションで [ランタイム] プルダウンをクリックし、目的のランタイムとプログラミング言語を選択します。受信時に変更ストリームを出力する hello world が生成されます。Cloud Run 関数の作成方法については、こちらのドキュメントをご覧ください。
  4. これ以外の項目はすべてデフォルト値のままにします。
  5. [関数をデプロイ] をクリックします。

変更ストリームを有効にしてテーブルを作成する

  1. Google Cloud コンソールで、Bigtable の [インスタンス] ページに移動します。

    [インスタンス] に移動

  2. このチュートリアルで使用しているインスタンスの ID をクリックします。

    使用可能なインスタンスがない場合は、近くのリージョンにデフォルトの構成でインスタンスを作成します。

  3. 左側のナビゲーション パネルで [テーブル] をクリックします。

  4. [テーブルの作成] をクリックします。

  5. テーブルに change-streams-pubsub-tutorial という名前を付けます。

  6. cf という名前の列ファミリーを追加します。

  7. [変更ストリームを有効にする] を選択します。

  8. [作成] をクリックします。

データ パイプラインを初期化して変更ストリームを取得する

  1. Bigtable の [テーブル] ページで、テーブル change-streams-pubsub-tutorial を見つけます。
  2. [変更ストリーム] 列で、[接続] をクリックします。
  3. ダイアログで [Pub/Sub] を選択します。
  4. [Dataflow ジョブを作成] をクリックします。
  5. Dataflow の [ジョブの作成] ページで、出力 Pub/Sub トピック名を bigtable-change-stream-topic に設定します。
  6. Bigtable アプリケーション プロファイル ID を default に設定します。
  7. [ジョブを実行] をクリックします。
  8. ジョブのステータスが「開始中」または「実行中」になったら処理を続行します。ジョブがキューに追加されてから約 5 分かかります。

Bigtable にデータを書き込む

  1. Cloud Shell で数行を Bigtable に書き込み、変更ログが一部のデータを Pub/Sub ストリームに書き込めるようにします。ジョブの作成後にデータを書き込んでいれば変更が表示されます。ジョブ ステータスが「running」になるのを待つ必要はありません。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user123 cf:col1=abc
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user546 cf:col1=def
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user789 cf:col1=ghi
    

Pub/Sub で変更ログを表示する

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] に移動

  2. トピック bigtable-change-stream-topic に対して自動的に作成されたサブスクリプションをクリックします。bigtable-change-stream-topic-sub という名前になります。

  3. [メッセージ] タブに移動します。

  4. [PULL] をクリックします。

  5. メッセージのリストを探索し、書き込んだデータを表示します。

    Pub/Sub の変更ログ メッセージ

省略可: Cloud Run 関数のログで変更を確認する

Cloud Run 関数を作成した場合は、ログで変更を確認できます。

  1. Google Cloud コンソールで、Cloud Run functions に移動します。

    Cloud Run 関数に移動します

  2. 関数 bt-ps-tutorial-function をクリックします。

  3. [ログ] タブに移動します。

  4. ログが表示されるように、[重大度] が「情報」以上に設定されていることを確認します。

  5. ログを調べて、書き込んだデータを表示します。

出力は次のようになります。

Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}