BigQuery 接收器連接器可讓您將 Kafka 的資料串流至 BigQuery,在 BigQuery 中即時擷取及分析資料。BigQuery 接收器連接器會取用一或多個 Kafka 主題的記錄,並將資料寫入單一 BigQuery 資料集內的一或多個資料表。
事前準備
建立 BigQuery Sink 連接器前,請確認您具備下列項目:
為 Connect 叢集建立 Managed Service for Apache Kafka 叢集。這個叢集是與 Connect 叢集相關聯的主要 Kafka 叢集。這個叢集也是來源叢集,構成 BigQuery Sink 連接器管道的一端。
建立 Connect 叢集,代管 BigQuery Sink 連接器。
建立 BigQuery 資料集,儲存從 Kafka 串流的資料。
在來源叢集中建立及設定 Kafka 主題。資料會從這個 Kafka 主題移至目的地 BigQuery 資料集。
必要角色和權限
如要取得建立 BigQuery Sink 連接器所需的權限,請要求管理員授予您專案的「Managed Kafka Connector 編輯者」 (roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這個預先定義的角色具備建立 BigQuery Sink 連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要建立 BigQuery Sink 連接器,必須具備下列權限:
-
在上層 Connect 叢集上授予建立連接器的權限:
managedkafka.connectors.create
如要進一步瞭解「Managed Kafka Connector Editor」角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。
如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。
授予寫入 BigQuery 資料表的權限
Connect 叢集服務帳戶 (格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com) 必須具備寫入 BigQuery 資料表的權限。如要這麼做,請在包含 BigQuery 資料表的專案中,將「BigQuery 資料編輯者」 (roles/bigquery.dataEditor) 角色授予 Connect 叢集服務帳戶。
BigQuery Sink 連接器的結構定義
BigQuery 接收器連接器會使用設定的值轉換器 (value.converter),將 Kafka 記錄值剖析為欄位。然後將欄位寫入 BigQuery 資料表中同名的資料欄。
連接器需要結構定義才能運作。您可以透過下列方式提供結構定義:
- 以訊息為準的結構定義:結構定義會納入每則訊息。
- 以資料表為準的結構定義:連接器會根據 BigQuery 資料表結構定義推論訊息結構定義。
- 結構定義儲存庫:連接器會從結構定義儲存庫讀取結構定義,例如 Managed Service for Apache Kafka 結構定義儲存庫 (預覽版)。
以下各節將說明這些選項。
以訊息為基礎的結構定義
在這個模式中,每筆 Kafka 記錄都包含 JSON 結構定義。連接器會使用結構定義,將記錄資料寫入為 BigQuery 資料表列。
如要使用以訊息為準的結構定義,請在連接器上設定下列屬性:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Kafka 記錄值範例:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
如果目的地資料表已存在,BigQuery 資料表結構定義必須與內嵌訊息結構定義相容。如果
autoCreateTables=true,連接器會視需要自動建立目的地資料表。詳情請參閱「建立資料表」。
如要讓連接器在訊息結構定義變更時更新 BigQuery 資料表結構定義,請將 allowNewBigQueryFields、allowSchemaUnionization 或 allowBigQueryRequiredFieldRelaxation 設為 true。
以資料表為準的結構定義
在此模式下,Kafka 記錄包含純 JSON 資料,沒有明確的結構定義。連接器會根據目的地資料表推論結構定義。
需求條件:
- BigQuery 資料表必須已存在。
- Kafka 記錄資料必須與資料表結構定義相容。
- 這個模式不支援根據傳入的訊息動態更新結構定義。
如要使用以表格為基礎的結構定義,請在連接器上設定下列屬性:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
如果 BigQuery 資料表使用時間分區,且分區頻率為每日,則 bigQueryPartitionDecorator 可以是 true。否則,請將這個屬性設為 false。
Kafka 記錄值範例:
{
"user": "userId",
"age": 30
}
結構定義儲存庫
在這個模式中,每筆 Kafka 記錄都包含 Apache Avro 資料,且訊息結構定義會儲存在結構定義儲存庫中。
如要搭配結構定義登錄使用 BigQuery Sink 連接器,請在連接器上設定下列屬性:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
將 SCHEMA_REGISTRY_URL 替換為結構定義登錄的網址。
如要搭配 Managed Service for Apache Kafka 結構定義登錄使用連接器,請設定下列屬性:
value.converter.bearer.auth.credentials.source=GCP
詳情請參閱「Use Kafka Connect with schema registry」。
BigQuery 中的 Apache Iceberg 專用 BigLake 資料表
BigQuery Sink 連接器支援BigQuery 中的 Apache Iceberg 專用 BigLake 資料表 (以下簡稱 BigQuery 中的 BigLake Iceberg 資料表) 做為接收器目標。
BigQuery 中的 BigLake Iceberg 資料表是 Google Cloud上建構開放格式湖倉的基礎。BigQuery 中的 BigLake Iceberg 資料表提供與 BigQuery 資料表相同的全代管體驗,但會使用 Parquet 將資料儲存在客戶擁有的儲存空間 bucket 中,以便與 Apache Iceberg 開放式資料表格式互通。
如要瞭解如何建立 Apache Iceberg 資料表,請參閱「建立 Apache Iceberg 資料表」。
建立 BigQuery 接收器連接器
控制台
前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。
按一下要建立連接器的 Connect 叢集。
按一下「Create connector」(建立連接器)。
輸入連接器名稱字串。
如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。
針對「連接器外掛程式」,選取「BigQuery 接收器」。
在「主題」部分,指定要讀取資料的 Kafka 主題。您可以指定主題清單或規則運算式,比對主題名稱。
方法 1:選擇「選取 Kafka 主題清單」。在「Kafka topics」(Kafka 主題) 清單中,選取一或多個主題。然後點選「OK」。
方法 2:選擇「使用主題規則運算式」。在「主題規則運算式」欄位中,輸入規則運算式。
按一下「資料集」,然後指定 BigQuery 資料集。您可以選擇現有資料集或建立新資料集。
選用:在「設定」方塊中,新增設定屬性或編輯預設屬性。詳情請參閱「設定連接器」。
選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。
點選「建立」。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud managed-kafka connectors create指令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE更改下列內容:
CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
LOCATION:建立連接器的位置。這個位置必須與您建立 Connect 叢集的位置相同。
CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。
CONFIG_FILE:BigQuery Sink 連接器的 YAML 設定檔路徑。
以下是 BigQuery Sink 連接器的設定檔範例:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"更改下列內容:
BQ_SINK_CONNECTOR_ID:BigQuery Sink 連接器的 ID 或名稱。如要查看連接器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
GCP_PROJECT_ID:BigQuery 資料集所在的 Google Cloud專案 ID。
GMK_TOPIC_ID:資料從中流向 BigQuery 接收器連接器的 Managed Service for Apache Kafka 主題 ID。
BQ_DATASET_ID:做為管道接收器的 BigQuery 資料集 ID。
Terraform
您可以使用 Terraform 資源建立連接器。
如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。
Go
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。
Java
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。
Python
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。
建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。
設定連接器
本節說明您可以在連接器上設定的部分設定屬性。如需這個連接器專屬屬性的完整清單,請參閱 BigQuery Sink 連接器設定。
資料表名稱
根據預設,連接器會使用主題名稱做為 BigQuery 資料表名稱。如要使用其他資料表名稱,請設定 topic2TableMap 屬性,格式如下:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
建立資料表
如果目的地資料表不存在,BigQuery Sink 連接器可以建立這些資料表。
如果
autoCreateTables=true,連接器會嘗試建立不存在的 BigQuery 資料表。這是預設行為。如果為
autoCreateTables=false,連接器不會建立任何資料表。如果目的地資料表不存在,就會發生錯誤。
如果 autoCreateTables 為 true,您可以使用下列設定屬性,更精細地控管連接器建立及設定新資料表的方式:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
如要瞭解這些屬性,請參閱「BigQuery Sink 連接器設定」。
Kafka 中繼資料
您可以分別設定 kafkaDataFieldName 和 kafkaKeyFieldName 欄位,將 Kafka 中的其他資料 (例如中繼資料和鍵值資訊) 對應至 BigQuery 資料表。中繼資料資訊的例子包括 Kafka 主題、分割區、偏移和插入時間。