Pub/Sub Sink コネクタを作成する

Pub/Sub Sink コネクタは、Kafka トピックから Pub/Sub トピックにメッセージをストリーミングします。これにより、Kafka ベースのアプリケーションを Pub/Sub と統合し、イベント ドリブン アーキテクチャとリアルタイム データ処理を容易に実現できます。

始める前に

Pub/Sub Sink コネクタを作成する前に、次のことを確認してください。

必要なロールと権限

Pub/Sub シンク コネクタの作成に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

これらの事前定義ロールには、Pub/Sub Sink コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Pub/Sub シンク コネクタを作成するには、次の権限が必要です。

  • 親 Connect クラスタでコネクタの作成権限を付与します。 managedkafka.connectors.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka Connector 編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。Connect クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。

Pub/Sub トピックにパブリッシュする権限を付与する

Connect クラスタ サービス アカウント(形式は service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)には、Pub/Sub トピックにメッセージをパブリッシュする権限が必要です。これを行うには、Pub/Sub トピックを含むプロジェクトの Connect クラスタ サービス アカウントに Pub/Sub パブリッシャー ロールroles/pubsub.publisher)を付与します。

Pub/Sub シンクコネクタの仕組み

Pub/Sub シンクコネクタは、1 つ以上の Kafka トピックからメッセージを pull し、Pub/Sub トピックにパブリッシュします。

Pub/Sub シンクコネクタがデータをコピーする仕組みの詳細を以下に示します。

  • コネクタは、ソースクラスタ内の 1 つ以上の Kafka トピックからメッセージを消費します。

  • コネクタは、cps.topic 構成プロパティを使用して指定されたターゲット Pub/Sub トピック ID にメッセージを書き込みます。これは必須プロパティです。

  • また、コネクタでは、cps.project 構成プロパティを使用して、Pub/Sub トピックを含む Google Cloud プロジェクトを指定する必要があります。これは必須プロパティです。

  • コネクタは、cps.endpoint プロパティを使用して指定されたカスタム Pub/Sub エンドポイントを必要に応じて使用することもできます。デフォルトのエンドポイントは "pubsub.googleapis.com:443" です。

  • パフォーマンスを最適化するため、コネクタはメッセージをバッファリングしてから Pub/Sub に公開します。maxBufferSizemaxBufferBytesmaxDelayThresholdMsmaxOutstandingRequestBytesmaxOutstandingMessages を構成して、バッファリングを制御できます。

  • Kafka レコードには、ヘッダー、キー、値の 3 つのコンポーネントがあります。コネクタは、キーと値のコンバータを使用して、Kafka メッセージ データを Pub/Sub で想定される形式に変換します。構造体またはマップ値スキーマを使用する場合、messageBodyName プロパティは、Pub/Sub メッセージ本文として使用するフィールドまたはキーを指定します。

  • コネクタは、metadata.publish プロパティを true に設定することで、Kafka トピック、パーティション、オフセット、タイムスタンプをメッセージ属性として含めることができます。

  • コネクタは、headers.publish プロパティを true に設定することで、Kafka メッセージ ヘッダーを Pub/Sub メッセージ属性として含めることができます。

  • コネクタは、orderingKeySource プロパティを使用して Pub/Sub メッセージの順序指定キーを含めることができます。値のオプションには、"none"(デフォルト)、"key""partition" があります。

  • tasks.max プロパティは、コネクタの並列処理のレベルを制御します。tasks.max を増やすとスループットを改善できますが、実際の並列処理は Kafka トピックのパーティション数によって制限されます。

Pub/Sub シンクコネクタのプロパティ

Pub/Sub シンクコネクタを作成するときは、次のプロパティを指定する必要があります。

コネクタ名

Connect クラスタ内のコネクタの一意の名前。リソースの命名ガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

コネクタ プラグインのタイプ

