使用 Kafka 指令列工具產生及取用訊息
瞭解如何使用 Kafka 指令列工具連線至 Managed Service for Apache Kafka 叢集、產生訊息及取用訊息。
事前準備
開始本教學課程前,請先建立新的 Managed Service for Apache Kafka 叢集。如果已有叢集,可以略過這個步驟。
如何建立叢集
控制台
- 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
- 點選 「Create」(建立)。
- 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
- 在「Region」(區域) 清單中,選取叢集的位置。
-
在「網路設定」中,設定可存取叢集的子網路:
- 在「Project」(專案) 部分,選取專案。
- 在「Network」(網路) 中選取虛擬私有雲網路。
- 在「Subnet」(子網路) 中,選取子網路。
- 按一下 [完成]。
- 點選「建立」。
按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active。
gcloud
如要建立 Kafka 叢集,請執行 managed-kafka clusters
create 指令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
更改下列內容:
KAFKA_CLUSTER:Kafka 叢集名稱REGION:叢集位置PROJECT_ID:專案 IDSUBNET_NAME:要建立叢集的子網路,例如default
如要瞭解支援的位置,請參閱「 Managed Service for Apache Kafka 位置」。
這項指令會非同步執行,並傳回作業 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如要追蹤建立作業的進度,請使用 gcloud managed-kafka
operations describe 指令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
叢集準備就緒後,這項指令的輸出內容會包含 state:
ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。
必要的角色
如要取得建立及設定用戶端 VM 所需的權限,請要求管理員在專案中授予您下列 IAM 角色:
- Compute 執行個體管理員 (v1) (
roles/compute.instanceAdmin.v1) - 專案 IAM 管理員 (
roles/resourcemanager.projectIamAdmin) -
角色檢視者 (
roles/iam.roleViewer) -
服務帳戶使用者 (
roles/iam.serviceAccountUser)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
建立用戶端 VM
在 Compute Engine 中建立可存取 Kafka 叢集的 Linux 虛擬機器 (VM) 執行個體。設定 VM 時,請設定下列選項:
區域。在與 Kafka 叢集相同的區域中建立 VM。
子網路。在與 Kafka 叢集設定中使用的子網路相同的虛擬私有雲網路中建立 VM。詳情請參閱「查看叢集的子網路」。
存取權範圍。將
https://www.googleapis.com/auth/cloud-platform存取範圍指派給 VM。這個範圍會授權 VM 將要求傳送至 Managed Kafka API。
下列步驟說明如何設定這些選項。
控制台
前往 Google Cloud 控制台的「Create an instance」(建立執行個體) 頁面。
在「機器設定」窗格中,執行下列操作:
在「Name」(名稱) 欄位中,指定執行個體的名稱。詳情請參閱資源命名慣例。
在「Region」(區域) 清單中,選取與 Kafka 叢集相同的區域。
在「Zone」(可用區) 清單中選取可用區。
在導覽選單中,按一下「Networking」(網路)。在隨即顯示的「Networking」(網路) 窗格中,執行下列操作:
前往「網路介面」部分。
如要展開預設網路介面,請點選 箭頭。
在「Network」(網路) 欄位中,選擇虛擬私有雲網路。
在「Subnetwork」(子網路) 清單中,選取子網路。
按一下 [完成]。
按一下導覽選單中的「Security」(安全性)。在隨即顯示的「安全性」窗格中,執行下列操作:
為「Access scopes」(存取權範圍) 選取 [Set access for each API] (針對各個 API 設定存取權)。
在存取範圍清單中,找到「Cloud Platform」下拉式清單,然後選取「Enabled」。
按一下「建立」即可建立 VM。
gcloud
如要建立 VM 執行個體,請使用 gcloud compute instances create 指令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
更改下列內容:
- VM_NAME:VM 的名稱
- PROJECT_ID:專案 ID
- REGION:您建立 Kafka 叢集的區域,例如
us-central1 - SUBNET:與叢集設定中使用的子網路位於相同虛擬私有雲網路的子網路
- ZONE:您建立叢集的區域中的可用區,例如
us-central1-c
如要進一步瞭解如何建立 VM,請參閱「在特定子網路中建立 VM 執行個體」。
授予 IAM 角色
將下列 Identity and Access Management (IAM) 角色授予 Compute Engine 預設服務帳戶:
- 代管 Kafka 用戶端 (
roles/managedkafka.client) - 服務帳戶憑證建立者 (
roles/iam.serviceAccountTokenCreator) 服務帳戶 OpenID 權杖建立者 (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。
找出「Compute Engine default service account」(Compute Engine 預設服務帳戶) 的資料列,然後按一下 「Edit principal」(編輯主體)。
按一下「新增其他角色」,然後選取「受管理 Kafka 用戶端」角色。 針對「服務帳戶憑證建立者」和「服務帳戶 OpenID 憑證建立者」角色重複這個步驟。
按一下 [儲存]。
gcloud
如要授予 IAM 角色,請使用 gcloud projects add-iam-policy-binding 指令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
更改下列內容:
PROJECT_ID:專案 ID
PROJECT_NUMBER:您的專案編號
如要取得專案編號,請執行 gcloud projects describe 指令:
gcloud projects describe PROJECT_ID
詳情請參閱「尋找專案名稱、編號和 ID」。
連線至 VM
使用 SSH 連線至 VM 執行個體。
控制台
前往「VM instances」(VM 執行個體) 頁面。
在 VM 執行個體清單中找到 VM 名稱,然後按一下「SSH」SSH。
gcloud
如要連線至 VM,請使用 gcloud compute ssh 指令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
更改下列內容:
- VM_NAME:VM 的名稱
- PROJECT_ID:專案 ID
- ZONE:您建立 VM 的可用區
首次使用 SSH 時,可能需要額外設定。詳情請參閱「關於 SSH 連線」一文。
安裝 Kafka 指令列工具
在 SSH 工作階段中,執行下列指令來安裝 Kafka 指令列工具。
安裝 Java (執行 Kafka 指令列工具時需要),並安裝
wget,協助下載依附元件。下列指令假設您使用 Debian Linux 環境。sudo apt-get install default-jre wget安裝 Kafka 指令列工具。
wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz tar xfz kafka_2.13-3.7.2.tgz請設定下列環境變數:
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2 export PATH=$PATH:$KAFKA_HOME/bin export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
設定驗證方法
在 SSH 工作階段中,執行下列步驟來設定 Managed Service for Apache Kafka 驗證程式庫。
下載程式庫並在本機安裝。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip sudo apt-get install unzip unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/這個指令會將程式庫安裝在 Kafka 安裝目錄的
lib目錄中。Kafka 指令列工具會在這個目錄中尋找 Java 依附元件。使用文字編輯器建立名為
client.properties的檔案,並貼入下列內容:security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;儲存檔案。這個檔案會使用下列設定,設定 Kafka 用戶端:
使用 SASL_SSL 與 Kafka 叢集進行安全通訊。
使用 OAuth 2.0 不記名權杖進行驗證。
使用程式庫提供的
GcpLoginCallbackHandler類別做為登入回呼處理常式,取得 OAuth 2.0 權杖。
產生及讀取訊息
在 SSH 工作階段中,執行下列指令來產生及取用 Kafka 訊息。
將啟動位址設為環境變數。
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092更改下列內容:
CLUSTER_ID:叢集名稱REGION:您建立叢集的區域PROJECT_ID:專案 ID
詳情請參閱「取得啟動位址」。
列出叢集中的主題。
kafka-topics.sh --list \ --bootstrap-server $BOOTSTRAP \ --command-config client.properties撰寫主題訊息。
echo "hello world" | kafka-console-producer.sh \ --topic KAFKA_TOPIC_NAME \ --bootstrap-server $BOOTSTRAP \ --producer.config client.properties將 KAFKA_TOPIC_NAME 替換為主題名稱。
從主題取用訊息。
kafka-console-consumer.sh \ --topic KAFKA_TOPIC_NAME \ --from-beginning \ --bootstrap-server $BOOTSTRAP \ --consumer.config client.properties如要停止取用訊息,請輸入 Ctrl+C。
執行製作人效能測試。
kafka-producer-perf-test.sh \ --topic KAFKA_TOPIC_NAME \ --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \ --producer-props bootstrap.servers=$BOOTSTRAP \ --producer.config client.properties
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。
控制台
刪除 VM 執行個體。
前往「VM instances」(VM 執行個體) 頁面。
選取 VM,然後按一下「Delete」(刪除)。
刪除 Kafka 叢集。
前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
選取 Kafka 叢集,然後按一下「Delete」(刪除)。
gcloud
如要刪除 VM,請使用
gcloud compute instances delete指令。gcloud compute instances delete VM_NAME --zone=ZONE如要刪除 Kafka 叢集,請使用
gcloud managed-kafka clusters delete指令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async