建立 Confluent Cloud 匯入主題

您可以使用 Confluent Cloud 匯入主題,從Confluent Cloud 持續擷取資料,做為外部來源並匯入 Pub/Sub。然後將資料串流至 Pub/Sub 支援的任何目的地。

本文說明如何建立及管理 Confluent Cloud 匯入主題。如要建立標準主題,請參閱「建立標準主題」。

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Confluent Cloud 匯入主題所需的權限,請要求管理員在主題或專案中授予您「Pub/Sub 編輯者 」(roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立及管理 Confluent Cloud 匯入主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立及管理 Confluent Cloud 匯入主題,您必須具備下列權限:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish and pubsub.serviceAgent
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 設定匯入主題的 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

設定聯合身分,以便存取 Confluent Cloud

Workload Identity Federation 可讓 Google Cloud 服務存取 Google Cloud外部執行的工作負載。透過身分聯盟,您不需要維護或傳遞憑證,即可存取其他雲端中的資源。 Google Cloud 您可以改用工作負載本身的 ID 向 Google Cloud 進行驗證,並存取資源。

在 Google Cloud中建立服務帳戶

這個步驟可以省略。如果您已有服務帳戶,可以在這個程序中使用該帳戶,不必建立新的服務帳戶。如果您使用現有服務帳戶,請前往「記錄服務帳戶的專屬 ID」進行下一個步驟。

如果是 Confluent Cloud 匯入主題,Pub/Sub 會使用服務帳戶做為身分,從 Confluent Cloud 存取資源。

如要進一步瞭解如何建立服務帳戶,包括必要條件、必要角色和權限,以及命名規範,請參閱「建立服務帳戶」。建立服務帳戶後,您可能需要等待 60 秒以上,才能使用該服務帳戶。這種行為的發生,是因為讀取作業最終會保持一致性,因此可能需要一段時間,新服務帳戶才會顯示。

記下服務帳戶專屬 ID

您需要服務帳戶的專屬 ID,才能在 Confluent Cloud 控制台中設定身分識別提供者和集區。

  1. 前往 Google Cloud 控制台的「服務帳戶」詳細資料頁面。

    前往服務帳戶

  2. 按一下您剛建立的服務帳戶,或打算使用的服務帳戶。

  3. 在「服務帳戶詳細資料」頁面中,記下專屬 ID 號碼。

    您需要這組 ID,才能在工作流程中於 Confluent Cloud 控制台設定身分識別提供者和集區

為 Pub/Sub 服務帳戶新增服務帳戶憑證建立者角色

服務帳戶憑證建立者角色 (roles/iam.serviceAccountTokenCreator) 可讓主體為服務帳戶建立短期憑證。這些權杖或憑證可用於模擬服務帳戶。

如要進一步瞭解服務帳戶模擬功能,請參閱「服務帳戶模擬功能」。

您也可以在這個程序中新增 Pub/Sub 發布者角色 (roles/pubsub.publisher)。如要進一步瞭解角色和新增原因,請參閱「將 Pub/Sub 發布者角色新增至 Pub/Sub 服務帳戶」。

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「IAM」(身分與存取權管理) 頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並點選「服務帳戶憑證建立者角色」 (roles/iam.serviceAccountTokenCreator)。

  7. 按一下 [儲存]

在 Confluent Cloud 中建立身分識別提供者

如要向 Confluent Cloud 驗證身分,Google Cloud 服務帳戶需要身分集區。您必須先在 Confluent Cloud 中建立身分識別提供者。

如要進一步瞭解如何在 Confluent Cloud 中建立身分識別提供者,請參閱「新增 OAuth/OIDC 身分識別提供者」頁面。

  1. 登入 Confluent Cloud 控制台

  2. 在選單中,按一下「帳戶與存取權」

  3. 按一下「Workload identities」(工作負載身分)

  4. 按一下「新增提供者」

  5. 點選「OAuth/OIDC」,然後點選「下一步」

  6. 點選「Other OIDC Provider」(其他 OIDC 提供者),然後點選「Next」(下一步)

  7. 提供身分識別提供者的名稱和用途說明。

  8. 按一下「顯示進階設定」

  9. 在「核發者 URI」欄位中輸入 https://accounts.google.com

  10. 在「JWKS URI」欄位中輸入 https://www.googleapis.com/oauth2/v3/certs

  11. 按一下「驗證並儲存」

在 Confluent Cloud 中建立身分集區並授予適當角色

您必須在身分識別設定檔下建立身分識別集區,並授予必要角色,允許 Pub/Sub 服務帳戶進行驗證,以及從 Confluent Cloud Kafka 主題讀取資料。

請先在 Confluent Cloud 中建立叢集,再繼續建立身分集區。

如要進一步瞭解如何建立身分集區,請參閱「透過 OAuth/OIDC 識別資訊提供者使用身分集區」頁面。

  1. 登入 Confluent Cloud 控制台

  2. 在選單中,按一下「帳戶與存取權」

  3. 按一下「Workload identities」(工作負載身分)

  4. 按一下您在「在 Confluent Cloud 中建立識別資訊提供者」中建立的識別資訊提供者。

  5. 按一下「新增集區」

  6. 提供身分集區的名稱和說明。

  7. 將「身分識別聲明」設為 claims

  8. 在「設定篩選條件」下方,按一下「進階」分頁標籤。請輸入以下程式碼:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替換為服務帳戶的唯一 ID,該 ID 位於「記錄服務帳戶唯一 ID」一節中。

  9. 點選「下一步」

  10. 按一下「新增權限」。然後點按「下一步」

  11. 在相關叢集中,按一下「新增角色指派」

  12. 按一下「操作員」角色,然後按一下「新增」

    這個角色會授予 Pub/Sub 權限。服務帳戶可存取包含 Confluent Kafka 主題的叢集,該主題要擷取至 Pub/Sub。

  13. 在叢集下方,按一下「主題」。然後按一下「新增角色指派」

  14. 選取「DeveloperRead」DeveloperRead角色。

  15. 按一下適當選項,然後指定主題或前置字元。例如「特定主題」、「前置字串規則」或「所有主題」

  16. 按一下「新增」。

  17. 點選「下一步」

  18. 按一下「驗證並儲存」

將 Pub/Sub 發布者角色新增至 Pub/Sub 主體

如要啟用發布功能,您必須將發布者角色指派給 Pub/Sub 服務帳戶,Pub/Sub 才能發布至 Confluent Cloud 匯入主題。

為 Pub/Sub 服務帳戶新增 Pub/Sub 服務代理角色

如要允許 Pub/Sub 使用匯入主題專案的發布配額,Pub/Sub 服務代理需要匯入主題專案的 serviceusage.services.use 權限。

如要提供這項權限,建議您將 Pub/Sub 服務代理人角色新增至 Pub/Sub 服務帳戶。

如果 Pub/Sub 服務帳戶沒有 Pub/Sub 服務代理人角色,可以按照下列步驟授予:

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「IAM」(身分與存取權管理) 頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並點選「Pub/Sub Service Agent」角色 (roles/pubsub.serviceAgent)

  7. 按一下 [儲存]

允許從所有主題發布內容

如果您尚未建立任何 Confluent Cloud 匯入主題,請使用這個方法。

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「IAM」(身分與存取權管理) 頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並按一下「Pub/Sub 發布者」角色 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用從單一主題發布的功能

只有在 Confluent Cloud 匯入主題已存在時,才使用這個方法。

  1. 在 Google Cloud 控制台中啟用 Cloud Shell。

    啟用 Cloud Shell

    Google Cloud 主控台底部會開啟一個 Cloud Shell 工作階段,並顯示指令列提示。Cloud Shell 是已安裝 Google Cloud CLI 的殼層環境,並已針對您目前的專案設定好相關值。工作階段可能要幾秒鐘的時間才能初始化。

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID:Confluent Cloud 匯入主題的主題 ID。

    • PROJECT_NUMBER:專案編號。如要查看專案編號,請參閱「識別專案」。

將服務帳戶使用者角色新增至服務帳戶

「服務帳戶使用者」角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 權限,可讓主體將服務帳戶附加至 Confluent Cloud 匯入主題的擷取設定,並將該服務帳戶用於同盟身分識別。

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「IAM」(身分與存取權管理) 頁面

  2. 針對發出建立或更新主題呼叫的主體,按一下「編輯主體」按鈕。

  3. 視需要按一下「新增其他角色」

  4. 搜尋並點選「服務帳戶使用者角色」(roles/iam.serviceAccountUser)。

  5. 按一下 [儲存]

使用 Confluent Cloud 匯入主題

您可以建立新的匯入主題,或編輯現有主題。

注意事項

  • 即使是快速連續建立主題和訂閱項目,也可能導致資料遺失。在免付費期間,主題會存在一段時間。如果在這段時間內有任何資料傳送至主題,這些資料都會遺失。先建立主題和訂閱項目,然後將主題轉換為匯入主題,可確保匯入程序不會遺漏任何訊息。

  • 如要使用相同名稱重新建立現有匯入主題的 Kafka 主題,您無法直接刪除並重新建立 Kafka 主題。這項操作可能會導致 Pub/Sub 的位移管理失效,進而造成資料遺失。如要解決這個問題,請按照下列步驟操作:

    • 刪除 Pub/Sub 匯入主題。
    • 刪除 Kafka 主題。
    • 建立 Kafka 主題。
    • 建立 Pub/Sub 匯入主題。
  • 系統一律會從最早的偏移讀取 Confluent Cloud Kafka 主題的資料。

建立 Confluent Cloud 匯入主題

如要進一步瞭解與主題相關聯的屬性,請參閱「主題的屬性」。

請確認已完成下列程序:

如要建立 Confluent Cloud 匯入主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「主題」頁面。

    前往「主題」

  2. 按一下「建立主題」
  3. 在「主題 ID」欄位中,輸入匯入主題的 ID。 如要進一步瞭解如何命名主題,請參閱命名規範
  4. 選取「新增預設訂閱項目」
  5. 選取「啟用擷取」
  6. 選取「Confluent Cloud」做為擷取來源。
  7. 輸入下列詳細資料:
    1. 啟動伺服器:叢集的啟動伺服器,內含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:hostname:port
    2. 叢集 ID:叢集 ID,內含要擷取至 Pub/Sub 的 Kafka 主題。
    3. 主題:要擷取至 Pub/Sub 的 Kafka 主題名稱。
    4. 身分集區 ID:用於向 Confluent Cloud 進行驗證的身分集區 ID。
    5. 服務帳戶:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。
  8. 按一下「建立主題」

gcloud

  1. 在 Google Cloud 控制台中啟用 Cloud Shell。

    啟用 Cloud Shell

    Google Cloud 主控台底部會開啟一個 Cloud Shell 工作階段,並顯示指令列提示。Cloud Shell 是已安裝 Google Cloud CLI 的殼層環境,並已針對您目前的專案設定好相關值。工作階段可能要幾秒鐘的時間才能初始化。

  2. 執行 gcloud pubsub topics create 指令:
    gcloud pubsub topics create TOPIC_ID 
    --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
    --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
    --confluent-cloud-ingestion-topic CONFLUENT_TOPIC
    --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
    --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID:Pub/Sub 主題的名稱或 ID。
    • CONFLUENT_BOOTSTRAP_SERVER:叢集的啟動伺服器,其中包含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:hostname:port
    • CONFLUENT_CLUSTER_ID:叢集 ID,其中包含要擷取至 Pub/Sub 的 Kafka 主題。
    • CONFLUENT_TOPIC:要擷取至 Pub/Sub 的 Kafka 主題名稱。
    • CONFLUENT_IDENTITY_POOL_ID:用於向 Confluent Cloud 進行驗證的身分集區 ID。
    • PUBSUB_SERVICE_ACCOUNT:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Go

下列範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topicpb := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud_{
				ConfluentCloud: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud{
					BootstrapServer:   bootstrapServer,
					ClusterId:         clusterID,
					Topic:             confluentTopic,
					IdentityPoolId:    poolID,
					GcpServiceAccount: gcpSA,
				},
			},
		},
	}
	topic, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", topic)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Node.ts

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

