將 Pub/Sub 訊息複製到 Kafka

本教學課程說明如何使用 Kafka Connect,將 Pub/Sub 中的訊息擷取至 Managed Service for Apache Kafka 叢集。

Kafka Connect 可管理 Kafka 叢集和其他系統之間的資料移動作業。在本教學課程中,您將建立 Connect 叢集Pub/Sub 來源連接器。Pub/Sub 來源連接器會從 Pub/Sub 主題讀取訊息,並將這些訊息寫入 Kafka 主題。

事前準備

控制台

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Managed Kafka API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Managed Kafka API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. 請確認您在專案中擁有下列一或多個角色: 代管 Kafka 叢集編輯者代管 Kafka Connect 叢集編輯者代管 Kafka 連接器編輯者代管 Kafka 主題編輯者Pub/Sub 編輯者

    檢查角色

    1. 前往 Google Cloud 控制台的「IAM」頁面。

      前往「IAM」頁面
    2. 選取專案。
    3. 在「主體」欄中,找出所有識別您或您所屬群組的資料列。如要瞭解自己所屬的群組,請與管理員聯絡。

    4. 針對指定或包含您的所有列,請檢查「角色」欄,確認角色清單是否包含必要角色。

    授予角色

    1. 前往 Google Cloud 控制台的「IAM」頁面。

      前往「IAM」頁面
    2. 選取專案。
    3. 按一下「Grant access」(授予存取權)
    4. 在「New principals」(新增主體) 欄位中,輸入您的使用者 ID。 這通常是指 Google 帳戶的電子郵件地址。

    5. 按一下「選取角色」,然後搜尋角色。
    6. 如要授予其他角色,請按一下「Add another role」(新增其他角色),然後新增其他角色。
    7. 按一下「Save」(儲存)

gcloud

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. 安裝 Google Cloud CLI。

  3. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  4. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  5. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  6. 確認專案已啟用計費功能 Google Cloud

  7. 啟用 Managed Kafka API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable managedkafka.googleapis.com
  8. 安裝 Google Cloud CLI。

  9. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  10. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  11. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  12. 確認專案已啟用計費功能 Google Cloud

  13. 啟用 Managed Kafka API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable managedkafka.googleapis.com
  14. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/managedkafka.clusterEditor, roles/managedkafka.connectClusterEditor, roles/managedkafka.connectorEditor, roles/managedkafka.topicEditor, roles/pubsub.editor

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。

建立 Pub/Sub 主題和訂閱項目

在本步驟中,您要建立含有訂閱項目的 Pub/Sub 主題

控制台

  1. 前往「Pub/Sub」>「主題」頁面。

    前往「主題」

  2. 按一下 「建立主題」

  3. 在「主題 ID」方塊中,輸入主題名稱。

  4. 確認已勾選「新增預設訂閱項目」核取方塊。

  5. 點選「建立」

gcloud

  1. 如要建立 Pub/Sub 主題,請執行 gcloud pubsub topics create 指令。

    gcloud pubsub topics create TOPIC_ID
    

    TOPIC_ID 替換為 Pub/Sub 主題的名稱。

  2. 如要為主題建立訂閱項目,請執行 gcloud pubsub subscriptions create 指令:

    gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
    

    SUBSCRIPTION_ID 替換為 Pub/Sub 訂閱的名稱。

如要瞭解如何命名 Pub/Sub 主題和訂閱項目,請參閱主題或訂閱項目命名規範

建立 Managed Service for Apache Kafka 資源

在本節中,您將建立下列 Managed Service for Apache Kafka 資源:

  • 具有主題的 Kafka 叢集。
  • 具備 Pub/Sub 連接器的 Connect 叢集。

建立 Kafka 叢集

在這個步驟中,您會建立 Managed Service for Apache Kafka 叢集。建立叢集最多可能需要 30 分鐘。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 點選 「建立」
  3. 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
  4. 在「Region」(區域) 清單中,選取叢集的位置。
  5. 在「網路設定」中,設定可存取叢集的子網路:
    1. 在「Project」(專案) 部分,選取專案。
    2. 在「Network」(網路) 中選取虛擬私有雲網路。
    3. 在「Subnet」(子網路) 中,選取子網路。
    4. 按一下 [完成]
  6. 點選「建立」

