创建 Cloud Storage 订阅

本文档介绍了如何创建 Cloud Storage 订阅。您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建 Cloud Storage 订阅。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

所需的角色和权限

如需获得创建 Cloud Storage 订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 Cloud Storage 存储空间订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建 Cloud Storage 订阅,您需要具备以下权限:

  • 针对项目的 pubsub.subscriptions.create 权限
  • pubsub.topics.attachSubscription 关于该主题

您也可以使用自定义角色或其他预定义角色来获取这些权限。

跨项目订阅

如果您在一个项目中为另一个项目中的主题创建订阅,则必须对创建订阅的项目拥有 pubsub.subscriptions.create 权限,并对相应主题拥有 pubsub.topics.attachSubscription 权限。

向服务账号授予 IAM 角色

Pub/Sub 使用 Identity and Access Management (IAM) 服务账号来访问 Google Cloud 资源。默认情况下,它使用 Pub/Sub 服务代理 (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com)。

如需允许 Pub/Sub 写入 Cloud Storage,服务账号需要具有以下角色:

  • Storage Object Creator (roles/storage.objectCreator)
  • Storage Legacy Bucket Reader (roles/storage.legacyBucketReader)

您可以为服务账号授予项目级权限或 Cloud Storage 存储桶级权限,具体操作如下:

项目

  1. 在 Google Cloud 控制台中,前往存储桶页面。

    进入“存储桶”

  2. 选择包括 Google 提供的角色授权

  3. 找到 Cloud Pub/Sub 服务账号对应的行,然后点击 修改主账号

  4. 点击添加其他角色,然后选择 Storage Object Creator 角色。 针对 Storage Legacy Bucket Reader 角色重复执行此步骤。

如需了解详情,请参阅使用控制台授予 IAM 角色

Cloud Storage 存储桶

  1. 在 Google Cloud 控制台中,前往存储桶

    进入“存储桶”

  2. 点击要向其授予权限的 Cloud Storage 存储桶的名称。

  3. 存储桶详情页面中,点击配置标签页。

  4. 权限窗格中,点击按主账号查看标签页。

  5. 点击授予访问权限

  6. 新的主账号字段中,输入服务账号标识符,格式如下:

    service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com

  7. 分配角色列表中,选择 Storage Object Creator

  8. 点击 添加其他角色,然后选择 Storage Legacy Bucket Reader

  9. 点击保存。系统会向主账号授予资源上的角色。

使用自定义服务账号

通过向 Cloud Pub/Sub 服务账号授予 Storage Object CreatorStorage Legacy Bucket Reader 角色,任何有权在您的项目中创建订阅的用户都可以向 Cloud Storage 存储桶写入数据。如果您想提供更精细的权限,请改为配置用户代管式服务账号

如需将用户代管式服务账号配置为写入 Cloud Storage,您需要具有以下权限:

  • 用户代管式服务账号必须具有 Storage Object CreatorStorage Legacy Bucket Reader 角色。

  • Cloud Pub/Sub 服务账号必须具有用户代管式服务账号的 iam.serviceAccounts.getAccessToken 权限。

  • 创建订阅的用户必须拥有用户代管式服务账号的 iam.serviceAccounts.actAs 权限。

创建订阅时,请将用户代管式服务账号指定为订阅服务账号

Cloud Storage 订阅属性

Cloud Storage 订阅支持所有常见订阅属性。以下部分介绍了 Cloud Storage 订阅特有的属性。

存储桶名称

在创建 Cloud Storage 订阅之前,必须先创建 Cloud Storage 存储桶。

这些消息会以批次形式发送,并存储在 Cloud Storage 存储桶中。单个批次或文件以对象的形式存储在存储桶中。

Cloud Storage 存储桶必须停用请求者付款

如需创建 Cloud Storage 存储桶,请参阅创建存储分区

文件名前缀、后缀和日期时间

由 Cloud Storage 订阅生成的输出 Cloud Storage 文件会作为对象存储在 Cloud Storage 存储桶中。存储在 Cloud Storage 存储桶中的对象的名称采用以下格式:<file-prefix><UTC-date-time>_<uuid><file-suffix>

