本教學課程說明如何使用 Kafka Connect,將 Pub/Sub 中的訊息擷取至 Managed Service for Apache Kafka 叢集。
Kafka Connect 可管理 Kafka 叢集和其他系統之間的資料移動作業。在本教學課程中,您將建立 Connect 叢集和 Pub/Sub 來源連接器。Pub/Sub 來源連接器會從 Pub/Sub 主題讀取訊息,並將這些訊息寫入 Kafka 主題。
事前準備
控制台
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Kafka API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Kafka API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
請確認您在專案中擁有下列一或多個角色: 代管 Kafka 叢集編輯者、 代管 Kafka Connect 叢集編輯者、 代管 Kafka 連接器編輯者、 代管 Kafka 主題編輯者、 Pub/Sub 編輯者
檢查角色
-
前往 Google Cloud 控制台的「IAM」頁面。
前往「IAM」頁面 - 選取專案。
-
在「主體」欄中,找出所有識別您或您所屬群組的資料列。如要瞭解自己所屬的群組,請與管理員聯絡。
- 針對指定或包含您的所有列,請檢查「角色」欄,確認角色清單是否包含必要角色。
授予角色
-
前往 Google Cloud 控制台的「IAM」頁面。
前往「IAM」頁面 - 選取專案。
- 按一下「Grant access」(授予存取權)。
-
在「New principals」(新增主體) 欄位中,輸入您的使用者 ID。 這通常是指 Google 帳戶的電子郵件地址。
- 按一下「選取角色」,然後搜尋角色。
- 如要授予其他角色,請按一下「Add another role」(新增其他角色),然後新增其他角色。
- 按一下「Save」(儲存)。
-
gcloud
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Managed Kafka API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable managedkafka.googleapis.com
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Managed Kafka API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable managedkafka.googleapis.com
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/managedkafka.clusterEditor, roles/managedkafka.connectClusterEditor, roles/managedkafka.connectorEditor, roles/managedkafka.topicEditor, roles/pubsub.editorgcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
建立 Pub/Sub 主題和訂閱項目
在本步驟中,您要建立含有訂閱項目的 Pub/Sub 主題。
控制台
前往「Pub/Sub」>「主題」頁面。
按一下 「建立主題」。
在「主題 ID」方塊中,輸入主題名稱。
確認已勾選「新增預設訂閱項目」核取方塊。
點選「建立」。
gcloud
如要建立 Pub/Sub 主題,請執行
gcloud pubsub topics create指令。gcloud pubsub topics create TOPIC_ID將
TOPIC_ID替換為 Pub/Sub 主題的名稱。如要為主題建立訂閱項目,請執行
gcloud pubsub subscriptions create指令:gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID將
SUBSCRIPTION_ID替換為 Pub/Sub 訂閱的名稱。
如要瞭解如何命名 Pub/Sub 主題和訂閱項目,請參閱主題或訂閱項目命名規範。
建立 Managed Service for Apache Kafka 資源
在本節中,您將建立下列 Managed Service for Apache Kafka 資源:
- 具有主題的 Kafka 叢集。
- 具備 Pub/Sub 連接器的 Connect 叢集。
建立 Kafka 叢集
在這個步驟中,您會建立 Managed Service for Apache Kafka 叢集。建立叢集最多可能需要 30 分鐘。
控制台
- 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
- 點選 「建立」。
- 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
- 在「Region」(區域) 清單中,選取叢集的位置。
-
在「網路設定」中,設定可存取叢集的子網路:
- 在「Project」(專案) 部分,選取專案。
- 在「Network」(網路) 中選取虛擬私有雲網路。
- 在「Subnet」(子網路) 中,選取子網路。
- 按一下 [完成]。
- 點選「建立」。
按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active。
gcloud
如要建立 Kafka 叢集,請執行 managed-kafka clusters
create 指令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
更改下列內容:
KAFKA_CLUSTER:Kafka 叢集名稱REGION:叢集位置PROJECT_ID:專案 IDSUBNET_NAME:要建立叢集的子網路,例如default
如需支援位置的相關資訊,請參閱「 Managed Service for Apache Kafka 位置」。
這項指令會以非同步方式執行,並傳回作業 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如要追蹤建立作業的進度,請使用 gcloud managed-kafka
operations describe 指令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
叢集準備就緒後,這項指令的輸出內容會包含 state:
ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。
建立 Kafka 主題
建立 Managed Service for Apache Kafka 叢集後,請建立 Kafka 主題。
控制台
前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
按一下叢集名稱。
在叢集詳細資料頁面中,按一下 「Create Topic」(建立主題)。
在「Topic name」(主題名稱) 方塊中,輸入主題名稱。
點選「建立」。
gcloud
如要建立 Kafka 主題,請執行 managed-kafka topics create 指令。
gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
更改下列內容:
KAFKA_TOPIC_NAME:要建立的 Kafka 主題名稱KAFKA_CLUSTER:Kafka 叢集的名稱REGION:您建立 Kafka 叢集的區域
建立連結叢集
在這個步驟中,您會建立 Connect 叢集。建立 Connect 叢集最多可能需要 30 分鐘。
開始執行這個步驟前,請確認 Managed Service for Apache Kafka 叢集已完全建立。
控制台
前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。
點選 「建立」。
在「Connect cluster name」(Connect 叢集名稱) 中輸入字串。例如:
my-connect-cluster。在「主要 Kafka 叢集」中,選取您先前建立的 Kafka。
點選「建立」。
叢集建立期間,叢集狀態為 Creating。叢集建立完成後,狀態會顯示為 Active。
gcloud
如要建立 Connect 叢集,請執行 gcloud managed-kafka connect-clusters create 指令。
gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
--location=REGION \
--cpu=12 \
--memory=12GiB \
--primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--kafka-cluster=KAFKA_CLUSTER \
--async
更改下列內容:
CONNECT_CLUSTER:Connect 叢集名稱REGION:您建立 Kafka 叢集的區域PROJECT_ID:專案 IDSUBNET_NAME:建立 Kafka 叢集的子網路KAFKA_CLUSTER:Kafka 叢集名稱
這項指令會以非同步方式執行,並傳回作業 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:
gcloud managed-kafka operations describe OPERATION_ID \
--location=REGION
詳情請參閱「監控叢集建立作業」。
授予 IAM 角色
將下列 Identity and Access Management (IAM) 角色授予代管 Kafka 服務帳戶:
- Pub/Sub 訂閱者
- Pub/Sub 檢視者
這些角色可讓連接器從 Pub/Sub 讀取訊息。
控制台
前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。
選取「包含 Google 提供的角色授予項目」。
找到「Managed Kafka Service Account」(受管理 Kafka 服務帳戶) 列,然後按一下 「Edit principal」(編輯主體)。
按一下「新增其他角色」,然後選取「Pub/Sub 訂閱者」角色。 針對「Pub/Sub 檢視者」角色重複執行這個步驟。
按一下 [儲存]。
如要進一步瞭解如何授予角色,請參閱「使用控制台授予 IAM 角色」。
gcloud
如要將 IAM 角色授予服務帳戶,請執行 gcloud projects add-iam-policy-binding 指令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.subscriber
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.viewer
更改下列內容:
PROJECT_ID:專案 IDPROJECT_NUMBER:您的專案編號
如要找出專案編號,請使用 gcloud projects describe 指令。
建立 Pub/Sub 來源連接器
在本步驟中,您要建立 Pub/Sub 來源連接器。這個連接器會從 Pub/Sub 讀取訊息,並寫入 Kafka 主題。
控制台
前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。
按一下 Connect 叢集名稱。
按一下 「建立連接器」。
在「Connector name」(連接器名稱) 中輸入字串。範例:
pubsub-source。在「連接器外掛程式」清單中,選取「
Pub/Sub Source」。在「Cloud Pub/Sub subscription」(Cloud Pub/Sub 訂閱項目),選取建立 Pub/Sub 主題時建立的預設 Pub/Sub。
在「Kafka 主題」中,選取您先前建立的 Kafka 主題。
點選「建立」。
gcloud
如要建立 Pub/Sub 來源連接器,請執行 gcloud managed-kafka connectors create 指令。
gcloud managed-kafka connectors create PUBSUB_CONNECTOR_NAME \
--connect-cluster=CONNECT_CLUSTER \
--location=REGION \
--configs=connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector,\
cps.project=PROJECT_ID,\
cps.streamingPull.enabled=true,\
cps.subscription=SUBSCRIPTION_ID,\
kafka.topic=KAFKA_TOPIC_NAME,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
更改下列內容:
PUBSUB_CONNECTOR_NAME:連接器名稱,例如pubsub-source-connectorCONNECT_CLUSTER:Connect 叢集名稱REGION:您建立 Connect 叢集的區域PROJECT_ID:專案 IDKAFKA_TOPIC_NAME:Kafka 主題的名稱SUBSCRIPTION_ID:Pub/Sub 訂閱項目名稱
查看結果
如要查看結果,請將一些訊息發布至 Pub/Sub。
控制台
在 Google Cloud 控制台,依序前往「Pub/Sub」>「Topics」(主題) 頁面。
在主題清單中,按一下 Pub/Sub 主題的名稱。
按一下「訊息」。
按一下「發布訊息」。
在「Number of messages」(訊息數) 中輸入
10。在「Message body」(訊息內文) 中輸入
{"name": "Alice", "customer_id": 1}。按一下「發布」。
gcloud
如要將訊息發布至 Pub/Sub 主題,請使用 gcloud pubsub topics publish 指令。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
將 TOPIC_ID 替換為 Pub/Sub 主題名稱。
現在可以從 Kafka 主題取用訊息。詳情請參閱「使用 CLI 產生及接收訊息」。
清除所用資源
為避免因為本教學課程所用資源,導致系統向 Google Cloud 帳戶收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
控制台
刪除 Pub/Sub 主題。
前往「Pub/Sub」>「主題」頁面。
選取主題,然後按一下「刪除」。
刪除 Pub/Sub 訂閱項目。
前往「Pub/Sub」>「訂閱項目」頁面。
選取以主題建立的訂閱項目,然後按一下「Delete」(刪除)。
刪除 Connect 叢集。
前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。
選取 Connect 叢集,然後按一下「刪除」。
刪除 Kafka 叢集。
前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
選取 Kafka 叢集,然後按一下「Delete」(刪除)。
gcloud
如要刪除 Pub/Sub 訂閱項目和主題,請使用
gcloud pubsub subscriptions delete和gcloud pubsub topics delete指令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID如要刪除 Connect 叢集,請使用
gcloud managed-kafka connect-clusters delete指令。gcloud managed-kafka connect-clusters delete CONNECT_CLUSTER \ --location=REGION --async如要刪除 Kafka 叢集,請使用
gcloud managed-kafka clusters delete指令。gcloud managed-kafka clusters delete KAFKA_CLUSTER \ --location=REGION --async
後續步驟
- 排解 Pub/Sub 連接器問題。
- 進一步瞭解 Pub/Sub 來源連接器。
- 進一步瞭解 Kafka Connect。