按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active

gcloud

如要建立 Kafka 叢集,請執行 managed-kafka clusters create 指令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

更改下列內容:

  • KAFKA_CLUSTER:Kafka 叢集名稱
  • REGION:叢集位置
  • PROJECT_ID:專案 ID
  • SUBNET_NAME:要建立叢集的子網路,例如 default

如需支援位置的相關資訊,請參閱「 Managed Service for Apache Kafka 位置」。

這項指令會以非同步方式執行,並傳回作業 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

叢集準備就緒後,這項指令的輸出內容會包含 state: ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。

建立 Kafka 主題

建立 Managed Service for Apache Kafka 叢集後,請建立 Kafka 主題。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 按一下叢集名稱。

  3. 在叢集詳細資料頁面中,按一下 「Create Topic」(建立主題)

  4. 在「Topic name」(主題名稱) 方塊中,輸入主題名稱。

  5. 點選「建立」

gcloud

如要建立 Kafka 主題,請執行 managed-kafka topics create 指令。

gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

更改下列內容:

  • KAFKA_TOPIC_NAME:要建立的 Kafka 主題名稱
  • KAFKA_CLUSTER:Kafka 叢集的名稱
  • REGION:您建立 Kafka 叢集的區域

建立連結叢集

在這個步驟中,您會建立 Connect 叢集。建立 Connect 叢集最多可能需要 30 分鐘。

開始執行這個步驟前,請確認 Managed Service for Apache Kafka 叢集已完全建立。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 點選 「建立」

  3. 在「Connect cluster name」(Connect 叢集名稱) 中輸入字串。例如:my-connect-cluster

  4. 在「主要 Kafka 叢集」中,選取您先前建立的 Kafka。

  5. 點選「建立」

叢集建立期間,叢集狀態為 Creating。叢集建立完成後,狀態會顯示為 Active

gcloud

如要建立 Connect 叢集,請執行 gcloud managed-kafka connect-clusters create 指令。

gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
  --location=REGION \
  --cpu=12 \
  --memory=12GiB \
  --primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --kafka-cluster=KAFKA_CLUSTER \
  --async

更改下列內容:

  • CONNECT_CLUSTER:Connect 叢集名稱
  • REGION:您建立 Kafka 叢集的區域
  • PROJECT_ID:專案 ID
  • SUBNET_NAME:建立 Kafka 叢集的子網路
  • KAFKA_CLUSTER:Kafka 叢集名稱

這項指令會以非同步方式執行,並傳回作業 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

詳情請參閱「監控叢集建立作業」。

授予 IAM 角色

將下列 Identity and Access Management (IAM) 角色授予代管 Kafka 服務帳戶:

  • Pub/Sub 訂閱者
  • Pub/Sub 檢視者

這些角色可讓連接器從 Pub/Sub 讀取訊息。

控制台

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「身分與存取權管理」頁面

  2. 選取「包含 Google 提供的角色授予項目」

  3. 找到「Managed Kafka Service Account」(受管理 Kafka 服務帳戶) 列,然後按一下 「Edit principal」(編輯主體)

  4. 按一下「新增其他角色」,然後選取「Pub/Sub 訂閱者」角色。 針對「Pub/Sub 檢視者」角色重複執行這個步驟。

  5. 按一下 [儲存]

如要進一步瞭解如何授予角色,請參閱「使用控制台授予 IAM 角色」。

gcloud

如要將 IAM 角色授予服務帳戶,請執行 gcloud projects add-iam-policy-binding 指令。

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/pubsub.subscriber

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/pubsub.viewer

更改下列內容:

  • PROJECT_ID:專案 ID
  • PROJECT_NUMBER:您的專案編號

如要找出專案編號,請使用 gcloud projects describe 指令。

建立 Pub/Sub 來源連接器

