Cloud Storage 接收器連接器可將 Kafka 主題的資料串流至 Cloud Storage bucket。這項功能有助於以經濟實惠且可擴充的方式,儲存及處理大量資料。
事前準備
建立 Cloud Storage 接收器連接器前,請確認您已備妥下列項目:
為 Connect 叢集建立 Managed Service for Apache Kafka 叢集。這是與 Connect 叢集相關聯的主要 Kafka 叢集。這也是構成連接器管道一端的來源叢集。
建立 Connect 叢集,用於代管 Cloud Storage 接收器連接器。
建立 Cloud Storage bucket,用來儲存從 Kafka 串流的資料。
在來源叢集中建立及設定 Kafka 主題。資料會從這個 Kafka 主題移至目的地 Cloud Storage bucket。
必要角色和權限
如要取得建立 Cloud Storage Sink 連接器所需的權限,請要求管理員授予您專案的受管理 Kafka 連接器編輯者 (roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這個預先定義的角色具備建立 Cloud Storage 接收器連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要建立 Cloud Storage Sink 連接器,必須具備下列權限:
-
在上層 Connect 叢集上授予建立連接器的權限:
managedkafka.connectors.create
如要進一步瞭解「Managed Kafka Connector Editor」角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。
如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果 Connect 叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。
授予寫入 Cloud Storage bucket 的權限
Connect 叢集服務帳戶 (格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com) 必須具備下列 Cloud Storage 權限:
storage.objects.createstorage.objects.delete
如要這麼做,請在包含 Cloud Storage 值區的專案中,將「Storage 物件使用者」 (roles/storage.objectUser) 角色授予 Connect 叢集服務帳戶。
Cloud Storage 接收器連接器的運作方式
Cloud Storage 接收器連接器會從一或多個 Kafka 主題提取資料,並將資料寫入單一 Cloud Storage bucket 中的物件。
以下詳細說明 Cloud Storage Sink 連接器如何複製資料:
連接器會取用來源叢集內一或多個 Kafka 主題的訊息。
連接器會將資料寫入您在連接器設定中指定的目標 Cloud Storage 值區。
連接器會參照連接器設定中的特定屬性,在將資料寫入 Cloud Storage bucket 時格式化資料。根據預設,輸出檔案為 CSV 格式。您可以設定
format.output.type屬性,指定不同的輸出格式,例如 JSON。連接器也會為寫入 Cloud Storage bucket 的檔案命名。您可以使用
file.name.prefix和file.name.template屬性自訂檔案名稱。舉例來說,您可以在檔案名稱中加入 Kafka 主題名稱或訊息金鑰。Kafka 記錄包含三個元件:標頭、鍵、值。
您可以將
format.output.fields設為包含標頭,在輸出檔案中加入標頭。例如:format.output.fields=value,headers。如要將金鑰納入輸出檔案,請將
format.output.fields設為包含key。例如:format.output.fields=key,value,headers。您也可以在
file.name.template屬性中加入key,依鍵分組記錄。
根據預設,您可以將值納入輸出檔案,因為
format.output.fields預設為value。連接器會將轉換及格式化後的資料寫入指定的 Cloud Storage 值區。
如果您使用
file.compression.type屬性設定檔案壓縮,連接器會壓縮儲存在 Cloud Storage bucket 中的檔案。轉換器設定受
format.output.type屬性限制。舉例來說,如果
format.output.type設為csv,則鍵轉換器必須是org.apache.kafka.connect.converters.ByteArrayConverter或org.apache.kafka.connect.storage.StringConverter,值轉換器必須是org.apache.kafka.connect.converters.ByteArrayConverter。如果
format.output.type設為json,即使value.converter.schemas.enable屬性為 true,值和鍵結構定義也不會寫入輸出檔案中的資料。
tasks.max屬性可控制連接器的平行處理層級。增加tasks.max可提高總處理量,但實際的平行處理能力會受限於 Kafka 主題中的分區數量。
Cloud Storage 接收器連接器的屬性
建立 Cloud Storage 接收器連接器時,請指定下列屬性。
連接器名稱
連接器的名稱或 ID。如要查看資源命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 名稱無法變更。
連接器外掛程式類型
在Google Cloud 控制台中,選取「Cloud Storage Sink」做為連接器外掛程式類型。如果您未使用使用者介面設定連接器,則必須指定連接器類別。
主題
連接器從中取用訊息的 Kafka 主題。
您可以指定一或多個主題,也可以使用規則運算式比對多個主題。例如,topic.* 可比對所有以「topic」開頭的主題。這些主題必須位於與 Connect 叢集相關聯的 Managed Service for Apache Kafka 叢集內。
Cloud Storage 值區
選擇或建立儲存資料的 Cloud Storage bucket。
設定
本節可讓您為 Cloud Storage Sink 連接器指定其他連接器專屬的設定屬性。
由於 Kafka 主題中的資料格式可能不一,例如 Avro、JSON 或原始位元組,因此設定的關鍵部分是指定轉換器。轉換器會將 Kafka 主題中使用的格式資料,轉換為 Kafka Connect 的標準內部格式。接著,Cloud Storage Sink 連接器會取得這項內部資料,並轉換成 Cloud Storage bucket 要求的格式,然後再寫入資料。
如要進一步瞭解轉換器在 Kafka Connect 中的角色、支援的轉換器類型和常見設定選項,請參閱轉換器。
以下是 Cloud Storage 接收器連接器專屬的一些設定:
gcs.credentials.default:是否要從執行環境自動探索 Google Cloud 憑證。必須設為「true」。gcs.bucket.name:指定要寫入資料的 Cloud Storage bucket 名稱。必須設定。file.compression.type:設定儲存在 Cloud Storage bucket 中的檔案壓縮類型。例如gzip、snappy、zstd和none。預設值為none。file.name.prefix:要加到 Cloud Storage bucket 中每個檔案名稱的前置字元。預設值為空白。format.output.type:用於將資料寫入 Cloud Storage 輸出檔案的資料格式類型。支援的值為:csv、json、jsonl和parquet。預設值為csv。
如需這個連接器可用的特定設定屬性清單,請參閱「Cloud Storage Sink 連接器設定」。
建立 Cloud Storage 接收器連接器
建立連接器前,請先參閱「Cloud Storage 接收器連接器屬性」說明文件。
控制台
前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。
按一下要建立連接器的 Connect 叢集。
系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。
按一下「Create connector」(建立連接器)。
系統會顯示「建立 Kafka 連接器」頁面。
輸入連接器名稱字串。
如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。
在「Connector plugin」(連接器外掛程式) 部分,選取「Cloud Storage Sink」(Cloud Storage 接收器)。
指定可串流資料的主題。
選擇要儲存資料的「Storage Bucket」(儲存空間值區)。
(選用) 在「設定」部分中,調整其他設定。
選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。
點選「建立」。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud managed-kafka connectors create指令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE更改下列內容:
CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
LOCATION:建立連接器的位置。這個位置必須與您建立 Connect 叢集的位置相同。
CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。
CONFIG_FILE:BigQuery Sink 連接器的 YAML 設定檔路徑。
以下是 Cloud Storage Sink 連接器的設定檔範例:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"更改下列內容:
GMK_TOPIC_ID:Managed Service for Apache Kafka 主題的 ID,資料會從該主題流向 Cloud Storage 接收器連接器。
GCS_BUCKET_NAME:做為管線接收器的 Cloud Storage bucket 名稱。
GCS_SINK_CONNECTOR_ID:Cloud Storage Sink 連接器的 ID 或名稱。如要查看連接器命名準則,請參閱Managed Service for Apache Kafka 資源命名指南。連接器名稱無法變更。
Terraform
您可以使用 Terraform 資源建立連接器。
如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。
Go
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。
Java
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。
Python
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。
建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。