以下列表详细介绍了文件格式以及您可以自定义的字段:

  • <file-prefix> 是自定义文件名前缀。这是一个可选字段。

  • <UTC-date-time> 是根据对象创建时间自动生成的自定义字符串。

  • <uuid> 是为对象自动生成的随机字符串。

  • <file-suffix> 是自定义文件名后缀。这是一个可选字段。文件名后缀不能以“/”结尾。

  • 您可以更改文件名前缀和后缀:

    • 例如,如果文件名前缀的值为 prod_,文件后缀的值为 _archive,则示例对象名称为 prod_2023-09-25T04:10:00+00:00_uN1QuE_archive

    • 如果您未指定文件名前缀和后缀,则存储在 Cloud Storage 存储桶中的对象名称的格式为:<UTC-date-time>_<uuid>

    • Cloud Storage 对象命名要求也适用于文件名前缀和后缀。如需了解详情,请参阅关于 Cloud Storage 对象

  • 您可以更改文件名中日期和时间的显示方式:

    • 只能使用一次的必需日期时间匹配器:年(YYYYYY)、月(MM)、日(DD)、时(hh)、分(mm)、秒(ss)。例如,YY-YYYYMMM 无效。

    • 只能使用一次的可选匹配器:日期时间分隔符 (T) 和时区偏移量(Z+00:00)。

    • 可多次使用的可选元素:连字符 (-)、下划线 (_)、英文冒号 (:) 和正斜杠 (/)。

    • 例如,如果文件名日期时间格式的值为 YYYY-MM-DD/hh_mm_ssZ,则示例对象名称为 prod_2023-09-25/04_10_00Z_uNiQuE_archive

    • 如果文件名日期时间格式以非匹配字符结尾,该字符将替换 <UTC-date-time><uuid> 之间的分隔符。例如,如果文件名日期时间格式的值为 YYYY-MM-DDThh_mm_ss-,则示例对象名称为 prod_2023-09-25T04_10_00-uNiQuE_archive

文件批量处理

借助 Cloud Storage 订阅,您可以决定何时创建新输出文件,并将其作为对象存储在 Cloud Storage 存储桶中。当满足指定批处理条件之一时,Pub/Sub 会写入输出文件。以下是 Cloud Storage 批处理条件:

  • 批量存储时长上限。此设置是必需的。 如果超过指定的最大时长值,Pub/Sub 会写入新的输出文件。此时长是指从 Pub/Sub 开始写入新文件到文件最终确定之间的时间。例如,如果您将最长时长设置为 5 分钟,则 Pub/Sub 会在开始向文件写入数据后最多 5 分钟内完成该文件的写入。系统可能会在时长达到上限前创建新文件。如果您未指定值,系统会应用 5 分钟的默认值。以下是最大时长的适用值:

    • 最小值 = 1 分钟
    • 默认值 = 5 分钟
    • 最大值 = 10 分钟
  • 批量存储字节数上限。这是可选设置。如果超过指定的最大字节数值,Cloud Storage 订阅会写入新的输出文件。以下是最大字节数的适用值:

    • 最小值 = 1 KB
    • 最大值 = 10 GiB
  • 批量存储消息数上限。这是可选设置。如果超过指定的最大消息数,Cloud Storage 订阅会写入新的输出文件。以下是“最多消息数”的适用值:

    • 最小值 = 1000

例如,您可以将最长时长配置为 6 分钟,并将最大字节数配置为 2 GB。如果在第 4 分钟时,输出文件的大小达到 2 GB,Pub/Sub 会完成上一个文件的写入,并开始写入新文件。

Cloud Storage 存储空间订阅可能会同时写入 Cloud Storage 存储桶中的多个文件。如果您已将订阅配置为每 6 分钟创建一个新文件,您可能会发现每 6 分钟会创建多个 Cloud Storage 文件。

在某些情况下,Pub/Sub 可能会比文件批处理条件配置的时间更早开始写入新文件。如果订阅接收的消息大于“最大字节数”值,文件也可能会超出该值。

文件格式

