Apache Kafka 至 Cloud Storage 範本是串流管道,可從 Google Cloud Managed Service for Apache Kafka 擷取文字資料,並將記錄輸出至 Cloud Storage。
您也可以搭配自行管理或外部 Kafka 使用 Apache Kafka 至 Cloud Storage 範本。
管道相關規定
- 輸出 Cloud Storage 值區必須已存在。
- Apache Kafka 代理程式伺服器必須正在執行,且可從 Dataflow worker 機器連線。
- Apache Kafka 主題必須存在。
Kafka 訊息格式
這個範本支援從 Kafka 讀取下列格式的訊息:
JSON 格式
如要讀取 JSON 訊息,請將 messageFormat 範本參數設為 "JSON"。
Avro 二進位編碼
如要讀取二進位 Avro 訊息,請設定下列範本參數:
messageFormat:"AVRO_BINARY_ENCODING".binaryAvroSchemaPath:Cloud Storage 中 Avro 結構定義檔案的位置。例如:gs://BUCKET_NAME/message-schema.avsc。
如要進一步瞭解 Avro 二進位格式,請參閱 Apache Avro 說明文件中的「二進位編碼」。
以 Confluent 結構定義登錄服務編碼的 Avro
如要讀取 Confluent Schema Registry 編碼的 Avro 訊息,請設定下列範本參數:
messageFormat:"AVRO_CONFLUENT_WIRE_FORMAT".schemaFormat:下列其中一個值:"SINGLE_SCHEMA_FILE":訊息結構定義是在 Avro 結構定義檔案中定義。 在confluentAvroSchemaPath參數中指定結構定義檔案的 Cloud Storage 位置。-
"SCHEMA_REGISTRY":訊息會使用 Confluent Schema Registry 編碼。 在schemaRegistryConnectionUrl參數中指定 Confluent Schema Registry 執行個體的網址,並在schemaRegistryAuthenticationMode參數中指定驗證模式。
如要進一步瞭解這個格式,請參閱 Confluent 說明文件中的「 線路格式」。
輸出檔案格式
輸出檔案格式與輸入 Kafka 訊息的格式相同。舉例來說,如果您為 Kafka 訊息格式選取 JSON,系統會將 JSON 檔案寫入輸出 Cloud Storage 值區。
驗證
Apache Kafka 到 Cloud Storage 範本支援 SASL/PLAIN 驗證,可連線至 Kafka 代理程式。
範本參數
必要參數
- readBootstrapServerAndTopic:要讀取當中輸入內容的 Kafka 主題。
- outputDirectory:寫入輸出檔案的路徑和檔案名稱前置字串,結尾必須為斜線。例如:
gs://your-bucket/your-path/。 - kafkaReadAuthenticationMode:要搭配 Kafka 叢集使用的驗證模式。使用
NONE代表不驗證、SASL_PLAIN代表 SASL/PLAIN 使用者名稱和密碼、SASL_SCRAM_512代表 SASL_SCRAM_512 驗證,以及TLS代表憑證式驗證。APPLICATION_DEFAULT_CREDENTIALS僅適用於 Google Cloud Managed Service for Apache Kafka 叢集,可使用應用程式預設憑證進行驗證。 - messageFormat:要讀取的 Kafka 訊息格式。支援的值為
AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry 編碼的 Avro)、AVRO_BINARY_ENCODING(純二進位 Avro) 和JSON。預設值為:AVRO_CONFLUENT_WIRE_FORMAT。 - useBigQueryDLQ:如果為 true,系統會將失敗的訊息寫入 BigQuery,並附上額外的錯誤資訊。預設值為:false。
選用參數
- windowDuration:資料寫入 Cloud Storage 的時間間隔/大小。允許的格式為 Ns (以秒為單位例如 5s)、Nm (以分鐘為單位例如 12m)、Nh (以小時為單位 2h)。例如,
5m。預設值為 5 分鐘。 - outputFilenamePrefix:加在每個固定時段檔案的前置字元,例如,
output-。預設值為:output。 - numShards:寫入時產生的輸出資料分割數量上限;資料分割數量越多,寫入 Cloud Storage 的處理量就越高,但處理輸出 Cloud Storage 檔案時,資料分割間的資料匯總費用可能會更高。預設值由 Dataflow 決定。
- enableCommitOffsets:將已處理訊息的偏移量提交至 Kafka。啟用後,重新啟動管道時,系統會盡量減少訊息處理作業的間隔或重複情形。必須指定用戶群組 ID。預設值為:false。
- consumerGroupId:這個管道所屬用戶群組的專屬 ID。如果已啟用「將偏移量提交至 Kafka」,則為必填欄位。預設為空白。
- kafkaReadOffset:在未提交偏移量時讀取訊息的起始點。選取「最早」會從頭開始讀取,選取「最晚」則從最新訊息開始。預設值為:latest。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_PLAIN驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_PLAIN驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。預設為空白。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有與 Kafka 叢集進行驗證時使用的 TLS 憑證和私密金鑰。例如:
gs://your-bucket/keystore.jks。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分的受信任憑證。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java KeyStore (JKS) 檔案中私密金鑰的密碼,以進行 Kafka TLS 驗證。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_SCRAM驗證使用的 Kafka 使用者名稱。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含要搭配
SASL_SCRAM驗證使用的 Kafka 密碼。例如:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 檔案的 Google Cloud Storage 路徑,該檔案含有用於驗證 Kafka 代理程式身分的受信任憑證。
- kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於存取 Java TrustStore (JKS) 檔案的密碼,以進行 Kafka SASL_SCRAM 驗證。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - schemaFormat:Kafka 結構定義格式。可以提供為
SINGLE_SCHEMA_FILE或SCHEMA_REGISTRY。如果指定SINGLE_SCHEMA_FILE,請對所有訊息使用 avro 結構定義檔案中提及的結構定義。如果指定SCHEMA_REGISTRY,訊息可以具有單一或多個結構定義。預設值為:SINGLE_SCHEMA_FILE。 - confluentAvroSchemaPath:Google Cloud Storage 路徑,指向用於解碼主題中所有訊息的單一 Avro 結構定義檔案。預設為空白。
- schemaRegistryConnectionUrl:用於管理 Avro 結構定義的 Confluent Schema Registry 執行個體網址,可解碼訊息。預設為空白。
- binaryAvroSchemaPath:用於解碼二進位編碼 Avro 訊息的 Avro 結構定義檔案 Google Cloud Storage 路徑。預設為空白。
- schemaRegistryAuthenticationMode:要搭配結構定義登錄使用的驗證模式。使用
NONE代表不驗證、TLS代表憑證式驗證,以及OAUTH代表 OAuth2 驗證。APPLICATION_DEFAULT_CREDENTIALS僅適用於 Google Cloud Managed Service for Apache Kafka 結構定義登錄,可使用應用程式預設憑證進行驗證。 - schemaRegistryTruststoreLocation:SSL 憑證的位置,用於儲存向 Schema Registry 進行驗證的信任儲存庫。例如:
/your-bucket/truststore.jks。 - schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用於儲存存取信任儲存區中密鑰的密碼。例如:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version。 - schemaRegistryKeystoreLocation:含有 SSL 憑證和私密金鑰的 KeyStore 位置。例如:
/your-bucket/keystore.jks。 - schemaRegistryKeystorePasswordSecretId:Secret Manager 中的 SecretId,內含用於存取 KeyStore 檔案的密碼。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version。 - schemaRegistryKeyPasswordSecretId:存取用戶端私密金鑰所需的密碼 SecretId,該金鑰儲存在金鑰儲存區中。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version。 - schemaRegistryOauthClientId:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的用戶端 ID。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要項目。
- schemaRegistryOauthClientSecretId:Google Cloud Secret Manager 密鑰 ID,內含用於以 OAUTH 模式驗證 Schema Registry 用戶端的用戶端密鑰。如果訊息格式為 AVRO_CONFLUENT_WIRE_FORMAT,則必須提供這個值。例如:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。 - schemaRegistryOauthScope:用於在 OAUTH 模式中驗證 Schema Registry 用戶端的存取權杖範圍。這個欄位為選填,因為要求可以不傳遞範圍參數。例如:
openid。 - schemaRegistryOauthTokenEndpointUrl:以 HTTP(S) 為基礎的 OAuth/OIDC 識別資訊提供者網址,用於在 OAUTH 模式中驗證 Schema Registry 用戶端。AVRO_CONFLUENT_WIRE_FORMAT 訊息格式的必要項目。
- outputDeadletterTable:用於儲存失敗訊息的 BigQuery 資料表完整名稱。訊息無法到達輸出資料表的所有原因 (例如結構定義不相符、JSON 格式錯誤) 會寫入此資料表。資料表會由範本建立。例如:
your-project-id:your-dataset.your-table-name。
執行範本
控制台
- 前往 Dataflow 的「Create job from template」(依據範本建立工作) 頁面。 前往「依範本建立工作」
- 在「Job name」(工作名稱) 欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1。如需可執行 Dataflow 工作的區域清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Kafka to Cloud Storage template。
- 在提供的參數欄位中輸入參數值。
- 選用:如要從「僅需處理一次」切換至「至少一次串流模式」,請選取「At Least Once」(至少一次)。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
更改下列內容:
PROJECT_ID:您要執行 Dataflow 工作的 Google Cloud 專案 IDJOB_NAME:您選擇的不重複工作名稱REGION_NAME:您要部署 Dataflow 工作的區域,例如us-central1VERSION:您要使用的範本版本您可以使用下列值:
latest,使用範本的最新版本,該版本位於 bucket 中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱,例如
2023-09-12-00_RC00,可使用特定版本的範本,該範本會以巢狀結構存放在 bucket 中相應的依日期命名上層資料夾中:gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME - 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
STORAGE_BUCKET_NAME:寫入輸出的 Cloud Storage bucket
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
更改下列內容:
PROJECT_ID:您要執行 Dataflow 工作的 Google Cloud 專案 IDJOB_NAME:您選擇的不重複工作名稱LOCATION:您要部署 Dataflow 工作的區域,例如us-central1VERSION:您要使用的範本版本您可以使用下列值:
latest,使用範本的最新版本,該版本位於 bucket 中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱,例如
2023-09-12-00_RC00,可使用特定版本的範本,該範本會以巢狀結構存放在 bucket 中相應的依日期命名上層資料夾中:gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC:Apache Kafka 啟動伺服器位址和主題啟動伺服器位址和主題的格式取決於叢集類型:
- Managed Service for Apache Kafka 叢集:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME - 外部 Kafka 叢集:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 叢集:
STORAGE_BUCKET_NAME:寫入輸出內容的 Cloud Storage bucket
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。