更改主题类型

您可以将导入主题转换为标准主题,也可以将标准主题转换为导入主题。

将导入的主题转换为标准主题

如需将导入主题转换为标准主题,请清除提取设置。执行以下步骤:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击相应导入主题。

  3. 在主题详情页面中,点击修改

  4. 清除启用提取选项。

  5. 点击更新

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID 替换为主题 ID。

将标准主题转换为 Amazon Kinesis Data Streams 导入主题

如需将标准主题转换为 Amazon Kinesis Data Streams 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要转换为导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 对于提取来源,请选择 Amazon Kinesis Data Streams

  6. 输入以下详细信息:

    • Kinesis 数据流 ARN:您计划注入到 Pub/Sub 中的 Kinesis Data Stream 的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 使用方 ARN:已注册到 AWS Kinesis Data Stream 的使用方资源的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • 服务账号:您创建的服务账号。

  7. 点击更新

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • KINESIS_STREAM_ARN 是您计划注入到 Pub/Sub 中的 Kinesis Data Streams 的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已向 AWS Kinesis Data Streams 注册的使用方资源的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您创建的服务账号。

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN)IAM 标识符

将标准主题转换为 Cloud Storage 导入主题

如需将标准主题转换为 Cloud Storage 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要转换为 Cloud Storage 导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 对于提取源,请选择 Google Cloud Storage

  6. 对于 Cloud Storage 存储桶,请点击浏览

    系统会打开选择存储桶页面。从下列选项中选择一项:

    • 从任何合适的项目中选择现有存储桶。

    • 点击创建图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择该存储桶作为 Cloud Storage 导入主题。

  7. 指定存储桶后,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到与权限相关的错误消息。

    如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅 向 Pub/Sub 服务账号授予 Cloud Storage 权限

  8. 对于对象格式,请选择文本AvroPub/Sub Avro

    如果您选择文本,可以选择性地指定用于将对象拆分为消息的定界符

    如需详细了解这些选项,请参阅输入格式

  9. 可选。您可以为主题指定创建对象的最短时间。如果设置,则仅提取在最短对象创建时间之后创建的对象。

    如需了解详情,请参阅 创建对象的最低时间

  10. 您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用 ** 作为 glob 模式。仅注入与给定模式相匹配的对象。

    如需了解详情,请参阅 匹配 glob 模式

  11. 保留其他默认设置。
  12. 点击更新主题

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 为避免丢失导入主题的设置,请务必在每次更新主题时包含所有设置。如果您遗漏了某些内容,Pub/Sub 会将相应设置重置为原始默认值。

    运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • BUCKET_NAME:指定现有存储桶的名称。 例如 prod_bucket。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶

    • INPUT_FORMAT:指定所提取对象的格式。 可以是 textavropubsub_avro。 如需详细了解这些选项,请参阅输入格式

    • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。此值必须为单个字符,且仅当 INPUT_FORMATtext 时才必须设置。 默认值为换行符 (\n)。

      使用 gcloud CLI 指定分隔符时,请密切注意换行符 \n 等特殊字符的处理方式。使用 '\n' 格式可确保正确解读定界符。如果直接使用 \n(不带引号或转义),则分隔符为 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定对象创建的最早时间,只有在此时间之后创建的对象才能被提取。 此值应采用世界协调时间 (UTC),格式为 YYYY-MM-DDThh:mm:ssZ。 例如 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z(含)之间的任何日期(过去或未来)均有效。

    • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含 * 字符,则必须将 * 字符格式设置为转义形式 \*\*.txt,或者整个匹配 glob 必须用英文引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档