本頁面說明如何在 Dataflow 管道中,使用 Google Cloud Managed Service for Apache Kafka 做為來源或接收器。
您可以採用下列任一做法:
需求條件
在專案中啟用 Cloud Storage、Dataflow 和 Managed Service for Apache Kafka API。請參閱「啟用 API」一文,或執行下列 Google Cloud CLI 指令:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.comDataflow worker 服務帳戶必須具備代管 Kafka 用戶端 (
roles/managedkafka.client) Identity and Access Management (IAM) 角色。Dataflow worker VM 必須具備 Kafka 啟動伺服器的網路存取權。詳情請參閱「設定 Managed Service for Apache Kafka 網路」。
取得啟動伺服器位址
如要執行連線至 Managed Service for Apache Kafka 叢集的管道,請先取得叢集的啟動伺服器位址。設定管道時,您需要這個地址。
您可以使用 Google Cloud 控制台或 Google Cloud CLI,方法如下:
控制台
前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。
按一下叢集名稱。
按一下「Configurations」(設定) 分頁標籤。
從「Bootstrap URL」(啟動網址) 複製啟動伺服器位址。
gcloud
使用 managed-kafka clusters describe 指令。
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
更改下列內容:
- CLUSTER_ID:叢集 ID 或名稱
- LOCATION:叢集位置
詳情請參閱「查看 Managed Service for Apache Kafka 叢集」。
搭配使用 Managed Service for Apache Kafka 與 Dataflow 範本
Google 提供多個可從 Apache Kafka 讀取資料的 Dataflow 範本:
這些範本可與 Managed Service for Apache Kafka 搭配使用。如果其中一個符合您的用途,建議使用該函式庫,不要自行編寫自訂管道程式碼。
控制台
前往「Dataflow」>「Jobs」(工作) 頁面:
按一下「Create job from template」(依據範本建立工作)。
在「Job name」(工作名稱) 中,輸入工作名稱。
從「Dataflow」範本下拉式選單中,選取要執行的範本。
在「Kafka 啟動伺服器」方塊中,輸入啟動伺服器位址。
在「Kafka topic」(Kafka 主題) 方塊中,輸入主題名稱。
在「Kafka authentication mode」部分,選取「APPLICATION_DEFAULT_CREDENTIALS」。
在「Kafka message format」(Kafka 訊息格式) 部分,選取 Apache Kafka 訊息的格式。
視需要輸入其他參數。每個範本支援的參數都有記錄。
執行工作。
gcloud
使用 gcloud dataflow jobs run 指令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
更改下列內容:
- JOB_NAME:工作名稱
- TEMPLATE_FILE:Cloud Storage 中範本檔案的位置
- REGION_NAME:要部署作業的區域
- PROJECT_NAME:您 Google Cloud 專案的名稱
- LOCATION:叢集位置
- CLUSTER_ID:叢集 ID 或名稱
- TOPIC:Kafka 主題的名稱
如果是其他範本 (例如 Kafka 至 Apache Iceberg YAML 範本),則必須將 bootstrapServers 參數設為叢集的啟動位址。此外,您必須設定 consumerConfigUpdates 參數來設定驗證,如下列縮短的指令列所示:
gcloud
gcloud dataflow flex-template run "kafka-to-iceberg-yaml-job" \
--project "PROJECT_NAME" \
--region "REGION_NAME" \
--template-file-gcs-location "gs://REGION_NAME/templates/flex/Kafka_To_Iceberg_Yaml" \
--parameters "bootstrapServers=BOOTSTRAP_ADDRESS" \
--parameters "consumerConfigUpdates='{\"security.protocol\": \"SASL_SSL\",\"sasl.mechanism\": \"OAUTHBEARER\",\"sasl.login.callback.handler.class\": \"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler\",\"sasl.jaas.config\": \"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;\"}'"
將 BOOTSTRAP_ADDRESS 替換為 Kafka 叢集的啟動位址。如需完整指令列,請參閱範本 README 檔案。
搭配 Beam 管道使用 Managed Service for Apache Kafka
本節說明如何使用 Apache Beam SDK 建立及執行 Dataflow 管道,並連線至 Managed Service for Apache Kafka。
在大多數情況下,請使用受管理 I/O 轉換做為 Kafka 來源或接收器。如需更進階的效能調整,請考慮使用 KafkaIO 連接器。如要進一步瞭解使用受管理 I/O 的優點,請參閱「
Dataflow 受管理 I/O」。
需求條件
Kafka 用戶端 3.6.0 以上版本。
Apache Beam SDK 2.61.0 以上版本。
啟動 Dataflow 工作的機器必須具備 Apache Kafka 啟動伺服器的網路存取權。舉例來說,您可以從可存取叢集所在虛擬私有雲的 Compute Engine 執行個體啟動工作。
建立工作的主體必須具備下列 IAM 角色:
- 代管 Kafka 用戶端 (
roles/managedkafka.client),存取 Apache Kafka 叢集。 - 服務帳戶使用者 (
roles/iam.serviceAccountUser) 角色,以 Dataflow 工作站服務帳戶身分執行作業。 - Storage 管理員 (
roles/storage.admin) 將工作檔案上傳至 Cloud Storage。 - Dataflow 管理員 (
roles/dataflow.admin) 建立工作。
如果您從 Compute Engine 執行個體啟動工作,可以將這些角色授予附加至 VM 的服務帳戶。詳情請參閱「建立採使用者代管服務帳戶的 VM」。
您也可以在建立作業時,使用應用程式預設憑證 (ADC) 和服務帳戶模擬。
- 代管 Kafka 用戶端 (
設定代管 I/O
如果管道使用 Managed I/O for Apache Kafka,請設定下列設定選項,向 Managed Service for Apache Kafka 進行驗證:
"security.protocol":"SASL_SSL""sasl.mechanism":"OAUTHBEARER""sasl.login.callback.handler.class":"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler""sasl.jaas.config":"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
以下範例說明如何為 Managed Service for Apache Kafka 設定代管 I/O:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
設定 KafkaIO 連接器
以下範例說明如何為 Managed Service for Apache Kafka 設定 KafkaIO 連接器:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
後續步驟
- 進一步瞭解 Managed Service for Apache Kafka。
- 將資料從 Managed Service for Apache Kafka 寫入 BigQuery。
- 從 Apache Kafka 讀取資料到 Dataflow。
- 從 Dataflow 寫入 Apache Kafka。