Modifier le type de thème

Vous pouvez convertir un sujet d'importation en sujet standard, ou inversement.

Convertir un sujet d'importation en sujet standard

Pour convertir un sujet d'importation en sujet standard, effacez les paramètres d'ingestion. Procédez comme suit :

Console

  1. Dans la Google Cloud console, accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le sujet d'importation.

  3. Sur la page d'informations du sujet, cliquez sur Modifier.

  4. Désactivez l'option Activer l'ingestion.

  5. Cliquez sur Mettre à jour.

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la gcloud pubsub topics update commande :

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Remplacez TOPIC_ID par l'ID du sujet.

Convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams

Pour convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams, vérifiez d'abord que vous remplissez toutes les conditions préalables.

Console

  1. Dans la Google Cloud console, accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le sujet que vous souhaitez convertir en sujet d'importation.

  3. Sur la page d'informations du sujet, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Amazon Kinesis Data Streams.

  6. Saisissez les informations suivantes :

    • ARN du flux Kinesis : ARN du flux Kinesis Data Streams que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN du consommateur Kinesis : ARN de la ressource consommateur enregistrée dans le flux AWS Kinesis Data Streams. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN du rôle AWS : ARN du rôle AWS. Le format de l'ARN du rôle est le suivant : arn:aws:iam::${Account}:role/${RoleName}.

    • Compte de service : compte de service que vous avez créé.

  7. Cliquez sur Mettre à jour.

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud pubsub topics update avec toutes les options mentionnées dans l'exemple suivant :

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être mis à jour.

    • KINESIS_STREAM_ARN correspond à l'ARN des flux Kinesis Data Streams que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN correspond à l'ARN de la ressource consommateur enregistrée dans les flux AWS Kinesis Data Streams. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN correspond à l'ARN du rôle AWS. Le format de l'ARN du rôle est le suivant : arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT correspond au compte de service que vous avez créé.

Go

L'exemple suivant utilise la version majeure de la bibliothèque cliente Go Pub/Sub (v2). Si vous utilisez toujours la bibliothèque v1, consultez le guide de migration vers la v2. Pour afficher la liste des exemples de code v1, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Java .


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour C++ .

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Pour en savoir plus sur les ARN, consultez la page Noms de ressources Amazon (ARN) et identifiants IAM.

Convertir un sujet standard en sujet d'importation Cloud Storage

Pour convertir un sujet standard en sujet d'importation Cloud Storage, vérifiez d'abord que vous remplissez toutes les conditions préalables.

Console

  1. Dans la Google Cloud console, accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le sujet que vous souhaitez convertir en sujet d'importation Cloud Storage.

  3. Sur la page d'informations du sujet, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Google Cloud Storage.

  6. Pour le bucket Cloud Storage, cliquez sur Parcourir.

    La page Sélectionner un bucket s'ouvre. Sélectionnez l'une des options suivantes :

    • Sélectionnez un bucket existant dans n'importe quel projet approprié.

    • Cliquez sur l'icône de création, puis suivez les instructions à l'écran pour créer un bucket. Une fois le bucket créé, sélectionnez-le pour le sujet d'importation Cloud Storage.

  7. Lorsque vous spécifiez le bucket, Pub/Sub vérifie que le compte de service Pub/Sub dispose des autorisations appropriées sur le bucket. En cas de problème d'autorisation, un message d'erreur s'affiche.

    Si vous rencontrez des problèmes d'autorisation, cliquez sur Définir des autorisations. Pour en savoir plus, consultez la section Accorder des autorisations Cloud Storage au compte de service Pub/Sub.

  8. Pour le format d'objet, sélectionnez Texte, Avro ou Pub/Sub Avro.

    Si vous sélectionnez Texte, vous pouvez éventuellement spécifier un délimiteur avec lequel diviser les objets en messages.

    Pour en savoir plus sur ces options, consultez la section Format d'entrée.

  9. Facultatif. Vous pouvez spécifier une durée minimale pour créer l'objet pour votre sujet. Si cette option est définie, seuls les objets créés après la durée minimale de création de l'objet sont ingérés.

    Pour en savoir plus, consultez la section Durée minimale pour créer l'objet.

  10. Vous devez spécifier un modèle glob. Pour ingérer tous les objets du bucket, utilisez ** comme modèle glob. Seuls les objets qui correspondent au modèle donné sont ingérés.

    Pour en savoir plus, consultez Mettre en correspondance un modèle glob.

  11. Conservez les autres paramètres par défaut.
  12. Cliquez sur Mettre à jour le sujet.

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Pour éviter de perdre vos paramètres pour le sujet d'importation, veillez à tous les inclure chaque fois que vous mettez à jour le sujet. Si vous en omettez un, Pub/Sub rétablit sa valeur par défaut d'origine.

    Exécutez la commande gcloud pubsub topics update avec toutes les options mentionnées dans l'exemple suivant :

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être mis à jour.

    • BUCKET_NAME : spécifie le nom d'un bucket existant. Exemple : prod_bucket. Le nom du bucket ne doit pas inclure l'ID du projet. Pour créer un bucket, consultez la section Créer des buckets.

    • INPUT_FORMAT : spécifie le format des objets ingérés. Il peut s'agir de text, avro ou pubsub_avro. Pour en savoir plus sur ces options, consultez la section Format d'entrée.

    • TEXT_DELIMITER : spécifie le délimiteur avec lequel diviser les objets texte en messages Pub/Sub. Il doit s'agir d'un seul caractère et ne doit être défini que lorsque INPUT_FORMAT est text. La valeur par défaut est le caractère de nouvelle ligne (\n).

      Lorsque vous utilisez la gcloud CLI pour spécifier le délimiteur, faites très attention à la gestion des caractères spéciaux tels que la nouvelle ligne \n. Utilisez le format '\n' pour vous assurer que le délimiteur est correctement interprété. Si vous utilisez simplement \n sans guillemets ni échappement, le délimiteur sera "n".

    • MINIMUM_OBJECT_CREATE_TIME : spécifie l'heure minimale à laquelle un objet a été créé pour pouvoir être ingéré. Elle doit être au format UTC YYYY-MM-DDThh:mm:ssZ. Exemple : 2024-10-14T08:30:30Z.

      Toute date, passée ou future, comprise entre 0001-01-01T00:00:00Z et 9999-12-31T23:59:59Z inclus est valide.

    • MATCH_GLOB : spécifie le modèle glob à mettre en correspondance pour qu'un objet soit ingéré. Lorsque vous utilisez la gcloud CLI, un glob de correspondance avec des caractères * doit avoir le caractère * mis en forme comme échappé sous la forme \*\*.txt ou l'ensemble du glob de correspondance doit être entre guillemets "**.txt" ou '**.txt'. Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.