BigQuery サブスクリプションの作成

このドキュメントでは、BigQuery サブスクリプションの作成方法について説明します。BigQuery サブスクリプションを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用できます。

始める前に

このドキュメントを読む前に、次の内容をよく理解しておいてください。

BigQuery サブスクリプションを作成する前に、Pub/Sub と BigQuery に精通しているだけでなく、次の前提条件を満たしていることを確認してください。

  • BigQuery テーブルが存在している。あるいは、このドキュメントの後半のセクションで説明するように、BigQuery サブスクリプションの作成時に作成することもできます。

  • Pub/Sub トピックのスキーマと BigQuery テーブルの間の互換性。互換性のない BigQuery テーブルを追加すると、互換性に関連するエラー メッセージが出力されます。詳細については、スキーマの互換性をご覧ください。

必要なロールと権限

BigQuery サブスクリプションの作成に必要な権限を取得するには、プロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、BigQuery サブスクリプションの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

BigQuery サブスクリプションを作成するには、次の権限が必要です。

  • プロジェクトに対する pubsub.subscriptions.create
  • トピックに対する pubsub.topics.attachSubscription

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

また、Pub/Sub サービス エージェントまたはカスタム サービス アカウントに、BigQuery テーブルへの書き込み権限が必要です。詳細については、サービス アカウントにロールを割り当てるをご覧ください。

プロジェクト間のサブスクリプション

別のプロジェクトのトピックに対して 1 つのプロジェクトでサブスクリプションを作成する場合は、サブスクリプションを作成するプロジェクトに対する pubsub.subscriptions.create 権限と、トピックに対する pubsub.topics.attachSubscription 権限が必要です。

サービス アカウントにロールを割り当てる

一部の Google Cloud サービスには、サービスがリソースにアクセスできるようにする Google Cloud管理のサービス アカウントがあります。これらのサービス アカウントは、サービス エージェントと呼ばれます。Pub/Sub は、プロジェクトごとに service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス エージェントを作成し、維持します。

Pub/Sub サービス エージェントまたはカスタム サービス アカウントに BigQuery テーブルへの書き込み権限を付与するかどうかを選択できます。

Pub/Sub サービス エージェントに権限を付与すると、プロジェクトでサブスクリプションを作成する権限を持つユーザーは、BigQuery テーブルに書き込むことができます。BigQuery テーブルへの書き込み権限をより細かく設定する場合は、代わりにカスタム サービス アカウントを構成します。

BigQuery IAM の詳細については、BigQuery のロールと権限をご覧ください。

Pub/Sub サービス エージェントに BigQuery のロールを割り当てる

Pub/Sub サービス エージェントを使用して BigQuery サブスクリプションを作成する場合は、特定の BigQuery テーブルへの書き込みとテーブル メタデータの読み取りを行う権限が必要です。

Pub/Sub サービス エージェントに BigQuery データ編集者(roles/bigquery.dataEditor)ロールを付与します。権限は、個々のテーブルまたはプロジェクト全体に付与できます。

テーブル

  1. Google Cloud コンソールで、[BigQuery Studio] に移動します。

    [BigQuery Studio] に移動

  2. [エクスプローラ] ペインの [名前とラベルでフィルタ] というラベルの付いた検索ボックスに、テーブルの名前を入力して Enter キーを押します。

  3. 権限を付与するテーブルをクリックします。

  4. テーブルで、 [その他の操作] > [共有] > [権限] を選択します。

    または、表をクリックし、メインページで [共有] > [権限] をクリックします。

    [共有権限] ウィンドウが開きます。

  5. [プリンシパルを追加] をクリックします。

  6. [プリンシパルを追加] に、サブスクリプションを含むプロジェクトの Pub/Sub サービス エージェントの名前を入力します。サービス エージェントの形式は service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com です。たとえば、project-number=112233445566 を使用するプロジェクトの場合、サービス エージェントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  7. [ロールを選択] プルダウンに「BigQuery」と入力し、[BigQuery データ編集者] ロールを選択します。

  8. [保存] をクリックします。

プロジェクト

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [アクセス権を付与] をクリックします。

  3. [プリンシパルを追加] セクションに、Pub/Sub サービス エージェントの名前を入力します。サービス エージェントの形式は service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com です。たとえば、project-number=112233445566 を使用するプロジェクトの場合、サービス エージェントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  4. [ロールを割り当てる] セクションで、[別のロールを追加] をクリックします。

  5. [ロールを選択] プルダウンに「BigQuery」と入力し、[BigQuery データ編集者] ロールを選択します。

  6. [保存] をクリックします。

