Cloud Storage 取り込みを使用してトピックを作成する

Cloud Storage バケットから取り込む Pub/Sub トピックを作成する

もっと見る

このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。

コードサンプル

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

このサンプルを試す前に、C# Pub/Sub クイックスタート: クライアント ライブラリの使用にある設定手順に従ってください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using System;

public class CreateTopicWithCloudStorageIngestionSample
{
    public Topic CreateTopicWithCloudStorageIngestion(string projectId, string topicId, string bucket, string inputFormat, string textDelimiter, string matchGlob, DateTimeOffset minimumObjectCreateTime)
    {

        IngestionDataSourceSettings.Types.CloudStorage cloudStorageSettings = new IngestionDataSourceSettings.Types.CloudStorage
        {
            Bucket = bucket,
            MinimumObjectCreateTime = Timestamp.FromDateTimeOffset(minimumObjectCreateTime),
        };

        switch (inputFormat)
        {
            case "text":
                cloudStorageSettings.TextFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.TextFormat
                {
                    Delimiter = textDelimiter
                };
                break;
            case "avro":
                cloudStorageSettings.AvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.AvroFormat();
                break;
            case "pubsub_avro":
                cloudStorageSettings.PubsubAvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.PubSubAvroFormat();
                break;
            default:
                throw new RpcException(new Status(StatusCode.InvalidArgument, $"inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: {inputFormat}"));
        }

        if (!string.IsNullOrEmpty(matchGlob))
        {
            cloudStorageSettings.MatchGlob = matchGlob;
        }

        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        Topic topic = new Topic()
        {
            Name = TopicName.FormatProjectTopic(projectId, topicId),
            IngestionDataSourceSettings = new IngestionDataSourceSettings() { CloudStorage = cloudStorageSettings }
        };
        Topic createdTopic = publisher.CreateTopic(topic);
        Console.WriteLine($"Topic {createdTopic.Name} created.");

        return createdTopic;
    }
}

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。Go詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
	// delimiter := ","

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	topicpb := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
					Bucket: bucket,
					// Alternatively, can be Avro or PubSubAvro formats. See
					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
							Delimiter: &delimiter,
						},
					},
					MatchGlob:               matchGlob,
					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
				},
			},
		},
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

このサンプルを試す前に、Java Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

PHP

このサンプルを試す前に、PHPPub/Sub クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\AvroFormat;
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\PubSubAvroFormat;
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\TextFormat;
use Google\Protobuf\Timestamp;

/**
 * Creates a topic with Cloud Storage Ingestion.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $bucket  Cloud Storage bucket.
 * @param string $inputFormat  Input format for the Cloud Storage data. Must be one of text, avro, or pubsub_avro.
 * @param string $minimumObjectCreatedTime  Only objects with a larger or equal creation timestamp will be ingested.
 * @param string $textDelimiter  Delimiter for text format input.
 * @param string $matchGlob  Glob pattern used to match objects that will be ingested. If unset, all objects will be ingested.
 */
function create_topic_with_cloud_storage_ingestion(
    string $projectId,
    string $topicName,
    string $bucket,
    string $inputFormat,
    string $minimumObjectCreatedTime,
    string $textDelimiter = '',
    string $matchGlob = ''
): void {
    $datetime = new \DateTimeImmutable($minimumObjectCreatedTime);
    $timestamp = (new Timestamp())
        ->setSeconds($datetime->getTimestamp())
        ->setNanos($datetime->format('u') * 1000);

    $cloudStorageData = [
        'bucket' => $bucket,
        'minimum_object_create_time' => $timestamp
    ];

    $cloudStorageData[$inputFormat . '_format'] = match($inputFormat) {
        'text' => new TextFormat(['delimiter' => $textDelimiter]),
        'avro' => new AvroFormat(),
        'pubsub_avro' => new PubSubAvroFormat(),
        default => throw new \InvalidArgumentException(
            'inputFormat must be in (\'text\', \'avro\', \'pubsub_avro\'); got value: ' . $inputFormat
        )
    };

    if (!empty($matchGlob)) {
        $cloudStorageData['match_glob'] = $matchGlob;
    }

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->createTopic($topicName, [
        'ingestionDataSourceSettings' => [
            'cloud_storage' => $cloudStorageData
        ]
    ]);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Ruby

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。Ruby詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

# topic_id = "your-topic-id"
# bucket = "your-bucket-id"
# input_format = "text"
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = Google::Protobuf::Timestamp.new

pubsub = Google::Cloud::Pubsub.new
topic_admin = pubsub.topic_admin
cloud_storage =
  Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage

settings = cloud_storage.new \
  bucket: bucket,
  minimum_object_create_time: minimum_object_create_time

case input_format
when "text"
  settings.text_format = cloud_storage::TextFormat.new \
    delimiter: text_delimiter
when "avro"
  settings.avro_format = cloud_storage::AvroFormat.new
when "pubsub_avro"
  settings.pubsub_avro_format = cloud_storage::PubSubAvroFormat.new
else
  raise "input_format must be in ('text', 'avro', 'pubsub_avro');" \
        "got value: #{input_format}"
end

if !match_glob.nil? && !match_glob.empty?
  settings.match_glob = match_glob
end

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 ingestion_data_source_settings: {
                                   cloud_storage: settings
                                 }

puts "Topic with Cloud Storage Ingestion #{topic.name} created."

次のステップ

他の Google Cloud プロダクトのコードサンプルを検索およびフィルタするには、Google Cloud サンプル ブラウザをご覧ください。