Mudar o tipo de tópico

É possível converter um tópico de importação em um padrão ou, inversamente, um tópico padrão em um de importação.

Converter um tópico de importação em um tópico padrão

Para converter um tópico de importação em um padrão, desmarque as configurações de ingestão. Siga as etapas abaixo:

Console

  1. No Google Cloud console do, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico de importação.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Desmarque a opção Ativar ingestão.

  5. Clique em Atualizar.

gcloud

  1. No Google Cloud console do, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o gcloud pubsub topics update comando:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Substitua TOPIC_ID pelo ID do tópico.

Converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams

Para converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams, primeiro verifique se você atende a todos os pré-requisitos.

Console

  1. No Google Cloud console do, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico que você quer converter em um tópico de importação.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar ingestão.

  5. Para a origem da ingestão, selecione Amazon Kinesis Data Streams.

  6. Digite os seguintes detalhes:

    • ARN da transmissão do Kinesis: o ARN do Kinesis Data Stream que você planeja ingerir no Pub/Sub. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN do consumidor do Kinesis: o ARN do recurso do consumidor que está registrado no AWS Kinesis Data Stream. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN do papel da AWS: o ARN do papel da AWS. O formato do ARN do papel é o seguinte: arn:aws:iam::${Account}:role/${RoleName}.

    • Conta de serviço: a conta de serviço que você criou.

  7. Clique em Atualizar.

gcloud

  1. No Google Cloud console do, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo a seguir:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico. Não é possível atualizar este campo.

    • KINESIS_STREAM_ARN é o ARN do Kinesis Data Streams que você planeja ingerir no Pub/Sub. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN é o ARN do recurso do consumidor registrado no AWS Kinesis Data Streams. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN é o ARN do papel da AWS. O formato do ARN do papel é o seguinte: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT é a conta de serviço que você criou.

Go

O exemplo a seguir usa a versão principal da biblioteca de cliente do Go Pub/Sub (v2). Se você ainda estiver usando a biblioteca v1, consulte o guia de migração para a v2. Para conferir uma lista de exemplos de código da v1, consulte os exemplos de código obsoletos.

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Antes de tentar esse exemplo, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do Pub/Sub.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Para mais informações sobre ARNs, consulte Nomes de recursos da Amazon (ARNs) e identificadores do IAM.

Converter um tópico padrão em um tópico de importação do Cloud Storage

Para converter um tópico padrão em um tópico de importação do Cloud Storage, primeiro verifique se você atende a todos os pré-requisitos.

Console

  1. No Google Cloud console do, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico que você quer converter em um tópico de importação do Cloud Storage.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar ingestão.

  5. Para a origem da ingestão, selecione Google Cloud Storage.

  6. Para o bucket do Cloud Storage, clique em Procurar.

    A página Selecionar bucket será aberta. Selecione uma das seguintes opções:

    • Selecione um bucket atual de qualquer projeto adequado.

    • Clique no ícone de criação e siga as instruções na tela para criar um bucket. Depois de criar o bucket, selecione-o para o tópico de importação do Cloud Storage.

  7. Ao especificar o bucket, o Pub/Sub verifica se há permissões adequadas no bucket para a conta de serviço do Pub/Sub. Se houver problemas de permissão, uma mensagem de erro relacionada às permissões será exibida.

    Se você tiver problemas de permissão, clique em Definir permissões. Para mais informações, consulte Conceder permissões do Cloud Storage à conta de serviço do Pub/Sub.

  8. Em Formato do objeto, selecione Texto, Avro ou Pub/Sub Avro.

    Se você selecionar Texto, poderá especificar um Delimitador para dividir objetos em mensagens.

    Para mais informações sobre essas opções, consulte Formato de entrada.

  9. Opcional. Você pode especificar um Tempo mínimo de criação de objeto para o seu tópico. Se definido, somente os objetos criados após o tempo mínimo de criação de objeto serão ingeridos.

    Para mais informações, consulte Tempo mínimo de criação de objeto.

  10. É necessário especificar um padrão glob. Para ingerir todos os objetos no bucket, use ** como o padrão glob. Somente os objetos que correspondem ao padrão fornecido são ingeridos.

    Para mais informações, consulte Corresponder a um padrão glob.

  11. Mantenha as outras configurações padrão.
  12. Clique em Atualizar tópico.

gcloud

  1. No Google Cloud console do, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Para não perder as configurações do tópico de importação, certifique-se de incluir todas elas sempre que atualizar o tópico. Se você deixar algo de fora, o Pub/Sub vai redefinir a configuração para o valor padrão original.

    Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo a seguir:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico. Não é possível atualizar este campo.

    • BUCKET_NAME: especifica o nome de um bucket atual. Por exemplo, prod_bucket. O nome do bucket não pode incluir o ID do projeto. Para criar um bucket, consulte Criar buckets.

    • INPUT_FORMAT: especifica o formato dos objetos ingeridos. Pode ser text, avro ou pubsub_avro. Para mais informações sobre essas opções, consulte Formato de entrada.

    • TEXT_DELIMITER: especifica o delimitador com que os objetos de texto são divididos em mensagens do Pub/Sub. Ele precisa ser um caractere único e só pode ser definido quando INPUT_FORMAT é text. O padrão é o caractere de nova linha (\n).

      Ao usar a CLI gcloud para especificar o delimitador, preste atenção ao tratamento de caracteres especiais, como a nova linha \n. Use o formato '\n' para garantir que o delimitador seja interpretado corretamente. Simplesmente usar \n sem aspas ou escape resulta em um delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para que ele seja ingerido. Ele precisa estar no formato UTC YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

      Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, inclusive, é válida.

    • MATCH_GLOB: especifica o padrão glob a ser correspondido em ordem para que um objeto seja ingerido. Ao usar a CLI gcloud, um glob de correspondência com * caracteres precisa ter o * caractere formatado como escape na forma \*\*.txt ou o glob de correspondência inteiro precisa estar entre aspas "**.txt" ou '**.txt'. Para informações sobre a sintaxe com suporte para padrões glob, consulte a documentação do Cloud Storage.