カスタム サービス アカウントに BigQuery のロールを割り当てる

BigQuery テーブルへの書き込みにカスタム サービス アカウントを使用する場合は、次の権限を設定する必要があります。

  • カスタム サービス アカウントには、特定の BigQuery テーブルへの書き込みとテーブル メタデータの読み取りを行う権限が必要です。
  • Pub/Sub サービス エージェントには、カスタム サービス アカウントに対する iam.serviceAccounts.getAccessToken 権限が必要です。
  • サブスクリプションを作成するユーザーには、カスタム サービス アカウントに対する iam.serviceAccounts.actAs 権限が必要です。

サービス アカウントを作成して権限を付与する手順は次のとおりです。

  1. カスタム サービス アカウントを作成します。サービス アカウントは、サブスクリプションと同じプロジェクトに存在する必要があります。

  2. カスタム サービス アカウントに BigQuery データ編集者(roles/bigquery.dataEditor)ロールを付与します。

    サービス アカウントには、プロジェクト内の単一のテーブルまたはプロジェクト内のすべてのテーブルに対する権限を付与できます。これを行うには、Pub/Sub サービス エージェントに BigQuery のロールを割り当てるの該当するセクションをご覧ください。この手順では、Pub/Sub サービス エージェントのメールアドレスをカスタム サービス アカウントのメールアドレスに置き換えます。

  3. Pub/Sub サービス エージェントに、カスタム サービス アカウントまたはプロジェクト内のすべてのサービス アカウントに対する iam.serviceAccounts.getAccessToken 権限を付与します。この権限を付与するには、Pub/Sub サービス エージェントに roles/iam.serviceAccountTokenCreator ロールを付与します。

    要件に基づいて適切な方法を選択します。

サービス アカウント

  1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

    [サービス アカウント] に移動

  2. [フィルタ] にカスタム サービス アカウントの名前を入力します。

  3. リストからサービス アカウントを選択します。

  4. [アクセス権を持つプリンシパル] をクリックします。

  5. [アクセス権を付与] をクリックします。

  6. [プリンシパルを追加] セクションに、サブスクリプションを含むプロジェクトの Pub/Sub サービス エージェントの名前を入力します。サービス エージェントの形式は service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com です。たとえば、project-number=112233445566 を使用するプロジェクトの場合、サービス エージェントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  7. [ロールを選択] プルダウンで、Service Account を入力し、[サービス アカウント トークン作成者] ロールを選択します。

  8. [保存] をクリックします。

プロジェクト

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [アクセス権を付与] をクリックします。

  3. [プリンシパルを追加] セクションに、カスタム サービス アカウントの名前を入力します。

  4. [ロールを割り当てる] セクションで、[別のロールを追加] をクリックします。

  5. [ロールを選択] プルダウンで、Service Account を入力し、[サービス アカウント トークン作成者] ロールを選択します。

  6. [保存] をクリックします。

カスタム サービス アカウントを作成した場合は、必要な iam.serviceAccounts.actAs 権限がすでに付与されています。サービス アカウントに対する権限を他のユーザーに付与する必要がある場合:

  1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

    [サービス アカウント] に移動

  2. [フィルタ] にカスタム サービス アカウントの名前を入力します。

  3. リストからサービス アカウントを選択します。

  4. [アクセス権を持つプリンシパル] をクリックします。

  5. [アクセス権を付与] をクリックします。

  6. [プリンシパルを追加] セクションに、アクセス権を付与するアカウントの名前を入力します。

  7. [ロールを選択] プルダウンで、Service Account を入力し、[サービス アカウント ユーザー] ロールを選択します。

  8. また、BigQuery テーブルが Apache Iceberg テーブルの場合は、Cloud Storage バケットにアクセスするために、Pub/Sub サービス アカウントにストレージ管理者ロール(roles/storage.admin)を付与します。

  9. [保存] をクリックします。

BigQuery サブスクリプション プロパティ

BigQuery のサブスクリプションを構成するときに、次のプロパティを指定できます。

共通の特徴

すべてのサブスクリプションで設定できる共通のサブスクリプション プロパティについて学習します。

トピック スキーマを使用する

このオプションにより、Pub/Sub は、サブスクリプションが接続されている Pub/Sub トピックのスキーマを使用できます。さらに、Pub/Sub はメッセージ内のフィールドを BigQuery テーブル内の対応する列に書き込みます。

