创建 BigQuery 订阅

本文档介绍了如何创建 BigQuery 订阅。您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建 BigQuery 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

除了熟悉 Pub/Sub 和 BigQuery 之外,在创建 BigQuery 订阅之前,请确保您满足以下前提条件:

  • BigQuery 表存在。或者,您也可以在创建 BigQuery 订阅时创建,如本文档后面的部分所述。

  • Pub/Sub 主题的架构与 BigQuery 表的架构之间的兼容性。如果您添加不兼容的 BigQuery 表,系统会显示一条与兼容性相关的错误消息。如需了解详情,请参阅架构兼容性

所需的角色和权限

如需获得创建 BigQuery 订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 BigQuery 订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建 BigQuery 订阅:

  • 针对项目的 pubsub.subscriptions.create 权限
  • pubsub.topics.attachSubscription 关于该主题

您也可以使用自定义角色或其他预定义角色来获取这些权限。

跨项目订阅

如果您在一个项目中为另一个项目中的主题创建订阅,则必须对创建订阅的项目拥有 pubsub.subscriptions.create 权限,并对相应主题拥有 pubsub.topics.attachSubscription 权限。

向服务账号授予 IAM 角色

Pub/Sub 使用 Identity and Access Management (IAM) 服务账号来访问 Google Cloud 资源。默认情况下,它使用 Pub/Sub 服务代理 (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com)。

如需让 Pub/Sub 能够写入 BigQuery 表,服务账号需要具有 BigQuery Data Editor (roles/bigquery.dataEditor) 角色。您可以为服务账号授予项目或表的权限,具体如下:

项目

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 选择包括 Google 提供的角色授权

  3. 找到 Cloud Pub/Sub 服务账号对应的行,然后点击 修改主账号

  4. 点击添加其他角色,然后选择 BigQuery Data Editor 角色。

如需了解详情,请参阅使用控制台授予 IAM 角色

  1. 在 Google Cloud 控制台中,前往 BigQuery Studio

    进入 BigQuery Studio

  2. 在标记为按名称和标签过滤的“探索器”窗格搜索框中,输入表的名称,然后按 Enter 键。

  3. 在搜索结果中,点击要授予权限的表的名称。

  4. 详细信息标签页中,依次点击 共享 > 管理权限

  5. 点击 添加主账号,然后输入服务账号标识符,格式如下:

    service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com

  6. 分配角色列表中,选择 BigQuery Data Editor

  7. 点击保存。将向主账号授予资源上的角色。

使用自定义服务账号

通过向 Cloud Pub/Sub 服务账号授予 BigQuery Data Editor 角色,有权在您的项目中创建订阅的任何用户都可以写入 BigQuery 表。如果您想提供更精细的权限,请改为配置用户代管式服务账号

如需配置用户代管式服务账号以写入 BigQuery,您需要具备以下权限:

  • 用户代管式服务账号必须具有 BigQuery Data Editor 角色。

  • Cloud Pub/Sub 服务账号必须具有用户代管式服务账号的 iam.serviceAccounts.getAccessToken 权限。

  • 创建订阅的用户必须拥有用户代管式服务账号的 iam.serviceAccounts.actAs 权限。

创建订阅时,请将用户代管式服务账号指定为订阅服务账号

BigQuery 订阅属性

BigQuery 订阅支持所有常见订阅属性。以下部分介绍了 BigQuery 订阅特有的属性。

使用主题架构

此选项可让 Pub/Sub 使用订阅所附加的 Pub/Sub 主题的架构。此外,Pub/Sub 还会将消息中的字段写入 BigQuery 表中的相应列。

使用此选项时,请务必检查是否满足以下附加要求:

  • 主题架构和 BigQuery 架构中的字段必须具有相同的名称,并且它们的类型必须相互兼容。

  • 主题架构中的任何可选字段在 BigQuery 架构中也必须是可选字段。

  • 主题架构中的必需字段在 BigQuery 架构中不必是必需字段。

  • 如果主题架构中没有 BigQuery 字段,则这些 BigQuery 字段必须处于 NULLABLE 模式。

  • 如果主题架构包含 BigQuery 架构中没有的其他字段,并且可以舍弃这些字段,请选择舍弃未知字段选项。

  • 您只能选择一个订阅属性,即使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表有一个名为 data 的列,其类型为 BYTESSTRINGJSON。Pub/Sub 会将消息写入此 BigQuery 列。

您可能不会立即看到 Pub/Sub 主题架构或 BigQuery 表架构的更改对写入 BigQuery 表的消息生效。例如,如果启用了舍弃未知字段选项,并且某个字段存在于 Pub/Sub 架构中,但不存在于 BigQuery 架构中,那么即使将该字段添加到 BigQuery 架构中,写入 BigQuery 表的消息可能仍然不包含该字段。最终,架构会同步,后续消息会包含该字段。

