Crie um tópico de importação do Cloud Storage

Um tópico de importação do Cloud Storage permite-lhe ingerir continuamente dados do Cloud Storage no Pub/Sub. Em seguida, pode transmitir os dados para qualquer um dos destinos suportados pelo Pub/Sub. O Pub/Sub deteta automaticamente novos objetos adicionados ao contentor do Cloud Storage e carrega-os.

O Cloud Storage é um serviço para armazenar os seus objetos em Google Cloud. Um objeto é um fragmento de dados imutável que consiste num ficheiro de qualquer formato. Armazena objetos em contentores denominados contentores. Os contentores também podem conter pastas geridas, que usa para fornecer acesso expandido a grupos de objetos com um prefixo de nome partilhado.

Para mais informações sobre o Cloud Storage, consulte a documentação do Cloud Storage.

Para mais informações sobre tópicos de importação, consulte o artigo Acerca dos tópicos de importação.

Antes de começar

Funções e autorizações necessárias

Para receber as autorizações de que precisa para criar e gerir um tópico de importação do Cloud Storage, peça ao seu administrador para lhe conceder a função de IAM de Editor do Pub/Sub (roles/pubsub.editor) no seu tópico ou projeto. Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Esta função predefinida contém as autorizações necessárias para criar e gerir um tópico de importação do Cloud Storage. Para ver as autorizações exatas que são necessárias, expanda a secção Autorizações necessárias:

Autorizações necessárias

As seguintes autorizações são necessárias para criar e gerir um tópico de importação do Cloud Storage:

  • Crie um tópico de importação: pubsub.topics.create
  • Elimine um tópico de importação: pubsub.topics.delete
  • Obtenha um tópico de importação: pubsub.topics.get
  • Indique um tópico de importação: pubsub.topics.list
  • Publicar num tópico de importação: pubsub.topics.publish
  • Atualize um tópico de importação: pubsub.topics.update
  • Obtenha a Política IAM para um tópico de importação: pubsub.topics.getIamPolicy
  • Configure a política IAM para um tópico de importação: pubsub.topics.setIamPolicy

Também pode conseguir estas autorizações com funções personalizadas ou outras funções predefinidas.

Pode configurar o controlo de acesso ao nível do projeto e ao nível do recurso individual.

A política de armazenamento de mensagens está em conformidade com a localização do contentor

A política de armazenamento de mensagens do tópico do Pub/Sub tem de sobrepor-se às regiões onde o seu contentor do Cloud Storage está localizado. Esta política determina onde o Pub/Sub tem autorização para armazenar os seus dados de mensagens.

  • Para grupos com o tipo de localização como região: a política tem de incluir essa região específica. Por exemplo, se o seu contentor estiver na região us-central1, a política de armazenamento de mensagens também tem de incluir us-central1.

  • Para contentores com o tipo de localização como duas regiões ou várias regiões: a política tem de incluir, pelo menos, uma região na localização de duas regiões ou várias regiões. Por exemplo, se o seu contentor estiver em US multi-region, a política de armazenamento de mensagens pode incluir us-central1, us-east1 ou qualquer outra região em US multi-region.

    Se a política não incluir a região do contentor, a criação do tópico falha. Por exemplo, se o seu contentor estiver em europe-west1 e a sua política de armazenamento de mensagens incluir apenas asia-east1, recebe um erro.

    Se a política de armazenamento de mensagens incluir apenas uma região que se sobreponha à localização do contentor, a redundância multirregional pode ser comprometida. Isto deve-se ao facto de, se essa única região ficar indisponível, os seus dados poderem não ficar acessíveis. Para garantir a redundância total, recomenda-se que inclua, pelo menos, duas regiões na política de armazenamento de mensagens que façam parte da localização multirregional ou birregional do contentor.

Para mais informações sobre as localizações dos contentores, consulte a documentação.

Ative a publicação

