使用 Kafka 指令列工具產生及取用訊息

瞭解如何使用 Kafka 指令列工具連線至 Managed Service for Apache Kafka 叢集、產生訊息及取用訊息。

事前準備

開始本教學課程前,請先建立新的 Managed Service for Apache Kafka 叢集。如果已有叢集,可以略過這個步驟。

如何建立叢集

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 點選 「Create」(建立)
  3. 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
  4. 在「Region」(區域) 清單中,選取叢集的位置。
  5. 在「網路設定」中,設定可存取叢集的子網路:
    1. 在「Project」(專案) 部分,選取專案。
    2. 在「Network」(網路) 中選取虛擬私有雲網路。
    3. 在「Subnet」(子網路) 中,選取子網路。
    4. 按一下 [完成]
  6. 點選「建立」

按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active

gcloud

如要建立 Kafka 叢集,請執行 managed-kafka clusters create 指令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

更改下列內容:

  • KAFKA_CLUSTER:Kafka 叢集名稱
  • REGION:叢集位置
  • PROJECT_ID:專案 ID
  • SUBNET_NAME:要建立叢集的子網路,例如 default

如要瞭解支援的位置,請參閱「 Managed Service for Apache Kafka 位置」。

這項指令會非同步執行,並傳回作業 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

叢集準備就緒後,這項指令的輸出內容會包含 state: ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。

必要的角色

如要取得建立及設定用戶端 VM 所需的權限,請要求管理員在專案中授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

建立用戶端 VM

在 Compute Engine 中建立可存取 Kafka 叢集的 Linux 虛擬機器 (VM) 執行個體。設定 VM 時,請設定下列選項:

  • 區域。在與 Kafka 叢集相同的區域中建立 VM。

  • 子網路。在與 Kafka 叢集設定中使用的子網路相同的虛擬私有雲網路中建立 VM。詳情請參閱「查看叢集的子網路」。

  • 存取權範圍。將 https://www.googleapis.com/auth/cloud-platform 存取範圍指派給 VM。這個範圍會授權 VM 將要求傳送至 Managed Kafka API。

下列步驟說明如何設定這些選項。

控制台

  1. 前往 Google Cloud 控制台的「Create an instance」(建立執行個體) 頁面。

    建立執行個體

  2. 在「機器設定」窗格中,執行下列操作:

    1. 在「Name」(名稱) 欄位中,指定執行個體的名稱。詳情請參閱資源命名慣例

    2. 在「Region」(區域) 清單中,選取與 Kafka 叢集相同的區域。

    3. 在「Zone」(可用區) 清單中選取可用區。

  3. 在導覽選單中,按一下「Networking」(網路)。在隨即顯示的「Networking」(網路) 窗格中,執行下列操作:

    1. 前往「網路介面」部分。

    2. 如要展開預設網路介面,請點選 箭頭。

    3. 在「Network」(網路) 欄位中,選擇虛擬私有雲網路。

    4. 在「Subnetwork」(子網路) 清單中,選取子網路。

    5. 按一下 [完成]

  4. 按一下導覽選單中的「Security」(安全性)。在隨即顯示的「安全性」窗格中,執行下列操作:

    1. 為「Access scopes」(存取權範圍) 選取 [Set access for each API] (針對各個 API 設定存取權)

    2. 在存取範圍清單中,找到「Cloud Platform」下拉式清單,然後選取「Enabled」

  5. 按一下「建立」即可建立 VM。

gcloud

如要建立 VM 執行個體,請使用 gcloud compute instances create 指令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • REGION:您建立 Kafka 叢集的區域,例如 us-central1
  • SUBNET:與叢集設定中使用的子網路位於相同虛擬私有雲網路的子網路
  • ZONE:您建立叢集的區域中的可用區,例如 us-central1-c

如要進一步瞭解如何建立 VM,請參閱「在特定子網路中建立 VM 執行個體」。

授予 IAM 角色