如果您为 BigQuery 订阅使用使用主题架构选项,还可以利用 BigQuery 变更数据捕获 (CDC)。CDC 通过处理更改并将更改应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式传输表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

使用表架构

此选项可让 Pub/Sub 使用 BigQuery 表的架构将 JSON 消息的字段写入相应的列。使用此选项时,请务必检查以下额外要求:

  • BigQuery 表中每个列的名称只能包含字母(a-z、A-Z)、数字 (0-9) 或下划线 (_)。

  • 发布的消息必须采用 JSON 格式。

    如果 BigQuery 表列具有 JSON 数据类型,则 Pub/Sub 消息中的相应字段必须是有效的 JSON(采用转义字符串格式)。例如,对于名为 myData 的列,消息字段必须为 "myData": "{\"key\":\"value\"}"。BigQuery 会拒绝不包含有效 JSON 的消息。

  • 支持以下 JSON 转换:

    JSON 类型 BigQuery 数据类型
    string NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    number NUMERICBIGNUMERICDATETIMEDATETIMETIMESTAMP
    • 使用 number 将转化设置为 DATEDATETIMETIMETIMESTAMP 时,该数字必须符合支持的表示形式
    • 使用 numberNUMERICBIGNUMERIC 的转换时,值的精度和范围会限制为 IEEE 754 浮点运算标准所接受的精度和范围。如果您需要高精度或更广的值范围,请改用 stringNUMERICBIGNUMERIC 转换。
    • 当使用 stringNUMERICBIGNUMERIC 转换时,Pub/Sub 会假定该字符串是人类可读的数字(例如 "123.124")。如果将该字符串作为人类可读的数字进行处理失败,Pub/Sub 会将该字符串视为使用 BigDecimalByteStringEncoder 编码的字节。
  • 如果订阅的主题具有关联的架构,则消息编码属性必须设置为 JSON

  • 如果存在消息中没有的 BigQuery 字段,则这些 BigQuery 字段必须处于 NULLABLE 模式。

  • 如果消息包含 BigQuery 架构中没有的其他字段,并且可以舍弃这些字段,请选择舍弃未知字段选项。

  • 您只能选择一个订阅属性,即使用主题架构使用表架构

如果您未选择使用主题架构使用表架构选项,请确保 BigQuery 表有一个名为 data 的列,其类型为 BYTESSTRINGJSON。Pub/Sub 会将消息写入此 BigQuery 列。

您可能不会立即看到写入 BigQuery 表的消息对 BigQuery 表架构的更改生效。例如,如果启用了舍弃未知字段选项,并且消息中存在某个字段,但 BigQuery 架构中不存在该字段,那么即使将该字段添加到 BigQuery 架构中,写入 BigQuery 表的消息可能仍然不包含该字段。最终,架构会同步,后续消息会包含该字段。

为 BigQuery 订阅选择使用表架构选项后,您还可以利用 BigQuery 变更数据捕获 (CDC) 功能。CDC 通过处理对现有行的更改并将其应用于现有行来更新 BigQuery 表。

如需详细了解此功能,请参阅使用变更数据捕获来流式传输表更新

如需了解如何将此功能与 BigQuery 订阅搭配使用,请参阅 BigQuery 变更数据捕获

删除未知字段

此选项与使用主题架构使用表架构选项搭配使用。启用此选项后,Pub/Sub 可以舍弃主题架构或消息中存在但 BigQuery 架构中不存在的任何字段。将消息写入 BigQuery 表时,不属于 BigQuery 架构的字段会被舍弃。

如果不设置舍弃未知字段,包含额外字段的消息不会写入 BigQuery,并且会保留在订阅积压消息中,除非您配置死信主题

舍弃未知字段设置不会影响 Pub/Sub 主题架构或 BigQuery 表架构中未定义的字段。在这种情况下,有效的 Pub/Sub 消息会传递给订阅。不过,由于 BigQuery 没有为这些额外字段定义列,因此在 BigQuery 写入过程中,这些字段会被舍弃。为防止出现此行为,请确保 Pub/Sub 消息中包含的所有字段也包含在 BigQuery 表架构中。

有关额外字段的行为也可能取决于所用的特定架构类型(Avro、协议缓冲区)和编码(JSON、二进制)。如需了解这些因素如何影响额外字段的处理,请参阅适用于您的特定架构类型和编码的文档。

写入元数据

此选项可让 Pub/Sub 将每条消息的元数据写入 BigQuery 表中的其他列。否则,元数据不会写入 BigQuery 表。

如果您选择写入元数据选项,请确保 BigQuery 表具有下表中所述的字段。