创建 Cloud Storage 订阅时,您可以将要存储在 Cloud Storage 存储桶中的输出文件格式指定为文本Avro

  • 文本:消息以纯文本形式存储。换行符用于分隔消息与文件中的上一条消息。系统只会存储消息载荷,而不会存储属性或其他元数据。

  • Avro:消息以 Apache Avro 二进制格式存储。选择 Avro 时,您可以启用以下其他属性:

    • 写入元数据:此选项可让您将消息元数据与消息一起存储。subscription_namemessage_idpublish_timeattributes 字段等元数据会写入输出 Avro 对象中的顶级字段,而除数据之外的所有其他消息属性(例如 ordering_key,如果存在)都会作为条目添加到 attributes 映射中。

      如果停用了写入元数据,则只会将消息载荷写入输出 Avro 对象。以下是禁用了写入元数据的输出消息的 Avro 架构:

      {
        "type": "record",
        "namespace": "com.google.pubsub",
        "name": "PubsubMessage",
        "fields": [
          { "name": "data", "type": "bytes" }
        ]
      }
      

      以下是启用了写入元数据的输出消息的 Avro 架构:

      {
        "type": "record",
        "namespace": "com.google.pubsub",
        "name": "PubsubMessageWithMetadata",
        "fields": [
          { "name": "subscription_name", "type": "string" },
          { "name": "message_id", "type": "string"  },
          { "name": "publish_time", "type": {
              "type": "long",
              "logicalType": "timestamp-micros"
            }
          },
          { "name": "attributes", "type": { "type": "map", "values": "string" } },
          { "name": "data", "type": "bytes" }
        ]
      }
      
    • 使用主题架构:此选项可让 Pub/Sub 在写入 Avro 文件时使用订阅所附加的 Pub/Sub 主题的架构

      使用此选项时,请务必检查是否满足以下附加要求:

      • 主题架构必须采用 Apache Avro 格式

      • 如果同时启用了使用主题架构写入元数据,则主题架构的根必须具有 Record 对象。Pub/Sub 将展开记录的字段列表,以包含元数据字段。因此,记录不能包含任何与元数据字段(subscription_namemessage_idpublish_timeattributes)同名的字段。

服务账号

您可以通过以下方式将消息写入 Cloud Storage 存储桶:

  • 配置自定义服务账号,以便只有对该服务账号拥有 iam.serviceAccounts.actAs 权限的用户才能创建写入相应存储桶的订阅。包含 iam.serviceAccounts.actAs 权限的角色示例是 Service Account User (roles/iam.serviceAccountUser) 角色。

  • 使用默认的 Pub/Sub 服务代理,以便任何有权在项目中创建订阅的用户都能创建写入存储桶的订阅。如果您未指定自定义服务账号,则 Pub/Sub 服务代理是默认设置。

创建 Cloud Storage 订阅

