コネクタを表示する

コネクタの詳細を表示すると、構成、運用状態、タスクの再起動ポリシーを確認し、パフォーマンス指標をモニタリングできます。

Connect クラスタ内のコネクタの詳細を表示するには、 Google Cloud コンソール、gcloud CLI、Managed Service for Apache Kafka クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してコネクタを表示することはできません。

コネクタの表示に必要なロールと権限

コネクタを表示するために必要な権限を取得するには、プロジェクトに対する Managed Kafka 閲覧者 roles/managedkafka.viewer)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、コネクタの表示に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

コネクタを表示するには、次の権限が必要です。

  • 親 Connect クラスタでリスト コネクタの権限を付与します。 managedkafka.connectors.list
  • 親 Connect クラスタに対するコネクタの詳細を取得する権限を付与します。 managedkafka.connectors.get

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka 閲覧者ロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

コネクタの詳細を表示する

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. 表示するコネクタをホストする Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブで、リストからコネクタを見つけて名前をクリックします。[コネクタの詳細] ページにリダイレクトされます。

  4. [コネクタの詳細] ページには、次のタブが表示されます。

    • 構成: コネクタの構成が表示されます。以下が含まれます。
      • 名前: コネクタの名前。
      • 状態: コネクタの動作状態。たとえば、Running。
      • タスクの再起動ポリシー: 失敗したタスクを再起動するためのポリシー。たとえば、失敗したタスクを再起動するかどうか、どのバックオフ設定を使用するかなどです。
      • 構成プロパティ: コネクタの構成を定義する Key-Value ペアのリスト。
    • モニタリング: コネクタのモニタリング用のグラフを提供します。例:
      • タスク エラー数: エラーが発生したタスクの数。
      • アクティブなタスク数: 現在アクティブなタスクの数。

    このページには、コネクタの編集削除一時停止停止再起動を行うボタンも含まれています。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コネクタの説明を取得するには、gcloud managed-kafka connectors describe コマンドを使用します。

    gcloud managed-kafka connectors describe CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    次のように置き換えます。

    • CONNECTOR_ID: 必須。説明するコネクタの ID。
    • LOCATION: 必須。コネクタを含む Connect クラスタのロケーション。
    • CONNECT_CLUSTER_ID: 必須。コネクタを含む Connect クラスタの ID。
  3. コマンドの例:

    gcloud managed-kafka connectors describe test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster
    

    出力例:

    config:
      connector.class: com.google.cloud.kafka.connect.pubsub.PubsubSinkConnector
      kafka.topic.regex: .*
      key.converter: org.apache.kafka.connect.storage.StringConverter
      project: test-project
      tasks.max: '1'
      topic: test-pubsub-topic
      value.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable: 'false'
    createTime: '2024-03-13T05:17:34.123456Z'
    labels:
      test-label-key: test-label-value
    name: projects/test-project/locations/us-central1/connectClusters/test-connect-cluster/connectors/test-connector
    state: RUNNING
    taskRestartPolicy: RESTART_WITH_EXPONENTIAL_BACKOFF
    updateTime: '2024-03-13T05:18:15.987654Z'
    

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.GetConnectorRequest{
    		Name: connectorPath,
    	}
    	connector, err := client.GetConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got connector: %#v\n", connector)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class GetConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        getConnector(projectId, region, clusterId, connectorId);
      }
    
      public static void getConnector(
          String projectId, String region, String clusterId, String connectorId) throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId);
          // This operation is handled synchronously.
          Connector connector = managedKafkaConnectClient.getConnector(name);
          System.out.println(connector.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.getConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import NotFound
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    connector_path = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    request = managedkafka_v1.GetConnectorRequest(
        name=connector_path,
    )
    
    try:
        connector = connect_client.get_connector(request=request)
        print("Got connector:", connector)
    except NotFound as e:
        print(f"Failed to get connector {connector_id} with error: {e}")
    

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。