如果您未选择写入元数据选项,则目标 BigQuery 表仅需要 data 字段,除非 use_topic_schema 为 true。如果您同时选择写入元数据使用主题架构选项,则主题的架构不得包含任何名称与元数据参数名称相同的字段。此限制包括这些 snake case 参数的 camelcase 版本。

参数
subscription_name

STRING

订阅的名称。

message_id

STRING

消息的 ID

publish_time

时间戳

发布消息的时间。

data

BYTES、STRING 或 JSON

信息正文。

对于未选择使用主题架构使用表架构的所有目标 BigQuery 表,data 字段都是必需的。如果该字段的类型为 JSON,则信息正文必须是有效的 JSON。

attributes

STRING 或 JSON

包含所有消息属性的 JSON 对象。它还包含 Pub/Sub 消息的其他字段,包括排序键(如果存在)。

服务账号

您可以通过以下方式将消息写入 BigQuery 表:

  • 配置自定义服务账号,以便只有对该服务账号拥有 iam.serviceAccounts.actAs 权限的用户才能创建写入相应表的订阅。包含 iam.serviceAccounts.actAs 权限的角色示例是 Service Account User (roles/iam.serviceAccountUser) 角色。

  • 使用默认的 Pub/Sub 服务代理,该代理允许项目中任何有权创建订阅的用户创建可向表写入数据的订阅。如果您未指定自定义服务账号,则 Pub/Sub 服务代理是默认设置。

创建 BigQuery 订阅

如需创建采用 BigQuery 传送的订阅,请执行以下步骤。

控制台

  1. 在 Google Cloud 控制台中,前往创建订阅页面。

    前往订阅页面

  2. 订阅 ID 字段中,输入一个名称。如需了解如何命名订阅,请参阅主题或订阅命名指南

  3. 选择 Cloud Pub/Sub 主题框中,输入或选择要接收消息的主题。

  4. 传送类型部分,选择写入 BigQuery

  5. 选择 BigQuery 表:

    1. 项目字段中,选择包含 BigQuery 表的 Google Cloud 项目。

    2. 对于数据集,选择现有数据集,或点击创建新数据集以创建新数据集。如需了解如何创建数据集,请参阅创建数据集

    3. 字段中,输入表的名称。如需创建新表,请点击相应链接,前往 BigQuery 的创建新表页面。系统会在另一个标签页中打开该页面。如需了解如何创建表,请参阅创建和使用表

  6. 架构配置部分中,选择以下选项之一:

    • 不使用架构。Pub/Sub 会将消息字节写入名为 data 的列。

    • 使用主题架构。Pub/Sub 使用与主题关联的架构。如需了解详情,请参阅使用主题架构

    • 使用表架构。Pub/Sub 使用 BigQuery 表的架构。如需了解详情,请参阅使用表架构

  7. 可选。如需将消息元数据写入 BigQuery 表,请选择写入元数据。如需了解详情,请参阅写入元数据

  8. 可选。如需舍弃 BigQuery 表架构中不存在的字段,请选择舍弃未知字段。如需了解详情,请参阅舍弃未知字段

  9. 根据需要配置通用订阅属性。我们强烈建议您启用 Dead lettering 以处理消息失败。如需了解详情,请参阅死信主题

  10. 点击创建

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 如需创建 Pub/Sub 订阅,请使用 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID.DATASET_ID.TABLE_ID
    

    如果您想使用自定义服务账号,请将其作为额外的实参提供:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID.DATASET_ID.TABLE_ID \
        --bigquery-service-account-email=SERVICE_ACCOUNT_NAME
    

    替换以下内容:

    • SUBSCRIPTION_ID:指定订阅的 ID。
    • TOPIC_ID:指定主题的 ID。主题需要架构。
    • PROJECT_ID:指定项目的 ID。
    • DATASET_ID:指定现有数据集的 ID。如需创建数据集,请参阅 创建数据集
    • TABLE_ID:指定现有表的 ID。如果您的主题没有架构,则该表需要 data 字段。如需创建表,请参阅创建具有架构定义的空表
    • SERVICE_ACCOUNT_NAME:指定用于写入 BigQuery 的服务账号的名称。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, topic, subscription, table string) error {
	// projectID := "my-project"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:  subscription,
		Topic: topic,
		BigqueryConfig: &pubsubpb.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to create subscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId,
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.ts

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string,
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "my-project:dataset-id.table-id"

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  bigquery_config: {
    table: bigquery_table_id,
    write_metadata: true
  }

puts "BigQuery subscription created: #{subscription_id}."
puts "Table for subscription is: #{bigquery_table_id}"

监控 BigQuery 订阅

Cloud Monitoring 提供了许多指标来监控订阅

如需查看与 Pub/Sub 相关的所有可用指标及其说明的列表,请参阅 Pub/Sub 监控文档

您还可以在 Pub/Sub 中监控订阅。

后续步骤