建立 Google Cloud Managed Service for Apache Kafka 叢集

Managed Service for Apache Kafka 叢集提供環境,可儲存及處理以主題分類的訊息串流。

如要建立叢集,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 建立叢集。

事前準備

請確認您熟悉下列項目:

建立叢集所需的角色和權限

如要取得建立叢集所需的權限,請要求管理員授予您專案的代管 Kafka 叢集編輯者 (roles/managedkafka.clusterEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立叢集,必須具備下列權限:

  • 建立叢集: managedkafka.clusters.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您無法透過 Managed Kafka 叢集編輯者角色,在 Managed Service for Apache Kafka 叢集上建立、刪除或修改主題和用戶群組。也不允許資料平面存取權,在叢集內發布或取用訊息。如要進一步瞭解這個角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

Managed Service for Apache Kafka 叢集的屬性

建立或更新 Managed Service for Apache Kafka 叢集時,您必須指定下列屬性。

叢集名稱

待建立 Managed Service for Apache Kafka 叢集的名稱或 ID。如要查看叢集命名準則,請參閱「Managed Service for Apache Kafka 資源命名指南」。叢集名稱無法變更。

位置

要建立叢集的位置,須為其中一個支援的 Google Cloud 區域。叢集位置設定後即無法變更。如要查詢可用位置清單,請參閱「Managed Service for Apache Kafka 位置」。

容量設定

請在「運算資源設定」專區,提供 Kafka 設定的 vCPU 數量和記憶體容量。如要進一步瞭解如何設定叢集運算資源,請參閱「規劃 Kafka 叢集大小」。

容量設定的屬性如下:

  • vCPUs:叢集中的 vCPU 數量。每個叢集至少需要 3 個 vCPU。

  • 記憶體:指派給叢集的記憶體量。 每個 vCPU 必須佈建 1 GiB 至 8 GiB 的記憶體。

    舉例來說,如果您建立的叢集有 6 個 vCPU,則可分配給叢集的記憶體下限為 6 GiB (每個 vCPU 1 GiB),上限為 48 GiB (每個 vCPU 8 GiB)。

如要進一步瞭解如何在叢集建立後變更記憶體和 vCPU 數量,請參閱「更新叢集大小」。

網路設定

網路設定是一份清單,內含可以存取叢集的虛擬私有雲子網路。如要產生或取用訊息,用戶端必須能連上其中一個子網路。

以下是網路設定的一些準則:

  • 叢集至少需要一個子網路。最多可新增 10 個。

  • 每個叢集只能有一個子網路。

  • 每個子網路都必須與叢集位於同一個區域。專案和網路可以不同。

  • 代理程式和啟動伺服器會在各子網路自動獲得 IP 位址。此外,系統會在相應的 VPC 網路中,建立這些 IP 位址的 DNS 項目。

  • 如果您新增其他專案的子網路,必須將權限授予與叢集相關聯的 Google 管理服務帳戶。詳情請參閱「跨專案連結叢集」。

建立叢集後,您可以更新子網路清單。如要進一步瞭解網路,請參閱「設定 Managed Service for Apache Kafka 的網路」。

標籤

標籤是鍵/值組合,可協助您整理及識別資源。 標籤可根據環境將資源分類。例如 "env:production""owner:data-engineering"

您可以根據標籤篩選及搜尋資源。舉例來說,假設您為不同部門建立了多個 Managed Service for Apache Kafka 叢集。你可以設定及搜尋標籤為 "department:marketing" 的叢集,快速找出相關叢集。

重新平衡設定

這項設定會決定服務是否要自動重新平衡代理程式之間的分割區副本。

可用的模式如下:

  • 擴充時自動重新平衡:啟用此選項後,當您擴充叢集時,服務會自動觸發副本重新平衡。這個模式有助於維持平均負載分配,但重新平衡作業期間可能會暫時影響效能。

  • 不重新平衡:啟用這個選項後,服務不會自動重新平衡副本。

加密

Managed Service for Apache Kafka 可以使用Google-owned and Google-managed encryption keys (預設) 或客戶自行管理的加密金鑰 (CMEK) 加密訊息。所有訊息都會經過加密,無論是靜態或傳輸中的資料都安全無虞。叢集的加密類型無法變更。

根據預設,系統會使用 Google-owned and Google-managed encryption keys。這些金鑰完全由 Google Cloud 在其基礎架構內建立、管理及儲存。

CMEK 是指您透過 Cloud Key Management Service 管理的加密金鑰。這項功能可讓您進一步控管用於加密支援服務中靜態資料的金鑰。 Google Cloud 使用 CMEK 會產生與 Cloud Key Management Service 相關的額外費用。如要使用 CMEK,金鑰環必須與您使用的資源位於相同位置。詳情請參閱「設定訊息加密」。

mTLS 設定

您可以選擇設定 mTLS 做為替代驗證方法,使用用戶端憑證。設定包括下列項目:

  • CA 集區:叢集信任的憑證授權單位服務 (CAS) 集區清單,用於用戶端驗證。最多可列出 10 個集區。

  • SSL 主體對應規則:可選用但建議使用的代理程式屬性,可簡化長憑證主體名稱,以用於 Kafka ACL。ssl.principal.mapping.rules

如要進一步瞭解 mTLS,請參閱「設定 mTLS 驗證」。

建立叢集

建立叢集前,請先參閱叢集屬性說明文件。

建立叢集通常需要 20 到 30 分鐘。

如要建立叢集,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 選取「建立」

    「建立 Kafka 叢集」頁面隨即開啟。

  3. 在「Cluster name」(叢集名稱) 中輸入字串。

    如要進一步瞭解如何命名叢集,請參閱「Managed Service for Apache Kafka 資源命名指南」。

  4. 在「Location」部分,輸入支援的地點。

    如要進一步瞭解支援的位置,請參閱「支援的 Managed Service for Apache Kafka 位置」。

  5. 在「運算資源設定」專區,輸入「記憶體」和「vCPU」的值。

    如要進一步瞭解如何調整 Managed Service for Apache Kafka 叢集大小,請參閱「規劃 Kafka 叢集大小」。

  6. 在「Network configuration」(網路設定) 中輸入下列詳細資料:

    1. 專案:子網路所在的專案。子網路必須與叢集位於相同區域,但專案可能不同。
    2. 網路:子網路連線的網路。
    3. 子網路:子網路的名稱。
    4. 子網路 URI 路徑:這個欄位會自動填入資料。或者,您可以在這裡輸入子網路路徑。子網路名稱的格式必須為 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID
    5. 按一下 [完成]
  7. (選用) 按一下「新增連結的子網路」,新增其他子網路。

    最多可新增 10 個子網路。

  8. 保留其他預設值。

  9. 點選「建立」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka clusters create 指令:

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    更改下列內容:

    • CLUSTER_ID:叢集的 ID 或名稱。

      如要進一步瞭解如何命名叢集,請參閱「Managed Service for Apache Kafka 資源命名指南」。

    • LOCATION:叢集位置。

      如要進一步瞭解支援的位置,請參閱「Managed Service for Apache Kafka 位置」。

    • CPU:叢集的 vCPU 數量。

      如要進一步瞭解如何調整 Managed Service for Apache Kafka 叢集大小,請參閱「規劃 Kafka 叢集大小」。

    • MEMORY:叢集的記憶體量。 請使用「MB」、「MiB」、「GB」、「GiB」、「TB」或「TiB」單位。例如「10GiB」。

    • SUBNETS:要連線的子網路清單。 如有多個子網路值,請使用半形逗號分隔。

      子網路的格式為 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID

    • auto-rebalance:在叢集中的 CPU 數量變更時,啟用代理程式間的主題分區自動重新平衡功能。這項功能預設為啟用。

    • ENCRYPTION_KEY:要用於叢集的客戶管理加密金鑰 ID。

      格式為 projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY

    • --async:讓系統傳送建立要求,並立即傳回回應,不必等待作業完成。使用 --async 標記時,您可以在背景建立叢集,同時繼續執行其他工作。如果您未使用這個旗標,系統會等待作業完成,再傳回回應。您必須等到叢集完全更新,才能繼續執行其他工作。

    • LABELS:要與叢集建立關聯的標籤。

      如要進一步瞭解標籤格式,請參閱「標籤」。

    您會收到類似以下的回覆:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    儲存 OPERATION_ID,以便追蹤 progress

  3. REST

    使用任何要求資料之前,請先修改下列項目的值:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • LOCATION:叢集位置
    • CLUSTER_ID:叢集 ID
    • CPU_COUNT:叢集的 vCPU 數量
    • MEMORY:叢集的記憶體量,以位元組為單位。範例:3221225472
    • SUBNET_ID:要連線的子網路 ID。範例:default

    HTTP 方法和網址:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

    JSON 要求主體:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    請展開以下其中一個選項,以傳送要求:

    您應該會收到如下的 JSON 回覆:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Terraform

    您可以使用 Terraform 資源建立叢集

    resource "google_managed_kafka_cluster" "default" {
      project    = data.google_project.default.project_id # Replace this with your project ID in quotes
      cluster_id = "my-cluster-id"
      location   = "us-central1"
      capacity_config {
        vcpu_count   = 3
        memory_bytes = 3221225472
      }
      gcp_config {
        access_config {
          network_configs {
            subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
    	// cpu := 3
    	// memoryBytes := 3221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)
    
    	// Memory must be between 1 GiB and 8 GiB per CPU.
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   cpu,
    		MemoryBytes: memoryBytes,
    	}
    	var networkConfig []*managedkafkapb.NetworkConfig
    	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
    		Subnet: subnet,
    	})
    	platformConfig := &managedkafkapb.Cluster_GcpConfig{
    		GcpConfig: &managedkafkapb.GcpConfig{
    			AccessConfig: &managedkafkapb.AccessConfig{
    				NetworkConfigs: networkConfig,
    			},
    		},
    	}
    	rebalanceConfig := &managedkafkapb.RebalanceConfig{
    		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:            clusterPath,
    		CapacityConfig:  capacityConfig,
    		PlatformConfig:  platformConfig,
    		RebalanceConfig: rebalanceConfig,
    	}
    
    	req := &managedkafkapb.CreateClusterRequest{
    		Parent:    locationPath,
    		ClusterId: clusterID,
    		Cluster:   cluster,
    	}
    	op, err := client.CreateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.AccessConfig;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.CreateClusterRequest;
    import com.google.cloud.managedkafka.v1.GcpConfig;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.NetworkConfig;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.RebalanceConfig;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        int cpu = 3;
        long memoryBytes = 3221225472L; // 3 GiB
        createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
      }
    
      public static void createCluster(
          String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig =
            CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
        NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
        GcpConfig gcpConfig =
            GcpConfig.newBuilder()
                .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
                .build();
        RebalanceConfig rebalanceConfig =
            RebalanceConfig.newBuilder()
                .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
                .build();
        Cluster cluster =
            Cluster.newBuilder()
                .setCapacityConfig(capacityConfig)
                .setGcpConfig(gcpConfig)
                .setRebalanceConfig(rebalanceConfig)
                .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
    
          CreateClusterRequest request =
              CreateClusterRequest.newBuilder()
                  .setParent(LocationName.of(projectId, region).toString())
                  .setClusterId(clusterId)
                  .setCluster(cluster)
                  .build();
    
          // The duration of this operation can vary considerably, typically taking between 10-40
          // minutes.
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.createClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
          Cluster response = future.get();
          System.out.printf("Created cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 3
    # memory_bytes = 3221225472
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.vcpu_count = cpu
    cluster.capacity_config.memory_bytes = memory_bytes
    cluster.gcp_config.access_config.network_configs = [
        managedkafka_v1.NetworkConfig(subnet=subnet)
    ]
    cluster.rebalance_config.mode = (
        managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
    )
    
    request = managedkafka_v1.CreateClusterRequest(
        parent=client.common_location_path(project_id, region),
        cluster_id=cluster_id,
        cluster=cluster,
    )
    
    try:
        operation = client.create_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # The duration of this operation can vary considerably, typically taking 10-40 minutes.
        # We can set a timeout of 3000s (50 minutes).
        response = operation.result(timeout=3000)
        print("Created cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

監控叢集建立作業

只有在執行 gcloud CLI 建立叢集時,才能執行下列指令。

  • 建立叢集通常需要 20 到 30 分鐘。如要追蹤叢集建立作業的進度,gcloud managed-kafka clusters create 指令會使用長時間執行的作業 (LRO),您可以透過下列指令監控這項作業:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    更改下列內容:

    • OPERATION_ID 替換為上一節中的作業 ID 值。
    • LOCATION,並將其替換為上一節中的位置值。

疑難排解

建立叢集時可能會遇到下列錯誤。

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

Managed Service for Apache Kafka 服務代理程式缺少存取 Cloud KMS 金鑰的必要權限。請參閱設定 CMEK 的必要角色文件。

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

Managed Service for Apache Kafka 服務代理缺少必要角色,無法在 Kafka 用戶端執行的 VPC 網路中設定網路。詳情請參閱「連結跨專案的叢集」。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。