控制台

  1. 在 Google Cloud 控制台中,前往订阅页面。

    前往“订阅”

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入一个名称。

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  4. 从下拉菜单中选择或创建一个主题。

    订阅将接收来自该主题的消息。

    如需了解如何创建主题,请参阅创建和管理主题

  5. 选择传送类型,然后选择写入 Cloud Storage

  6. 对于 Cloud Storage 存储桶,请点击浏览

    • 您可以从任何合适的项目中选择现有存储桶。

    • 您也可以点击创建图标,然后按照屏幕上的说明创建新存储桶。

      创建存储桶后,选择该存储桶以用于 Cloud Storage 存储空间订阅。

      如需详细了解如何创建存储桶,请参阅创建存储桶

    指定存储桶后,Pub/Sub 会检查 Pub/Sub 服务代理是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到类似以下内容的消息:Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions

  7. 如果您遇到权限问题,请点击设置权限,然后按照屏幕上的说明操作。

    或者,按照向 Pub/Sub 服务代理分配 Cloud Storage 角色中的说明操作。

  8. 对于文件格式,请选择文本Avro

    如果您选择 Avro,还可以选择指定是否要在输出中存储消息元数据。

    如需详细了解这两个选项(包括 Avro 格式的消息元数据选项),请参阅文件格式

  9. 可选:您可以为要写入 Cloud Storage 存储桶的所有文件指定文件名前缀、后缀和日期时间。文件以对象形式存储在存储桶中。

    如需详细了解如何设置文件前缀、后缀和日期时间,请参阅文件名前缀、后缀和日期时间

  10. 对于文件批处理,请指定创建新文件之前经过的最长时间。

    您还可以选择性地设置文件的最大文件大小或最大消息数量。

    如需详细了解这两种文件批处理选项,请参阅文件批处理

  11. 我们强烈建议您启用 Dead lettering 来处理消息失败。

    如需了解详情,请参阅死信主题

  12. 您可以将其他设置保留为默认值,然后点击创建

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 如需创建 Cloud Storage 订阅,请运行 gcloud pubsub subscriptions create 命令
    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --cloud-storage-bucket=BUCKET_NAME \
        --cloud-storage-file-prefix=CLOUD_STORAGE_FILE_PREFIX \
        --cloud-storage-file-suffix=CLOUD_STORAGE_FILE_SUFFIX \
        --cloud-storage-file-datetime-format=CLOUD_STORAGE_FILE_DATETIME_FORMAT \
        --cloud-storage-max-duration=CLOUD_STORAGE_MAX_DURATION \
        --cloud-storage-max-bytes=CLOUD_STORAGE_MAX_BYTES \
        --cloud-storage-max-messages=CLOUD_STORAGE_MAX_MESSAGES \
        --cloud-storage-output-format=CLOUD_STORAGE_OUTPUT_FORMAT \
        --cloud-storage-write-metadata
        --cloud-storage-use-topic-schema

    如果您想使用自定义服务账号,请将其作为额外的实参提供:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --cloud-storage-bucket=BUCKET_NAME \
        --cloud-storage-file-prefix=CLOUD_STORAGE_FILE_PREFIX \
        --cloud-storage-file-suffix=CLOUD_STORAGE_FILE_SUFFIX \
        --cloud-storage-file-datetime-format=CLOUD_STORAGE_FILE_DATETIME_FORMAT \
        --cloud-storage-max-duration=CLOUD_STORAGE_MAX_DURATION \
        --cloud-storage-max-bytes=CLOUD_STORAGE_MAX_BYTES \
        --cloud-storage-max-messages=CLOUD_STORAGE_MAX_MESSAGES \
        --cloud-storage-output-format=CLOUD_STORAGE_OUTPUT_FORMAT \
        --cloud-storage-write-metadata
        --cloud-storage-use-topic-schema
        --cloud-storage-service-account-email=SERVICE_ACCOUNT_NAME
        

    在该命令中,仅需使用 SUBSCRIPTION_ID--topic 标志和 --cloud-storage-bucket 标志。其余标志是可选的,可以省略。

    替换以下内容:

    • SUBSCRIPTION_ID:新 Cloud Storage 订阅的名称或 ID。
    • TOPIC_ID:主题的名称或 ID。
    • BUCKET_NAME:指定现有存储桶的名称。例如 prod_bucket。存储桶名称不得包含项目 ID。如需创建存储桶,请参阅创建存储桶
    • CLOUD_STORAGE_FILE_PREFIX:指定 Cloud Storage 文件名的前缀。例如 log_events_
    • CLOUD_STORAGE_FILE_SUFFIX:指定 Cloud Storage 文件名的后缀。例如 .txt
    • CLOUD_STORAGE_FILE_DATETIME_FORMAT: 指定 Cloud Storage 文件名的日期时间格式。 例如 YYYY-MM-DD/hh_mm_ssZ
    • CLOUD_STORAGE_MAX_DURATION:在创建新的 Cloud Storage 文件之前可以经过的最长时间。该值必须介于 1 米到 10 米之间。例如 5m
    • CLOUD_STORAGE_MAX_BYTES:在创建新文件之前可以写入 Cloud Storage 文件的最大字节数。值必须介于 1KB 到 10GB 之间。例如 20MB
    • CLOUD_STORAGE_MAX_MESSAGES:在创建新文件之前,可写入 Cloud Storage 文件的消息数量上限。该值必须大于或等于 1000。例如 100000
    • CLOUD_STORAGE_OUTPUT_FORMAT:写入 Cloud Storage 的数据的输出格式。值如下所示:
      • text:消息以原始文本形式撰写,并以换行符分隔。
      • avro:消息以 Avro 二进制格式写入。--cloud-storage-write-metadata--cloud-storage-use-topic-schema 仅影响输出格式为 avro 的订阅。
    • SERVICE_ACCOUNT_NAME:指定用于写入 Cloud Storage 的服务账号的名称。

C++

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 C++ 设置说明执行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& bucket) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_cloud_storage_config()->set_bucket(bucket);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 C# 设置说明执行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;
using System;

public class CreateCloudStorageSubscriptionSample
{
    public Subscription CreateCloudStorageSubscription(string projectId, string topicId, string subscriptionId,
        string bucket, string filenamePrefix, string filenameSuffix, TimeSpan maxDuration)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            CloudStorageConfig = new CloudStorageConfig
            {
                Bucket = bucket,
                FilenamePrefix = filenamePrefix,
                FilenameSuffix = filenameSuffix,
                MaxDuration = Duration.FromTimeSpan(maxDuration)
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Go 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/durationpb"
)

// createCloudStorageSubscription creates a Pub/Sub subscription that exports messages to Cloud Storage.
func createCloudStorageSubscription(w io.Writer, projectID, topic, subscription, bucket string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	// bucket := "my-bucket" // bucket must not have the gs:// prefix
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:  subscription,
		Topic: topic,
		CloudStorageConfig: &pubsubpb.CloudStorageConfig{
			Bucket:         bucket,
			FilenamePrefix: "log_events_",
			FilenameSuffix: ".avro",
			OutputFormat: &pubsubpb.CloudStorageConfig_AvroConfig_{
				AvroConfig: &pubsubpb.CloudStorageConfig_AvroConfig{
					WriteMetadata: true,
				},
			},
			MaxDuration: durationpb.New(1 * time.Minute),
			MaxBytes:    1e8,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to create cloud storage sub: %w", err)
	}
	fmt.Fprintf(w, "Created Cloud Storage subscription: %v\n", sub)

	return nil
}

Java

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Java 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.CloudStorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateCloudStorageSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bucket = "your-bucket";
    String filenamePrefix = "log_events_";
    String filenameSuffix = ".text";
    Duration maxDuration = Duration.newBuilder().setSeconds(300).build();