如果發生問題,請參閱「排解 Confluent Cloud 匯入主題的問題」。

編輯 Confluent Cloud Hubs 匯入主題

如要編輯 Confluent Cloud 匯入主題的擷取資料來源設定,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」(主題) 頁面。

    前往「主題」

  2. 按一下 Confluent Cloud 匯入主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 更新要變更的欄位。

  5. 按一下「Update」

gcloud

  1. 在 Google Cloud 控制台中啟用 Cloud Shell。

    啟用 Cloud Shell

    Google Cloud 主控台底部會開啟一個 Cloud Shell 工作階段,並顯示指令列提示。Cloud Shell 是已安裝 Google Cloud CLI 的殼層環境,並已針對您目前的專案設定好相關值。工作階段可能要幾秒鐘的時間才能初始化。

    為避免遺失匯入主題的設定,請務必在每次更新主題時加入所有設定。如果省略任何項目,Pub/Sub 會將設定重設為原始預設值。

  2. 執行 gcloud pubsub topics update 指令,並加上下列範例中提及的所有旗標:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID:Pub/Sub 主題的名稱或 ID。
    • CONFLUENT_BOOTSTRAP_SERVER:叢集的啟動伺服器,其中包含要擷取至 Pub/Sub 的 Kafka 主題。格式如下:hostname:port
    • CONFLUENT_CLUSTER_ID:叢集 ID,其中包含要擷取至 Pub/Sub 的 Kafka 主題
    • CONFLUENT_TOPIC:要擷取至 Pub/Sub 的 Kafka 主題名稱。
    • CONFLUENT_IDENTITY_POOL_ID:用於向 Confluent Cloud 進行驗證的身分集區 ID。
    • CONFLUENT_IDENTITY_POOL_ID:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。

配額與限制

匯入主題的發布者輸送量會受限於主題的發布配額。詳情請參閱「Pub/Sub 配額與限制」。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。