コネクタ プラグインのタイプとして [Pub/Sub シンク] を選択します。これにより、Kafka から Pub/Sub へのデータフローの方向と、使用される特定のコネクタ実装が決まります。ユーザー インターフェースを使用してコネクタを構成しない場合は、コネクタ クラスも指定する必要があります。

Kafka トピック

コネクタがメッセージを消費する Kafka トピック。1 つ以上のトピックを指定することも、正規表現を使用して複数のトピックを照合することもできます。たとえば、topic.* は「topic」で始まるすべてのトピックに一致します。これらのトピックは、Connect クラスタに関連付けられている Managed Service for Apache Kafka クラスタ内に存在する必要があります。

Pub/Sub トピック

コネクタがメッセージをパブリッシュする既存の Pub/Sub トピック。始める前にで説明したように、Connect クラスタのサービス アカウントにトピックのプロジェクトに対する roles/pubsub.publisher ロールがあることを確認します。

構成

このセクションでは、コネクタ固有の追加の構成プロパティを指定できます。

Kafka トピックのデータは、Avro、JSON、未加工のバイトなど、さまざまな形式で指定できるため、構成の重要な部分としてコンバータの指定があります。コンバータは、Kafka トピックで使用される形式のデータを Kafka Connect の標準化された内部形式に変換します。Pub/Sub シンク コネクタは、この内部データを取得し、Pub/Sub で必要な形式に変換してから書き込みます。

Kafka Connect のコンバータの役割、サポートされているコンバータのタイプ、一般的な構成オプションの詳細については、コンバータをご覧ください。

Pub/Sub シンクコネクタに固有の構成を次に示します。

  • cps.project: Pub/Sub トピックを含む Google Cloud プロジェクト ID を指定します。

  • cps.topic: データの公開先の Pub/Sub トピックを指定します。

  • cps.endpoint: 使用する Pub/Sub エンドポイントを指定します。

このコネクタに固有の使用可能な構成プロパティのリストについては、Pub/Sub シンク コネクタの構成をご覧ください。

Pub/Sub シンクコネクタを作成する

コネクタを作成する前に、Pub/Sub シンクコネクタのプロパティのドキュメントを確認してください。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. コネクタを作成する Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [コネクタを作成] をクリックします。

    [Kafka コネクタの作成] ページが表示されます。

  4. コネクタ名には文字列を入力します。

    コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

  5. [コネクタ プラグイン] で、[Pub/Sub シンク] を選択します。

  6. [トピック] で、[Kafka トピックのリストを選択する] または [トピックの正規表現を使用する] を選択します。次に、このコネクタがメッセージを消費する Kafka トピックを選択または入力します。これらのトピックは、関連付けられた Kafka クラスタにあります。

  7. [Cloud Pub/Sub トピックを選択してください] で、このコネクタがメッセージをパブリッシュする Pub/Sub トピックを選択します。トピックは、完全なリソース名形式(projects/{project}/topics/{topic})で表示されます。

  8. (省略可)[構成] セクションで追加の設定を行います。前のセクションで説明したように、tasks.maxkey.convertervalue.converter などのプロパティを指定します。

  9. [タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。

  10. [作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connectors create コマンドを実行します。

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    次のように置き換えます。

    • CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。

    • LOCATION: コネクタを作成するロケーション。これは、Connect クラスタを作成したロケーションと同じである必要があります。

    • CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。

    • CONFIG_FILE: BigQuery Sink コネクタの YAML 構成ファイルへのパス。

    Pub/Sub Sink コネクタの構成ファイルの例を次に示します。

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    次のように置き換えます。

    • CPS_SINK_CONNECTOR_ID: Pub/Sub Sink コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。

    • GMK_TOPIC_ID: Pub/Sub Sink コネクタがデータを読み取る Managed Service for Apache Kafka トピックの ID。

    • CPS_TOPIC_ID: データの公開先の Pub/Sub トピックの ID。

    • GCP_PROJECT_ID: Pub/Sub トピックが存在する Google Cloudプロジェクトの ID。

  3. Terraform

    Terraform リソースを使用してコネクタを作成できます。

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。