    createCloudStorageSubscription(
        projectId, topicId, subscriptionId, bucket, filenamePrefix, filenameSuffix, maxDuration);
  }

  public static void createCloudStorageSubscription(
      String projectId,
      String topicId,
      String subscriptionId,
      String bucket,
      String filenamePrefix,
      String filenameSuffix,
      Duration maxDuration)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      CloudStorageConfig cloudStorageConfig =
          CloudStorageConfig.newBuilder()
              .setBucket(bucket)
              .setFilenamePrefix(filenamePrefix)
              .setFilenameSuffix(filenameSuffix)
              .setMaxDuration(maxDuration)
              .build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setCloudStorageConfig(cloudStorageConfig)
                  .build());

      System.out.println("Created a CloudStorage subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const bucket = 'YOUR_BUCKET_ID';
// const filenamePrefix = 'YOUR_FILENAME_PREFIX';
// const filenameSuffix = 'YOUR_FILENAME_SUFFIX';
// const maxDuration = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createCloudStorageSubscription(
  topicName,
  subscriptionName,
  bucket,
  filenamePrefix,
  filenameSuffix,
  maxDuration,
) {
  const options = {
    cloudStorageConfig: {
      bucket,
      filenamePrefix,
      filenameSuffix,
      maxDuration: {
        seconds: maxDuration,
      },
    },
  };

  await pubSubClient
    .topic(topicName)
    .createSubscription(subscriptionName, options);

  console.log(
    `Created subscription ${subscriptionName} with a cloud storage configuration.`,
  );
}

Node.js

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const bucket = 'YOUR_BUCKET_ID';
// const filenamePrefix = 'YOUR_FILENAME_PREFIX';
// const filenameSuffix = 'YOUR_FILENAME_SUFFIX';
// const maxDuration = 60;

// Imports the Google Cloud client library
import {CreateSubscriptionOptions, PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createCloudStorageSubscription(
  topicName: string,
  subscriptionName: string,
  bucket: string,
  filenamePrefix: string,
  filenameSuffix: string,
  maxDuration: number,
) {
  const options: CreateSubscriptionOptions = {
    cloudStorageConfig: {
      bucket,
      filenamePrefix,
      filenameSuffix,
      maxDuration: {
        seconds: maxDuration,
      },
    },
  };

  await pubSubClient
    .topic(topicName)
    .createSubscription(subscriptionName, options);

  console.log(
    `Created subscription ${subscriptionName} with a cloud storage configuration.`,
  );
}

PHP

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 PHP 设置说明执行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub GCS subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $bucket The Cloud Storage bucket name without any prefix like "gs://".
 */
function create_cloud_storage_subscription($projectId, $topicName, $subscriptionName, $bucket)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = ['bucket' => $bucket];
    $subscription->create([
        'cloudStorageConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Python 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import pubsub_v1
from google.protobuf import duration_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bucket = "my-bucket"

filename_prefix = "log_events_"
filename_suffix = ".avro"
# Either CloudStorageConfig.AvroConfig or CloudStorageConfig.TextConfig
# defaults to TextConfig
avro_config = pubsub_v1.types.CloudStorageConfig.AvroConfig(write_metadata=True)

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
max_duration = duration_pb2.Duration()
max_duration.FromSeconds(300)

cloudstorage_config = pubsub_v1.types.CloudStorageConfig(
    bucket=bucket,
    filename_prefix=filename_prefix,
    filename_suffix=filename_suffix,
    avro_config=avro_config,
    # Min 1 minutes, max 10 minutes
    max_duration=max_duration,
    # Min 1 KB, max 10 GiB
    max_bytes=10000000,
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "cloud_storage_config": cloudstorage_config,
        }
    )

print(f"CloudStorage subscription created: {subscription}.")
print(f"Bucket for subscription is: {bucket}")
print(f"Prefix is: {filename_prefix}")
print(f"Suffix is: {filename_suffix}")

监控 Cloud Storage 订阅

Cloud Monitoring 提供了许多指标来监控订阅

如需查看与 Pub/Sub 相关的所有可用指标及其说明的列表,请参阅 Pub/Sub 监控文档

您还可以在 Pub/Sub 中监控订阅。

后续步骤