Para ativar a publicação, tem de atribuir a função de publicador do Pub/Sub à conta de serviço do Pub/Sub para que o Pub/Sub possa publicar no tópico de importação do Cloud Storage.

Ativar a publicação em todos os tópicos de importação do Cloud Storage

Escolha esta opção quando não tiver um tópico de importação do Cloud Storage disponível no seu projeto.

  1. Na Google Cloud consola, aceda à página IAM.

    Aceda ao IAM

  2. Selecione a caixa de verificação Incluir concessões de funções fornecidas pela Google.

  3. Procure a conta de serviço do Pub/Sub com o seguinte formato:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Para esta conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outra função.

  6. Pesquise e selecione a função de publicador do Pub/Sub (roles/pubsub.publisher).

  7. Clique em Guardar.

Ative a publicação num único tópico de importação do Cloud Storage

Se quiser conceder ao Pub/Sub autorização para publicar num tópico de importação do Cloud Storage específico que já exista, siga estes passos:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Substitua o seguinte:

    • TOPIC_ID é o ID ou o nome do tópico de importação do Cloud Storage.

    • PROJECT_NUMBER é o número do projeto. Para ver o número do projeto, consulte o artigo Identificar projetos.

  3. Atribua funções do Cloud Storage à conta de serviço do Pub/Sub

    Para criar um tópico de importação do Cloud Storage, a conta de serviço do Pub/Sub tem de ter autorização para ler a partir do contentor do Cloud Storage específico. As seguintes autorizações são necessárias:

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    Para atribuir estas autorizações à conta de serviço do Pub/Sub, escolha um dos seguintes procedimentos:

    • Conceda autorizações ao nível do contentor. No contentor do Cloud Storage específico, conceda a função de leitor de objetos antigos do Storage (roles/storage.legacyObjectReader) e as funções de leitor de contentores antigos do Storage (roles/storage.legacyBucketReader) à conta de serviço do Pub/Sub.

    • Se tiver de conceder funções ao nível do projeto, pode, em alternativa, conceder a função de administrador de armazenamento (roles/storage.admin) no projeto que contém o contentor do Cloud Storage. Conceda esta função à conta de serviço do Pub/Sub.

    Autorizações de contentor

    Execute os seguintes passos para conceder a função Storage Legacy Object Reader (roles/storage.legacyObjectReader) e as funções Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) à conta de serviço do Pub/Sub ao nível do contentor:

    1. Na Google Cloud consola, aceda à página Cloud Storage.

      Aceda ao Cloud Storage

    2. Clique no contentor do Cloud Storage a partir do qual quer ler mensagens e importar para o tópico de importação do Cloud Storage.

      É apresentada a página Detalhes do contentor.

    3. Na página Detalhes do contentor, clique no separador Autorizações.

    4. No separador Autorizações > Ver por principais, clique em Conceder acesso.

      É aberta a página Conceder acesso.

    5. Na secção Adicionar membros, introduza o nome da sua conta de serviço do Pub/Sub.

      O formato da conta de serviço é service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por exemplo, para um projeto com PROJECT_NUMBER=112233445566, a conta de serviço tem o formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

    6. No menu pendente Atribuir funções > Selecionar uma função, introduza Object Reader e selecione a função Storage Legacy Object Reader.

    7. Clique em Adicionar outra função.

    8. No menu pendente Selecionar uma função, introduza Bucket Reader e selecione a função Leitor de contentores antigos de armazenamento.

    9. Clique em Guardar.

    Autorizações do projeto

    Execute os seguintes passos para conceder a função de administrador de armazenamento (roles/storage.admin) ao nível do projeto:

    1. Na Google Cloud consola, aceda à página IAM.

      Aceda ao IAM

    2. No separador Autorizações > Ver por principais, clique em Conceder acesso.

      É aberta a página Conceder acesso.

    3. Na secção Adicionar membros, introduza o nome da sua conta de serviço do Pub/Sub.

      O formato da conta de serviço é service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por exemplo, para um projeto com PROJECT_NUMBER=112233445566, a conta de serviço tem o formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

    4. No menu pendente Atribuir funções > Selecionar uma função, introduza Storage Admin e selecione a função Administrador de armazenamento.

    5. Clique em Guardar.

    Para mais informações sobre o IAM do Cloud Storage, consulte o artigo Cloud Storage Identity and Access Management.

    Propriedades dos tópicos de importação do Cloud Storage

    Para mais informações sobre as propriedades comuns em todos os tópicos, consulte o artigo Propriedades de um tópico.

    Nome do contentor

    Este é o nome do contentor do Cloud Storage a partir do qual o Pub/Sub lê os dados publicados num tópico de importação do Cloud Storage.

    Formato de entrada

    Quando cria um tópico de importação do Cloud Storage, pode especificar o formato dos objetos a serem carregados como Texto, Avro ou Pub/Sub Avro.

    • Texto. Presume-se que os objetos contêm dados com texto simples. Este formato de entrada tenta carregar todos os objetos no contentor, desde que o objeto cumpra o tempo mínimo de criação de objetos e corresponda aos critérios de padrão glob.

      Delimitador. Também pode especificar um delimitador pelo qual os objetos são divididos em mensagens. Se não for definido, a predefinição é o caráter de nova linha (\n). O delimitador só pode ser um único caráter.

    • Avro. Os objetos estão no formato binário Apache Avro. Qualquer objeto que não esteja num formato Apache Avro válido não é carregado. Seguem-se as limitações relativas ao formato Avro:

      • As versões 1.1.0 e 1.2.0 do Avro não são suportadas.
      • O tamanho máximo de um bloco Avro é de 16 MB.
    • Pub/Sub Avro. Os objetos estão no formato binário Apache Avro com um esquema correspondente ao de um objeto escrito no Cloud Storage através de uma subscrição do Cloud Storage do Pub/Sub com o formato de ficheiro Avro. Seguem-se algumas diretrizes importantes para o Pub/Sub Avro:

      • O campo de dados do registo Avro é usado para preencher o campo de dados da mensagem Pub/Sub gerada.

      • Se a opção write_metadata for especificada para a subscrição do Cloud Storage, todos os valores no campo de atributos são preenchidos como os atributos da mensagem Pub/Sub gerada.

      • Se for especificada uma chave de ordenação na mensagem original escrita no Cloud Storage, este campo é preenchido como um atributo com o nome original_message_ordering_key na mensagem do Pub/Sub gerada.

    Tempo mínimo de criação de objetos

    Opcionalmente, pode especificar um tempo mínimo de criação de objetos quando cria um tópico de importação do Cloud Storage. Apenas são carregados objetos criados a partir desta data/hora. Esta data/hora tem de ser fornecida num formato como YYYY-MM-DDThh:mm:ssZ. Qualquer data, passada ou futura, entre 0001-01-01T00:00:00Z e 9999-12-31T23:59:59Z, inclusive, é válida.

    Corresponda ao padrão glob

    Opcionalmente, pode especificar um padrão glob de correspondência quando cria um tópico de importação do Cloud Storage. Apenas são carregados objetos com nomes que correspondem a este padrão. Por exemplo, para carregar todos os objetos com o sufixo .txt, pode especificar o padrão glob como **.txt.

    Para ver informações sobre a sintaxe suportada para padrões glob, consulte a documentação do Cloud Storage.

    Use tópicos de importação do Cloud Storage

    Pode criar um novo tópico de importação ou editar um tópico existente.

    Considerações

    • A criação do tópico e da subscrição em separado, mesmo que seja feita em rápida sucessão, pode levar à perda de dados. Existe um curto período durante o qual o tópico existe sem uma subscrição. Se forem enviados dados para o tópico durante este período, estes são perdidos. Se criar primeiro o tópico, criar a subscrição e, em seguida, converter o tópico num tópico de importação, garante que não perde nenhuma mensagem durante o processo de importação.

    Crie um tópico de importação do Cloud Storage

    Para criar um tópico de importação do Cloud Storage, siga estes passos:

    Consola

    1. Na Google Cloud consola, aceda à página Tópicos.

      Aceda a Tópicos

    2. Clique em Criar tópico.

      É apresentada a página de detalhes do tópico.

    3. No campo ID do tópico, introduza um ID para o tópico de importação do Cloud Storage.

      Para mais informações sobre a atribuição de nomes a tópicos, consulte as diretrizes de nomenclatura.

    4. Selecione Adicionar uma subscrição predefinida.

    5. Selecione Ativar carregamento.

    6. Para a origem da carregamento, selecione Google Cloud Storage.

    7. Para o contentor do Cloud Storage, clique em Procurar.

      É aberta a página Selecionar contentor. Selecione uma das seguintes opções:

      • Selecione um contentor existente de qualquer projeto adequado.

      • Clique no ícone de criação e siga as instruções no ecrã para criar um novo contentor. Depois de criar o contentor, selecione-o para o tópico de importação do Cloud Storage.

    8. Quando especifica o contentor, o Pub/Sub verifica se existem as autorizações adequadas no contentor para a conta de serviço do Pub/Sub. Se existirem problemas de autorizações, é apresentada uma mensagem semelhante à seguinte:

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      Se tiver problemas de autorização, clique em Definir autorizações. Para mais informações, consulte o artigo Conceda autorizações do Cloud Storage à conta de serviço do Pub/Sub.

    9. Para o Formato do objeto, selecione Texto, Avro ou Pub/Sub Avro.

      Se selecionar Texto, pode especificar opcionalmente um Delimitador com o qual dividir os objetos em mensagens.

      Para mais informações sobre estas opções, consulte o artigo Formato de entrada.

    10. Opcional. Pode especificar um tempo mínimo de criação de objetos para o seu tópico. Se estiver definido, apenas são carregados objetos criados após a hora mínima de criação de objetos.

      Para mais informações, consulte o artigo Tempo mínimo de criação de objetos.

    11. Tem de especificar um padrão glob. Para carregar todos os objetos no contentor, use ** como padrão glob. Se estiver definido, apenas os objetos que correspondam ao padrão indicado são carregados.

      Para mais informações, consulte o artigo Faça corresponder um padrão glob.

    12. Manter as outras predefinições.

    13. Clique em Criar tópico.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Execute o comando gcloud pubsub topics create:

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      No comando, apenas TOPIC_ID, a flag --cloud-storage-ingestion-bucket e a flag --cloud-storage-ingestion-input-format são obrigatórias. As restantes flags são opcionais e podem ser omitidas.

      Substitua o seguinte:

      • TOPIC_ID: o nome ou o ID do seu tópico.
      • BUCKET_NAME: especifica o nome de um contentor existente. Por exemplo, prod_bucket. O nome do contentor não pode incluir o ID do projeto. Para criar um contentor, consulte o artigo Crie contentores.
      • INPUT_FORMAT: especifica o formato dos objetos carregados. Pode ser text, avro ou pubsub_avro. Para mais informações sobre estas opções, consulte o artigo Formato de entrada.
      • TEXT_DELIMITER: Especifica o delimitador com o qual dividir os objetos de texto em mensagens do Pub/Sub. Este deve ser um único caráter e só deve ser definido quando INPUT_FORMAT é text. A predefinição é o caráter de nova linha (\n).

        Quando usar a CLI gcloud para especificar o delimitador, preste muita atenção ao processamento de carateres especiais, como a nova linha \n. Use o formato '\n' para garantir que o delimitador é interpretado corretamente. A utilização simples de \n sem aspas nem carateres de escape resulta num delimitador de "n".

      • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para que seja carregado. Deve estar em UTC no formato YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

        Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, inclusive, é válida.

      • MATCH_GLOB: especifica o padrão glob a corresponder para que um objeto seja carregado. Quando usa a CLI gcloud, um padrão glob com carateres * tem de ter o carater * formatado como carater de escape no formato \*\*.txt ou todo o padrão glob tem de estar entre aspas "**.txt" ou '**.txt'. Para ver informações sobre a sintaxe suportada para padrões glob, consulte a documentação do Cloud Storage.

    3. C++

      Antes de experimentar este exemplo, siga as instruções de configuração do C++ no artigo Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C++ do Pub/Sub.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Ir

      O exemplo seguinte usa a versão principal da biblioteca de cliente Go Pub/Sub (v2). Se ainda estiver a usar a biblioteca v1, consulte o guia de migração para a v2. Para ver uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

      Antes de experimentar este exemplo, siga as instruções de configuração do Go em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Pub/Sub.

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      Antes de experimentar este exemplo, siga as instruções de configuração do Java no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Java do Pub/Sub.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      Antes de experimentar este exemplo, siga as instruções de configuração do Python em Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Python Pub/Sub.

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    Se tiver problemas, consulte o artigo Resolução de problemas de um tópico de importação do Cloud Storage.

    Edite um tópico de importação do Cloud Storage

    Pode editar um tópico de importação do Cloud Storage para atualizar as respetivas propriedades.

    Por exemplo, para reiniciar o carregamento, pode alterar o contentor ou atualizar o tempo mínimo de criação de objetos.

    Para editar um tópico de importação do Cloud Storage, siga estes passos:

    Consola

    1. Na Google Cloud consola, aceda à página Tópicos.

      Aceda a Tópicos

    2. Clique no tópico de importação do Cloud Storage.

    3. Na página de detalhes do tópico, clique em Editar.

    4. Atualize os campos que quer alterar.

    5. Clique em Atualizar.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Para evitar perder as definições do tópico de importação, certifique-se de que inclui todas as definições sempre que atualizar o tópico. Se omitir algo, o Pub/Sub repõe a definição para o respetivo valor predefinido original.

      Execute o comando gcloud pubsub topics update com todos os sinalizadores mencionados no seguinte exemplo:

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      Substitua o seguinte:

      • TOPIC_ID é o ID ou o nome do tópico. Não é possível atualizar este campo.

      • BUCKET_NAME: especifica o nome de um contentor existente. Por exemplo, prod_bucket. O nome do contentor não pode incluir o ID do projeto. Para criar um contentor, consulte o artigo Crie contentores.

      • INPUT_FORMAT: especifica o formato dos objetos carregados. Pode ser text, avro ou pubsub_avro. Consulte o formato de entrada para mais informações sobre estas opções.

      • TEXT_DELIMITER: Especifica o delimitador com o qual dividir os objetos de texto em mensagens do Pub/Sub. Este deve ser um único caráter e só deve ser definido quando INPUT_FORMAT é text. A predefinição é o caráter de nova linha (\n).

        Quando usar a CLI gcloud para especificar o delimitador, preste muita atenção ao processamento de carateres especiais, como a nova linha \n. Use o formato '\n' para garantir que o delimitador é interpretado corretamente. A utilização simples de \n sem aspas nem carateres de escape resulta num delimitador de "n".

      • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para que seja carregado. Deve estar em UTC no formato YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

        Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, inclusive, é válida.

      • MATCH_GLOB: especifica o padrão glob a corresponder para que um objeto seja carregado. Quando usa a CLI gcloud, um padrão glob com carateres * tem de ter o carater * formatado como carater de escape no formato \*\*.txt ou todo o padrão glob tem de estar entre aspas "**.txt" ou '**.txt'. Para ver informações sobre a sintaxe suportada para padrões glob, consulte a documentação do Cloud Storage.

    Quotas e limites para tópicos de importação do Cloud Storage

    O débito do publicador para tópicos de importação está limitado pela quota de publicação do tópico. Para mais informações, consulte o artigo Quotas e limites do Pub/Sub.

    O que se segue?