在本步驟中,您要建立 Pub/Sub 來源連接器。這個連接器會從 Pub/Sub 讀取訊息,並寫入 Kafka 主題。

控制台

  1. 前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下 Connect 叢集名稱。

  3. 按一下 「建立連接器」

  4. 在「Connector name」(連接器名稱) 中輸入字串。範例:pubsub-source

  5. 在「連接器外掛程式」清單中,選取「Pub/Sub Source」。

  6. 在「Cloud Pub/Sub subscription」(Cloud Pub/Sub 訂閱項目),選取建立 Pub/Sub 主題時建立的預設 Pub/Sub。

  7. 在「Kafka 主題」中,選取您先前建立的 Kafka 主題。

  8. 點選「建立」

gcloud

如要建立 Pub/Sub 來源連接器,請執行 gcloud managed-kafka connectors create 指令。

gcloud managed-kafka connectors create PUBSUB_CONNECTOR_NAME \
  --connect-cluster=CONNECT_CLUSTER \
  --location=REGION \
  --configs=connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector,\
cps.project=PROJECT_ID,\
cps.streamingPull.enabled=true,\
cps.subscription=SUBSCRIPTION_ID,\
kafka.topic=KAFKA_TOPIC_NAME,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

更改下列內容:

  • PUBSUB_CONNECTOR_NAME:連接器名稱,例如 pubsub-source-connector
  • CONNECT_CLUSTER:Connect 叢集名稱
  • REGION:您建立 Connect 叢集的區域
  • PROJECT_ID:專案 ID
  • KAFKA_TOPIC_NAME:Kafka 主題的名稱
  • SUBSCRIPTION_ID:Pub/Sub 訂閱項目名稱

查看結果

如要查看結果,請將一些訊息發布至 Pub/Sub。

控制台

  1. 在 Google Cloud 控制台,依序前往「Pub/Sub」>「Topics」(主題) 頁面。

    前往「主題」

  2. 在主題清單中,按一下 Pub/Sub 主題的名稱。

  3. 按一下「訊息」

  4. 按一下「發布訊息」

  5. 在「Number of messages」(訊息數) 中輸入 10

  6. 在「Message body」(訊息內文) 中輸入 {"name": "Alice", "customer_id": 1}

  7. 按一下「發布」

gcloud

如要將訊息發布至 Pub/Sub 主題,請使用 gcloud pubsub topics publish 指令。

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

TOPIC_ID 替換為 Pub/Sub 主題名稱。

現在可以從 Kafka 主題取用訊息。詳情請參閱「使用 CLI 產生及接收訊息」。

清除所用資源

為避免因為本教學課程所用資源,導致系統向 Google Cloud 帳戶收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

控制台

  1. 刪除 Pub/Sub 主題。

    1. 前往「Pub/Sub」>「主題」頁面。

      前往「主題」

    2. 選取主題,然後按一下「刪除」

  2. 刪除 Pub/Sub 訂閱項目。

    1. 前往「Pub/Sub」>「訂閱項目」頁面。

      前往「訂閱項目」頁面

    2. 選取以主題建立的訂閱項目,然後按一下「Delete」(刪除)

  3. 刪除 Connect 叢集。

    1. 前往「Managed Service for Apache Kafka」>「Connect Clusters」(連結叢集) 頁面。

      前往「Connect Clusters」(連結叢集)

    2. 選取 Connect 叢集,然後按一下「刪除」

  4. 刪除 Kafka 叢集。

    1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

      前往「Clusters」(叢集)

    2. 選取 Kafka 叢集,然後按一下「Delete」(刪除)

gcloud

  1. 如要刪除 Pub/Sub 訂閱項目和主題,請使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 指令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. 如要刪除 Connect 叢集,請使用 gcloud managed-kafka connect-clusters delete 指令。

    gcloud managed-kafka connect-clusters delete CONNECT_CLUSTER \
      --location=REGION --async
    
  3. 如要刪除 Kafka 叢集,請使用 gcloud managed-kafka clusters delete 指令。

    gcloud managed-kafka clusters delete KAFKA_CLUSTER \
      --location=REGION --async
    

後續步驟