このオプションを使用する場合は、以下の追加要件を確認してください。

  • トピック スキーマ内のフィールドと BigQuery スキーマ内のフィールドは、同じ名前でなければならず、その型には相互に互換性が必要です。

  • トピック スキーマのオプション フィールドは BigQuery スキーマでも省略可能な必要があります。

  • トピック スキーマの必須フィールドは、BigQuery スキーマでは必要ありません。

  • BigQuery フィールドがトピック スキーマに存在しない場合、これらの BigQuery フィールドは NULLABLE モードにする必要があります。

  • トピック スキーマに、BigQuery スキーマに存在しない追加のフィールドがあり、このようなフィールドを削除できる場合は、[不明な項目を削除する] オプションを選択します。

  • サブスクリプション プロパティとして [トピック スキーマを使用する] または [テーブル スキーマを使用する] のいずれか 1 つのみを選択できます。

[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションを選択しない場合は、BigQuery テーブルに、BYTESSTRING、または JSON 型の data という列があることを確認してください。Pub/Sub は、この BigQuery 列にメッセージを書き込みます。

Pub/Sub トピック スキーマまたは BigQuery テーブル スキーマの変更は、BigQuery テーブルに書き込まれたメッセージにすぐに反映されない場合があります。たとえば、[不明なフィールドを削除する] オプションが有効で、フィールドが Pub/Sub スキーマには存在するが BigQuery スキーマにはない場合、BigQuery テーブルに書き込まれるメッセージには、BigQuery スキーマに追加した後も、フィールドが含まれていない場合があります。最終的に、スキーマが同期され、後続のメッセージにこのフィールドが含まれます。

BigQuery サブスクリプションで [テーブル スキーマを使用する] オプションを使用すると、BigQuery 変更データ キャプチャ(CDC)も利用できます。CDC は、既存の行を処理して変更を適用し、BigQuery テーブルを更新します。

この機能の詳細については、変更データ キャプチャを使用してテーブル更新をストリーミングするをご覧ください。

BigQuery サブスクリプションでこの機能を使用する方法については、BigQuery の変更データ キャプチャをご覧ください。

テーブル スキーマを使用する

このオプションにより、Pub/Sub は BigQuery テーブルのスキーマを使用して、JSON メッセージのフィールドを対応する列に書き込むことができます。このオプションを使用する場合は、次の追加要件を確認してください。

  • BigQuery テーブルの各列の名前には、英字(a ~ z、A ~ Z)、数字(0 ~ 9)、アンダースコア(_)のみを使用できます。

  • 公開されるメッセージは JSON 形式である必要があります。

    BigQuery テーブル列のデータ型が JSON の場合、Pub/Sub メッセージ内の対応するフィールドは、エスケープされた文字列内の有効な JSON である必要があります。たとえば、myData という名前の列の場合、メッセージ フィールドは "myData": "{\"key\":\"value\"}" にする必要があります。BigQuery は、有効な JSON が含まれていないメッセージを拒否します。

  • 次の JSON 変換がサポートされています。

    JSON 型 BigQuery のデータ型
    string NUMERICBIGNUMERICDATETIMEDATETIME、または TIMESTAMP
    number NUMERICBIGNUMERICDATETIMEDATETIME、または TIMESTAMP
    • number から DATEDATETIMETIME、または TIMESTAMP への変換を使用する場合、数値はサポートされている表現に準拠している必要があります。
    • number から NUMERIC または BIGNUMERIC へのコンバージョンを使用する場合、値の適合率と範囲は、浮動小数点演算の IEEE 754 標準で許容されるものに制限されます。高い適合率やより広い範囲の値が必要な場合は、代わりに string から NUMERIC または BIGNUMERIC へのコンバージョンを使用します。
    • string から NUMERIC または BIGNUMERIC へのコンバージョンを使用する場合、Pub/Sub は文字列が人間が判読できる数値("123.124" など)であると想定します。文字列を人間が判読できる数値として処理できない場合、Pub/Sub は文字列を BigDecimalByteStringEncoder でエンコードされたバイトとして扱います。
  • サブスクリプションのトピックにスキーマが関連付けられている場合、メッセージ エンコード プロパティは JSON に設定する必要があります。

  • メッセージに存在しない BigQuery フィールドがある場合、これらの BigQuery フィールドは NULLABLE モードにする必要があります。

  • BigQuery スキーマに存在しない追加のフィールドがメッセージにあり、このようなフィールドを削除できる場合は、[不明なフィールドを削除する] オプションを選択します。

  • サブスクリプション プロパティとして [トピック スキーマを使用する] または [テーブル スキーマを使用する] のいずれか 1 つのみを選択できます。

[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションを選択しない場合は、BigQuery テーブルに、BYTESSTRING、または JSON 型の data という列があることを確認してください。Pub/Sub は、この BigQuery 列にメッセージを書き込みます。

BigQuery テーブル スキーマの変更は、BigQuery テーブルに書き込まれたメッセージにすぐに反映されない場合があります。 たとえば、[不明なフィールドを削除する] オプションが有効で、フィールドがメッセージには存在するが BigQuery スキーマにはない場合、BigQuery テーブルに書き込まれるメッセージには、BigQuery スキーマに追加した後も、フィールドが含まれていない場合があります。最終的に、スキーマが同期され、後続のメッセージにこのフィールドが含まれます。

BigQuery サブスクリプションで [テーブル スキーマを使用する] オプションを使用すると、BigQuery 変更データ キャプチャ(CDC)も利用できます。 CDC は、既存の行を処理して変更を適用し、BigQuery テーブルを更新します。

この機能の詳細については、変更データ キャプチャを使用してテーブル更新をストリーミングするをご覧ください。

BigQuery サブスクリプションでこの機能を使用する方法については、BigQuery 変更データ キャプチャをご覧ください。

不明な項目を削除する

このオプションは、[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションで使用します。このオプションを有効にすると、Pub/Sub は、トピック スキーマやメッセージには存在するが BigQuery スキーマには存在しないフィールドをドロップできます。BigQuery スキーマに含まれていないフィールドは、BigQuery テーブルにメッセージを書き込むときに削除されます。

[不明な項目を削除する] が設定されていない場合、デッドレター トピックを構成しない限り、余分な項目を含むメッセージは BigQuery に書き込まれず、サブスクリプション バックログに残ります。

[不明な項目を削除する] 設定は、Pub/Sub トピック スキーマまたは BigQuery テーブル スキーマのいずれにも定義されていないフィールドには影響しません。この場合、有効な Pub/Sub メッセージがサブスクリプションに配信されます。ただし、BigQuery にはこれらの追加フィールド用に定義された列がないため、これらのフィールドは BigQuery 書き込みプロセス中に削除されます。この動作を防ぐには、Pub/Sub メッセージに含まれるフィールドが BigQuery テーブル スキーマにも含まれていることを確認します。

追加フィールドに関する動作は、使用する特定のスキーマタイプ(Avro、Protocol Buffer)とエンコード(JSON、バイナリ)によっても異なります。これらの要因が追加フィールドの処理にどのように影響するかについては、特定のスキーマタイプとエンコードのドキュメントをご覧ください。

メタデータを書き込む

このオプションを使用すると、Pub/Sub が各メッセージのメタデータを BigQuery テーブルの追加の列に書き込むようにする場合は、このオプションを選択できます。それ以外の場合、メタデータは BigQuery テーブルに書き込まれません。

[メタデータを書き込む] オプションを選択する場合は、BigQuery テーブルに次の表に示すフィールドがあることを確認してください。

メタデータを書き込むオプションを選択しない場合、use_topic_schema が true でない限り、宛先 BigQuery テーブルでは data フィールドのみが必要になります。[メタデータを書き込む] と [トピック スキーマを使用する] の両方のオプションを選択した場合、トピックのスキーマには、次の名前にメタデータ パラメータと一致する名前のフィールドを含めないでください。この制限には、スネークケース パラメータのキャメルケース バージョンが含まれます。

パラメータ
subscription_name

STRING

サブスクリプションの名前。

message_id

STRING

メッセージの ID

publish_time

TIMESTAMP

メッセージのパブリッシュ時刻。

data

BYTES、STRING、または JSON

メッセージの本文。

data フィールドは、[トピック スキーマを使用する] または [テーブル スキーマを使用する] を選択しないすべての宛先 BigQuery テーブルに必要です。フィールドの型が JSON の場合、メッセージ本文を有効な JSON にする必要があります。

attributes

STRING または JSON

すべてのメッセージ属性を含む JSON オブジェクト。また、存在する場合、順序指定キーなど、Pub/Sub メッセージの一部である追加のフィールドも含まれています。

BigQuery サブスクリプションの作成

BigQuery 配信でサブスクリプションを作成するには、次の操作を行います。

コンソール

  1. Google Cloud コンソールで、[サブスクリプションの作成] ページに移動します。

    [サブスクリプション] に移動

  2. [サブスクリプション ID] フィールドに名前を入力します。サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  3. [Cloud Pub/Sub トピックを選択してください] ボックスに、メッセージを受信するトピックを入力または選択します。

  4. [配信タイプ] で、[BigQuery への書き込み] を選択します。

  5. BigQuery テーブルを選択します。

    1. [プロジェクト] で、BigQuery テーブルを含む Google Cloud プロジェクトを選択します。

    2. [データセット] で、既存のデータセットを選択するか、[新しいデータセットの作成] をクリックして新しいデータセットを作成します。データセットの作成については、データセットの作成をご覧ください。

    3. [テーブル] フィールドにテーブルの名前を入力します。新しいテーブルを作成するには、BigQuery の [新しいテーブルを作成] ページに移動するリンクをクリックします。ページが別のタブで開きます。テーブルの作成については、テーブルの作成と使用をご覧ください。

  6. [スキーマ構成] で、次のいずれかのオプションを選択します。

    • スキーマを使用しない。Pub/Sub は、メッセージのバイトを data という名前の列に書き込みます。

    • トピック スキーマを使用する。Pub/Sub は、トピックに関連付けられているスキーマを使用します。詳細については、トピック スキーマを使用するをご覧ください。

    • テーブル スキーマを使用する。Pub/Sub は BigQuery テーブルのスキーマを使用します。詳細については、テーブル スキーマを使用するをご覧ください。

  7. 省略可。メッセージ メタデータを BigQuery テーブルに書き込むには、[メタデータを書き込む] を選択します。詳細については、メタデータを書き込むをご覧ください。

  8. 省略可。BigQuery テーブル スキーマに存在しないフィールドを削除するには、[不明なフィールドを削除する] を選択します。詳細については、不明なフィールドを削除するをご覧ください。

  9. 必要に応じて、共通のサブスクリプション プロパティを構成します。メッセージ エラーを処理するには、デッドレターを有効にすることを強くおすすめします。詳細については、デッドレター トピックをご覧ください。

  10. [作成] をクリックします。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. Pub/Sub サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを使用します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID.DATASET_ID.TABLE_ID
    

    カスタム サービス アカウントを使用する場合は、追加の引数として指定します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID.DATASET_ID.TABLE_ID \
        --bigquery-service-account-email=SERVICE_ACCOUNT_NAME
    

    次のように置き換えます。

    • SUBSCRIPTION_ID: サブスクリプションの ID を指定します。
    • TOPIC_ID: トピックの ID を指定します。このトピックでは、スキーマが必要です。
    • PROJECT_ID: プロジェクトの ID を指定します。
    • DATASET_ID: 既存のデータセットの ID を指定します。データセットを作成するには、 データセットの作成をご覧ください。
    • TABLE_ID: 既存のテーブルの ID を指定します。トピックにスキーマがない場合、このテーブルには data フィールドが必要です。テーブルを作成するには、スキーマ定義を含む空のテーブルを作成するをご覧ください。
    • SERVICE_ACCOUNT_NAME: BigQuery への書き込みに使用するサービス アカウントの名前を指定します。

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。引き続き v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, topic, subscription, table string) error {
	// projectID := "my-project"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:  subscription,
		Topic: topic,
		BigqueryConfig: &pubsubpb.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to create subscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId,
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.ts

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string,
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API リファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

次のサンプルでは、Ruby Pub/Sub クライアント ライブラリ v3 を使用しています。引き続き v2 ライブラリを使用している場合は、 v3 への移行ガイドをご覧ください。Ruby v2 のコードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "my-project:dataset-id.table-id"

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  bigquery_config: {
    table: bigquery_table_id,
    write_metadata: true
  }

puts "BigQuery subscription created: #{subscription_id}."
puts "Table for subscription is: #{bigquery_table_id}"

BigQuery サブスクリプションをモニタリングする

Cloud Monitoring には、サブスクリプションをモニタリングするための指標が多数用意されています。

Pub/Sub に関連する使用可能なすべての指標のリストとその説明については、Pub/Sub のモニタリング ドキュメントをご覧ください。

Pub/Sub 内からサブスクリプションをモニタリングすることもできます。

次のステップ