將下列 Identity and Access Management (IAM) 角色授予 Compute Engine 預設服務帳戶

  • 代管 Kafka 用戶端 (roles/managedkafka.client)
  • 服務帳戶憑證建立者 (roles/iam.serviceAccountTokenCreator)
  • 服務帳戶 OpenID 權杖建立者 (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「身分與存取權管理」頁面

  2. 找出「Compute Engine default service account」(Compute Engine 預設服務帳戶) 的資料列,然後按一下 「Edit principal」(編輯主體)

  3. 按一下「新增其他角色」,然後選取「受管理 Kafka 用戶端」角色。 針對「服務帳戶憑證建立者」和「服務帳戶 OpenID 憑證建立者」角色重複這個步驟。

  4. 按一下 [儲存]

gcloud

如要授予 IAM 角色,請使用 gcloud projects add-iam-policy-binding 指令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

更改下列內容:

  • PROJECT_ID:專案 ID

  • PROJECT_NUMBER:您的專案編號

如要取得專案編號,請執行 gcloud projects describe 指令:

gcloud projects describe PROJECT_ID

詳情請參閱「尋找專案名稱、編號和 ID」。

連線至 VM

使用 SSH 連線至 VM 執行個體。

控制台

  1. 前往「VM instances」(VM 執行個體) 頁面。

    前往 VM 執行個體

  2. 在 VM 執行個體清單中找到 VM 名稱,然後按一下「SSH」SSH

gcloud

如要連線至 VM,請使用 gcloud compute ssh 指令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • ZONE:您建立 VM 的可用區

首次使用 SSH 時,可能需要額外設定。詳情請參閱「關於 SSH 連線」一文。

安裝 Kafka 指令列工具

在 SSH 工作階段中,執行下列指令來安裝 Kafka 指令列工具。

  1. 安裝 Java (執行 Kafka 指令列工具時需要),並安裝 wget,協助下載依附元件。下列指令假設您使用 Debian Linux 環境。

    sudo apt-get install default-jre wget
    
  2. 安裝 Kafka 指令列工具。

    wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. 請設定下列環境變數:

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
    

設定驗證方法

在 SSH 工作階段中,執行下列步驟來設定 Managed Service for Apache Kafka 驗證程式庫。

  1. 下載程式庫並在本機安裝。

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    

    這個指令會將程式庫安裝在 Kafka 安裝目錄的 lib 目錄中。Kafka 指令列工具會在這個目錄中尋找 Java 依附元件。

  2. 使用文字編輯器建立名為 client.properties 的檔案,並貼入下列內容:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

    儲存檔案。這個檔案會使用下列設定,設定 Kafka 用戶端:

    • 使用 SASL_SSL 與 Kafka 叢集進行安全通訊。

    • 使用 OAuth 2.0 不記名權杖進行驗證。

    • 使用程式庫提供的 GcpLoginCallbackHandler 類別做為登入回呼處理常式,取得 OAuth 2.0 權杖。

產生及讀取訊息

在 SSH 工作階段中,執行下列指令來產生及取用 Kafka 訊息。

  1. 將啟動位址設為環境變數。

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

    更改下列內容:

    • CLUSTER_ID:叢集名稱
    • REGION:您建立叢集的區域
    • PROJECT_ID:專案 ID

    詳情請參閱「取得啟動位址」。

  2. 列出叢集中的主題。

    kafka-topics.sh --list \
      --bootstrap-server $BOOTSTRAP \
      --command-config client.properties
    
  3. 撰寫主題訊息。

    echo "hello world" | kafka-console-producer.sh \
      --topic KAFKA_TOPIC_NAME \
      --bootstrap-server $BOOTSTRAP \
      --producer.config client.properties
    

    KAFKA_TOPIC_NAME 替換為主題名稱。

  4. 從主題取用訊息。

    kafka-console-consumer.sh \
      --topic KAFKA_TOPIC_NAME \
      --from-beginning \
      --bootstrap-server $BOOTSTRAP \
      --consumer.config client.properties
    

    如要停止取用訊息,請輸入 Ctrl+C

  5. 執行製作人效能測試。

    kafka-producer-perf-test.sh \
      --topic KAFKA_TOPIC_NAME \
      --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \
      --producer-props bootstrap.servers=$BOOTSTRAP \
      --producer.config client.properties
    

清除所用資源

為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。

控制台

  1. 刪除 VM 執行個體。

    1. 前往「VM instances」(VM 執行個體) 頁面。

      前往 VM 執行個體

    2. 選取 VM,然後按一下「Delete」(刪除)

  2. 刪除 Kafka 叢集。

    1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

      前往「Clusters」(叢集)

    2. 選取 Kafka 叢集,然後按一下「Delete」(刪除)

gcloud

  1. 如要刪除 VM,請使用 gcloud compute instances delete 指令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如要刪除 Kafka 叢集,請使用 gcloud managed-kafka clusters delete 指令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。