Pub/Sub ソース コネクタを作成する

Pub/Sub ソースコネクタは、Pub/Sub から Kafka にメッセージをストリーミングします。これにより、Pub/Sub を Kafka ベースのアプリケーションやデータ パイプラインと統合できます。

コネクタは、Pub/Sub サブスクリプションからメッセージを読み取り、各メッセージを Kafka レコードに変換して、Kafka トピックにレコードを書き込みます。デフォルトでは、コネクタは次のように Kafka レコードを作成します。

  • Kafka レコードキーは null です。
  • Kafka レコード値は、Pub/Sub メッセージ データ(バイト単位)です。
  • Kafka レコード ヘッダーが空です。

ただし、この動作は構成できます。詳細については、コネクタを構成するをご覧ください。

始める前に

Pub/Sub ソースコネクタを作成する前に、次のことを確認してください。

必要なロールと権限

Pub/Sub ソース コネクタの作成に必要な権限を取得するには、Connect クラスタを含むプロジェクトに対する Managed Kafka Connector 編集者 roles/managedkafka.connectorEditor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、Pub/Sub Source コネクタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Pub/Sub ソース コネクタを作成するには、次の権限が必要です。

  • 親 Connect クラスタでコネクタの作成権限を付与します。 managedkafka.connectors.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka コネクタ編集者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

Managed Service for Apache Kafka クラスタが Connect クラスタと同じプロジェクトにある場合、追加の権限は必要ありません。Connect クラスタが別のプロジェクトにある場合は、別のプロジェクトに Connect クラスタを作成するをご覧ください。

Pub/Sub から読み取る権限を付与する

マネージド Kafka サービス アカウントには、Pub/Sub サブスクリプションからメッセージを読み取る権限が必要です。Pub/Sub サブスクリプションを含むプロジェクトのサービス アカウントに次の IAM ロールを付与します。

  • Pub/Sub サブスクライバーroles/pubsub.subscriber
  • Pub/Sub 閲覧者roles/pubsub.viewer

マネージド Kafka サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com です。PROJECT_NUMBER は、プロジェクト番号に置き換えます。

Pub/Sub ソースコネクタを作成する

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. コネクタを作成する Connect クラスタをクリックします。

  3. [コネクタを作成] をクリックします。

  4. コネクタ名には文字列を入力します。

    コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

  5. [コネクタ プラグイン] で、[Pub/Sub ソース] を選択します。

  6. [Cloud Pub/Sub サブスクリプション] リストで、Pub/Sub サブスクリプションを選択します。コネクタはこのサブスクリプションからメッセージを pull します。サブスクリプションは、projects/{project}/subscriptions/{subscription} のように完全なリソース名で表示されます。

  7. [Kafka トピック] リストで、メッセージが書き込まれる Kafka トピックを選択します。

  8. 省略可: [構成] ボックスで、構成プロパティを追加するか、デフォルトのプロパティを編集します。詳細については、コネクタを構成するをご覧ください。

  9. [タスクの再起動ポリシー] を選択します。詳細については、タスクの再起動ポリシーをご覧ください。

  10. [作成] をクリックします。

gcloud

  1. gcloud managed-kafka connectors create コマンドを実行します。

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    次のように置き換えます。

    • CONNECTOR_ID: コネクタの ID または名前。コネクタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。コネクタの名前は変更できません。

    • LOCATION: Connect クラスタのロケーション。

    • CONNECT_CLUSTER_ID: コネクタが作成される Connect クラスタの ID。

    • CONFIG_FILE: YAML または JSON 構成ファイルへのパス。

構成ファイルの例を次に示します。

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

次のように置き換えます。

  • PROJECT_ID: Pub/Sub サブスクリプションが存在する Google Cloudプロジェクトの ID。

  • PUBSUB_SUBSCRIPTION_ID: データの pull 元となる Pub/Sub サブスクリプションの ID。

  • KAFKA_TOPIC_ID: データが書き込まれる Kafka トピックの ID。

cps.projectcps.subscriptionkafka.topic 構成プロパティは必須です。その他の構成オプションについては、コネクタを構成するをご覧ください。

Terraform

Terraform リソースを使用してコネクタを作成できます。

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

