すべてのコネクタを一覧表示する

Connect クラスタで実行されているすべてのコネクタを一覧表示すると、構成されているデータ統合の概要を確認できます。コネクタの健全性とステータスをモニタリングし、潜在的な問題を特定して、データフローを効果的に管理できます。

Connect クラスタ内のすべてのコネクタを一覧表示するには、 Google Cloud コンソール、gcloud CLI、Managed Service for Apache Kafka クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してコネクタを一覧表示することはできません。

すべてのコネクタを一覧表示するために必要なロールと権限

すべてのコネクタを一覧表示するために必要な権限を取得するには、プロジェクトに対する Managed Kafka 閲覧者 roles/managedkafka.viewer)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、すべてのコネクタを一覧表示するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

すべてのコネクタを一覧表示するには、次の権限が必要です。

  • 親 Connect クラスタでリスト コネクタの権限を付与します。 managedkafka.connectors.list
  • 親 Connect クラスタに対するコネクタの詳細を取得する権限を付与します。 managedkafka.connectors.get

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka 閲覧者ロールの詳細については、 Managed Service for Apache Kafka の事前定義ロールをご覧ください。

すべてのコネクタを表示

このビューでは、コネクタのステータスをすばやくモニタリングし、注意が必要なコネクタを特定できます。必要に応じて、個々のコネクタをドリルダウンして、詳細と構成を表示できます。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. コネクタの一覧を取得する Connect クラスタをクリックします。

    [クラスタの詳細を接続] ページが表示されます。

  3. [リソース] タブには、クラスタで実行されているすべてのコネクタのリストが表示されます。リストには、各コネクタに関する次の情報が含まれます。

    • 名前: コネクタの名前。
    • 状態: コネクタの動作状態。たとえば、Running、Failed などです。
    • コネクタの種類: コネクタ プラグインのタイプ。

    [フィルタ] オプションを使用すると、名前で特定のコネクタを検索できます。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka connectors list コマンドを使用して、コネクタを一覧表示します。

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION
    
  3. コネクタのリストをさらに絞り込むには、次の追加のフラグを使用します。

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--filter=EXPRESSION] \
        [--limit=LIMIT] \
        [--page-size=PAGE_SIZE] \
        [--sort-by=SORT_BY]
    

    次のように置き換えます。

    • CONNECT_CLUSTER_ID: 必須。一覧表示するコネクタを含む Connect クラスタの ID。
    • LOCATION: 必須。一覧表示するコネクタを含む Connect クラスタのロケーション。
    • EXPRESSION:(省略可)リストに適用するブール値フィルタ式。式が True と評価された場合、そのアイテムはリストに含まれます。詳細と例については、gcloud topic filters を実行してください。

      例:

      • 「RUNNING」状態のコネクタのみを一覧表示するには:

        --filter="state=RUNNING"
        
      • 「Pub/Sub Sink」コネクタのみを一覧表示するには:

        --filter="connector_plugin='Pub/Sub Sink'"
        
      • 名前に「prod」を含むコネクタを一覧表示するには:

        --filter="name ~ 'prod'"
        
      • 「FAILED」または「Pub/Sub Source」プラグインのコネクタを一覧表示するには:

        --filter="state=FAILED OR connector_plugin='Pub/Sub Source'"
        
    • LIMIT: 省略可。表示するコネクタの最大数。指定しない場合は、すべてのコネクタが一覧表示されます。

    • PAGE_SIZE:(省略可)ページごとに表示する結果の数。指定しない場合、サービスは適切なページサイズを決定します。

    • SORT_BY:(省略可)並べ替えに使用するフィールドのカンマ区切りのリスト。デフォルトの並べ替え順は昇順です。降順で並べ替えるには、フィールドの前に ~ を付けます。サポートされているフィールドは namestate です。

  4. 並べ替えを含むコマンドの例:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --sort-by=~state,name
    

    フィルタリングと制限を含むコマンドの例:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --filter="state=RUNNING AND connector_plugin='Pub/Sub Sink'" \
        --limit=5
    

    出力例:

    NAME                                    STATE     CONNECTOR_PLUGIN
    pubsub-sink-connector                   RUNNING   Pub/Sub Sink
    another-pubsub-sink                     RUNNING   Pub/Sub Sink
    prod-pubsub-sink                        RUNNING   Pub/Sub Sink
    

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listConnectors(w io.Writer, projectID, region, connectClusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    	req := &managedkafkapb.ListConnectorsRequest{
    		Parent: parent,
    	}
    	connectorIter := client.ListConnectors(ctx, req)
    	for {
    		connector, err := connectorIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("connectorIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got connector: %v", connector)
    	}
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class ListConnectors {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        listConnectors(projectId, region, clusterId);
      }
    
      public static void listConnectors(String projectId, String region, String clusterId)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectClusterName parent = ConnectClusterName.of(projectId, region, clusterId);
          // This operation is handled synchronously.
          for (Connector connector : managedKafkaConnectClient.listConnectors(parent).iterateAll()) {
            System.out.println(connector.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.listConnectors got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.api_core.exceptions import GoogleAPICallError
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ListConnectorsRequest(
        parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id),
    )
    
    try:
        response = connect_client.list_connectors(request=request)
        for connector in response:
            print("Got connector:", connector)
    except GoogleAPICallError as e:
        print(f"Failed to list connectors with error: {e}")
    

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。