トピックの種類を変更する

インポート トピックは標準トピックに、あるいは標準トピックをインポート トピックに変換できます。

インポート トピックを標準トピックに変換する

インポート トピックを標準トピックに変換するには、取り込み設定をクリアします。次の手順を行います。

コンソール

  1. Google Cloud コンソールで、[トピック] ページに移動します。

    [トピック] に移動

  2. インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] チェックボックスをオフにします。

  5. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID は、トピック ID に置き換えます。

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換する

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換するには、まず、すべての前提条件を満たしていることを確認します。

コンソール

  1. Google Cloud コンソールで、[トピック] ページに移動します。

    [トピック] に移動

  2. インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込みソースには、[Amazon Kinesis Data Streams] を選択します。

  6. 次の詳細情報を入力します。

    • Kinesis Stream ARN: Pub/Sub に取り込む予定の Kinesis Data Stream の ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN: AWS Kinesis Data Stream に登録されているコンシューマ リソースの ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです: arn:aws:iam::${Account}:role/${RoleName}

    • サービス アカウント: 作成したサービス アカウント。

  7. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、作成したサービス アカウントです。

  3. Go

    次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。まだ v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    )
    
    func updateTopicType(w io.Writer, projectID, topic string) error {
    	// projectID := "my-project-id"
    	// topic := "projects/my-project-id/topics/my-topic"
    	streamARN := "stream-arn"
    	consumerARN := "consumer-arn"
    	awsRoleARN := "aws-role-arn"
    	gcpServiceAccount := "gcp-service-account"
    
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	pbTopic := &pubsubpb.Topic{
    		Name: topic,
    		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
    			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
    				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
    					StreamArn:         streamARN,
    					ConsumerArn:       consumerARN,
    					AwsRoleArn:        awsRoleARN,
    					GcpServiceAccount: gcpServiceAccount,
    				},
    			},
    		},
    	}
    	updateReq := &pubsubpb.UpdateTopicRequest{
    		Topic: pbTopic,
    		UpdateMask: &fieldmaskpb.FieldMask{
    			Paths: []string{"ingestion_data_source_settings"},
    		},
    	}
    	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
    	if err != nil {
    		return fmt.Errorf("topic.Update: %w", err)
    	}
    	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
    	return nil
    }
    

    Java

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.FieldMask;
    import com.google.pubsub.v1.IngestionDataSourceSettings;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import com.google.pubsub.v1.UpdateTopicRequest;
    import java.io.IOException;
    
    public class UpdateTopicTypeExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
        // Kinesis ingestion settings.
        String streamArn = "stream-arn";
        String consumerArn = "consumer-arn";
        String awsRoleArn = "aws-role-arn";
        String gcpServiceAccount = "gcp-service-account";
    
        UpdateTopicTypeExample.updateTopicTypeExample(
            projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
      }
    
      public static void updateTopicTypeExample(
          String projectId,
          String topicId,
          String streamArn,
          String consumerArn,
          String awsRoleArn,
          String gcpServiceAccount)
          throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
    
          IngestionDataSourceSettings.AwsKinesis awsKinesis =
              IngestionDataSourceSettings.AwsKinesis.newBuilder()
                  .setStreamArn(streamArn)
                  .setConsumerArn(consumerArn)
                  .setAwsRoleArn(awsRoleArn)
                  .setGcpServiceAccount(gcpServiceAccount)
                  .build();
          IngestionDataSourceSettings ingestionDataSourceSettings =
              IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
    
          // Construct the topic with Kinesis ingestion settings.
          Topic topic =
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build();
    
          // Construct a field mask to indicate which field to update in the topic.
          FieldMask updateMask =
              FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();
    
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();
    
          Topic response = topicAdminClient.updateTopic(request);
    
          System.out.println(
              "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
        }
      }
    }

    Node.js

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const awsRoleArn = 'arn:aws:iam:...';
    // const gcpServiceAccount = 'ingestion-account@...';
    // const streamArn = 'arn:aws:kinesis:...';
    // const consumerArn = 'arn:aws:kinesis:...';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function updateTopicIngestionType(
      topicNameOrId,
      awsRoleArn,
      gcpServiceAccount,
      streamArn,
      consumerArn,
    ) {
      const metadata = {
        ingestionDataSourceSettings: {
          awsKinesis: {
            awsRoleArn,
            gcpServiceAccount,
            streamArn,
            consumerArn,
          },
        },
      };
    
      await pubSubClient.topic(topicNameOrId).setMetadata(metadata);
    
      console.log('Topic updated with Kinesis source successfully.');
    }

    Python

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import Topic
    from google.pubsub_v1.types import IngestionDataSourceSettings
    from google.pubsub_v1.types import UpdateTopicRequest
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # stream_arn = "your-stream-arn"
    # consumer_arn = "your-consumer-arn"
    # aws_role_arn = "your-aws-role-arn"
    # gcp_service_account = "your-gcp-service-account"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    update_request = UpdateTopicRequest(
        topic=Topic(
            name=topic_path,
            ingestion_data_source_settings=IngestionDataSourceSettings(
                aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                    stream_arn=stream_arn,
                    consumer_arn=consumer_arn,
                    aws_role_arn=aws_role_arn,
                    gcp_service_account=gcp_service_account,
                )
            ),
        ),
        update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
    )
    
    topic = publisher.update_topic(request=update_request)
    print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")
    
    

    C++

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id, std::string stream_arn, std::string consumer_arn,
       std::string aws_role_arn, std::string gcp_service_account) {
      google::pubsub::v1::UpdateTopicRequest request;
    
      request.mutable_topic()->set_name(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      auto* aws_kinesis = request.mutable_topic()
                              ->mutable_ingestion_data_source_settings()
                              ->mutable_aws_kinesis();
      aws_kinesis->set_stream_arn(stream_arn);
      aws_kinesis->set_consumer_arn(consumer_arn);
      aws_kinesis->set_aws_role_arn(aws_role_arn);
      aws_kinesis->set_gcp_service_account(gcp_service_account);
      *request.mutable_update_mask()->add_paths() =
          "ingestion_data_source_settings";
    
      auto topic = client.UpdateTopic(request);
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully updated: " << topic->DebugString()
                << "\n";
    }

    Node.js

    このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const awsRoleArn = 'arn:aws:iam:...';
    // const gcpServiceAccount = 'ingestion-account@...';
    // const streamArn = 'arn:aws:kinesis:...';
    // const consumerArn = 'arn:aws:kinesis:...';
    
    // Imports the Google Cloud client library
    import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function updateTopicIngestionType(
      topicNameOrId: string,
      awsRoleArn: string,
      gcpServiceAccount: string,
      streamArn: string,
      consumerArn: string,
    ) {
      const metadata: TopicMetadata = {
        ingestionDataSourceSettings: {
          awsKinesis: {
            awsRoleArn,
            gcpServiceAccount,
            streamArn,
            consumerArn,
          },
        },
      };
    
      await pubSubClient.topic(topicNameOrId).setMetadata(metadata);
    
      console.log('Topic updated with Kinesis source successfully.');
    }

ARN の詳細については、Amazon リソース名(ARN)IAM 識別子をご覧ください。

標準トピックを Cloud Storage インポート トピックに変換する

標準トピックを Cloud Storage インポート トピックに変換するには、まず、すべての前提条件を満たしていることを確認します。

コンソール

  1. Google Cloud コンソールで、[トピック] ページに移動します。

    [トピック] に移動

  2. Cloud Storage インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込み元として [Google Cloud Storage] を選択します。

  6. Cloud Storage バケットの場合は、[参照] をクリックします。

    [バケットの選択] ページが開きます。次のオプションのいずれかを選択します。

    • 適切なプロジェクトから既存のバケットを選択します。

    • 作成アイコンをクリックし、画面の指示に沿って新しいバケットを作成します。バケットを作成したら、Cloud Storage インポート トピックのバケットを選択します。

  7. バケットを指定すると、Pub/Sub は Pub/Sub サービス アカウントのバケットに対する適切な権限を確認します。権限の問題がある場合は、権限に関連するエラー メッセージが表示されます。

    権限の問題が発生した場合は、[権限を設定] をクリックします。詳細については、 Pub/Sub サービス アカウントに Cloud Storage 権限を付与するをご覧ください。

  8. [オブジェクト形式] で、[テキスト]、[Avro]、または [Pub/Sub Avro] を選択します。

    [テキスト] を選択した場合は、必要に応じて、オブジェクトをメッセージに分割する 区切り文字を指定できます。

    これらのオプションの詳細については、入力形式をご覧ください。

  9. 省略可。トピックの最短のオブジェクト作成時間を指定できます。設定すると、最短のオブジェクト作成時間以降に作成されたオブジェクトのみが取り込まれます。

    詳細については、 最短のオブジェクト作成時間をご覧ください。

  10. Glob パターンを指定する必要があります。バケット内のすべてのオブジェクトを取り込むには、glob パターンとして ** を使用します。指定されたパターンに一致するオブジェクトのみが取り込まれます。

    詳細については、 グロブ パターンを照合するをご覧ください。

  11. 他のデフォルト設定はそのままにします。
  12. [トピックを更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. インポート トピックの設定が失われないように、トピックを更新するたびにすべての設定を含めるようにしてください。省略すると、Pub/Sub は設定を元のデフォルト値にリセットします。

    次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。

    • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。textavropubsub_avro のいずれかになります。これらのオプションの詳細については、入力形式をご覧ください。

    • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があり、INPUT_FORMATtext の場合にのみ設定する必要があります。デフォルトは改行文字(\n)です。

      gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。区切り文字が正しく解釈されるように、'\n' 形式を使用します。引用符やエスケープなしで \n を使用すると、区切り文字は "n" になります。

    • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC で YYYY-MM-DDThh:mm:ssZ の形式にする必要があります。例: 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z から 9999-12-31T23:59:59Z までの任意の日付(過去または未来)が有効です。

    • MATCH_GLOB: オブジェクトを取り込むために照合する glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含む一致グロブでは、* 文字を \*\*.txt の形式でエスケープするか、一致グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。