コネクタを作成した後は、コネクタの編集、削除、一時停止、停止、再起動を行うことができます。

コネクタを構成する

このセクションでは、コネクタで設定できる構成プロパティについて説明します。

このコネクタに固有のプロパティの一覧については、Pub/Sub ソースコネクタの構成をご覧ください。

プルモード

pull モードでは、コネクタが Pub/Sub メッセージを取得する方法を指定します。次のモードがサポートされています。

  • プルモード(デフォルト)。メッセージはバッチで取得されます。このモードを有効にするには、cps.streamingPull.enabled=false. を設定します。バッチサイズを構成するには、cps.maxBatchSize プロパティを設定します。

    プルモードの詳細については、Pull API をご覧ください。

  • ストリーミング プルモード。Pub/Sub からメッセージを取得する際に、最大スループットと最小レイテンシを実現します。このモードを有効にするには、cps.streamingPull.enabled=true を設定します。

    ストリーミング プルモードの詳細については、StreamingPull API をご覧ください。

    ストリーミング プルが有効になっている場合は、次の構成プロパティを設定してパフォーマンスを調整できます。

    • cps.streamingPull.flowControlBytes: タスクあたりの未処理メッセージ バイトの最大数。
    • cps.streamingPull.flowControlMessages: タスクあたりの未処理メッセージの最大数。
    • cps.streamingPull.maxAckExtensionMs: コネクタがサブスクライブの期限を延長する最大時間(ミリ秒単位)。
    • cps.streamingPull.maxMsPerAckExtension: コネクタが拡張ごとにサブスクライブの期限を延長する最大時間(ミリ秒単位)。
    • cps.streamingPull.parallelStreams: サブスクリプションからメッセージを pull するストリームの数。

Pub/Sub エンドポイント

デフォルトでは、コネクタはグローバル Pub/Sub エンドポイントを使用します。エンドポイントを指定するには、cps.endpoint プロパティをエンドポイント アドレスに設定します。エンドポイントの詳細については、Pub/Sub エンドポイントをご覧ください。

Kafka レコード

Pub/Sub ソースコネクタは、Pub/Sub メッセージを Kafka レコードに変換します。以降のセクションでは、変換プロセスについて説明します。

レコードキー

キー コンバータは org.apache.kafka.connect.storage.StringConverter である必要があります。

  • デフォルトでは、レコードキーは null です。

  • Pub/Sub メッセージ属性をキーとして使用するには、kafka.key.attribute を属性の名前に設定します。例: kafka.key.attribute=username

  • Pub/Sub の順序キーをキーとして使用するには、kafka.key.attribute=orderingKey を設定します。

レコード ヘッダー

デフォルトでは、レコード ヘッダーは空です。

kafka.record.headerstrue の場合、Pub/Sub メッセージ属性はレコード ヘッダーとして書き込まれます。順序指定キーを含めるには、cps.makeOrderingKeyAttribute=true を設定します。

レコード値

kafka.record.headerstrue の場合、または Pub/Sub メッセージにカスタム属性がない場合、レコード値はメッセージ データ(バイト配列)になります。値コンバータを org.apache.kafka.connect.converters.ByteArrayConverter に設定します。

それ以外の場合、kafka.record.headersfalse で、メッセージに 1 つ以上のカスタム属性がある場合、コネクタはレコード値を struct として書き込みます。値コンバータを org.apache.kafka.connect.json.JsonConverter に設定します。

struct には次のフィールドがあります。

  • message: Pub/Sub メッセージ データ(バイト単位)。

  • Pub/Sub メッセージ属性のフィールド。順序キーを含めるには、cps.makeOrderingKeyAttribute=true を設定します。

たとえば、メッセージに username 属性があるとします。

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

value.converter.schemas.enabletrue の場合、struct にはペイロードとスキーマの両方が含まれます。

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Kafka パーティション

デフォルトでは、コネクタはトピックの単一のパーティションに書き込みます。コネクタが書き込むパーティションの数を指定するには、kafka.partition.count プロパティを設定します。値はトピックのパーティション数を超えないようにする必要があります。

コネクタがパーティションにメッセージを割り当てる方法を指定するには、kafka.partition.scheme プロパティを設定します。詳細については、Pub/Sub ソース コネクタの構成をご覧ください。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。