Apache Kafka to Cloud Storage テンプレートは、Google Cloud Managed Service for Apache Kafka からテキストデータを取り込み、レコードを Cloud Storage に出力するストリーミング パイプラインです。
Apache Kafka to Cloud Storage テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。
パイプラインの要件
- 出力先の Cloud Storage バケットが存在する。
- Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である。
- Apache Kafka トピックが存在している。
Kafka メッセージ形式
このテンプレートでは、Kafka から次の形式でメッセージを読み取ります。
JSON 形式
JSON メッセージを読み取るには、messageFormat テンプレート パラメータを "JSON" に設定します。
Avro バイナリ エンコード
バイナリ Avro メッセージを読み取るには、次のテンプレート パラメータを設定します。
messageFormat:"AVRO_BINARY_ENCODING"binaryAvroSchemaPath: Cloud Storage 内の Avro スキーマ ファイルの場所。例:gs://BUCKET_NAME/message-schema.avsc
Avro バイナリ形式の詳細については、Apache Avro ドキュメントの Binary Encoding をご覧ください。
Confluent Schema Registry でエンコードされた Avro
Confluent Schema Registry でエンコードされた Avro のメッセージを読み取るには、次のテンプレート パラメータを設定します。
messageFormat:"AVRO_CONFLUENT_WIRE_FORMAT"schemaFormat: 次のいずれかの値。"SINGLE_SCHEMA_FILE": メッセージ スキーマは Avro スキーマ ファイルで定義されます。confluentAvroSchemaPathパラメータに、スキーマ ファイルがある Cloud Storage の場所を指定します。-
"SCHEMA_REGISTRY": メッセージは Confluent Schema Registry を使用してエンコードされます。schemaRegistryConnectionUrlパラメータに Confluent Schema Registry インスタンスの URL を指定し、schemaRegistryAuthenticationModeパラメータに認証モードを指定します。
この形式の詳細については、Confluent のドキュメントの Wire format をご覧ください。
出力ファイル形式
出力ファイルの形式は、Kafka 入力メッセージと同じ形式です。たとえば、Kafka メッセージ形式として JSON を選択すると、JSON ファイルが出力 Cloud Storage バケットに書き込まれます。
認証
Apache Kafka to Cloud Storage テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。
テンプレートのパラメータ
必須パラメータ
- readBootstrapServerAndTopic: 入力を読み取る Kafka トピック。
- outputDirectory: 出力ファイルを書き込むパスとファイル名の接頭辞。末尾はスラッシュでなければなりません。例:
gs://your-bucket/your-path/ - kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は
KafkaAuthenticationMethod.NONE、SASL / PLAIN のユーザー名とパスワードの場合はKafkaAuthenticationMethod.SASL_PLAIN、SASL_SCRAM_512 認証の場合はKafkaAuthenticationMethod.SASL_SCRAM_512、証明書ベースの認証の場合はKafkaAuthenticationMethod.TLSを使用します。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALSは Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して認証できます。 - messageFormat: 読み取る Kafka メッセージの形式。サポートされている値は、
AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSONです。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。 - useBigQueryDLQ: true の場合、失敗したメッセージは追加のエラー情報とともに BigQuery に書き込まれます。デフォルトは false です。
オプション パラメータ
- windowDuration: Cloud Storage へのデータ書き込みが行われるウィンドウ期間またはサイズ。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です。例:
5mデフォルトは 5m です。 - outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞。例:
output-。デフォルトは output です。 - numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルト値は Dataflow によって決定されます。
- enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
- consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
- kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
- kafkaReadUsernameSecretId:
SASL_PLAIN認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。 - kafkaReadPasswordSecretId:
SASL_PLAIN認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。 - kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例:
gs://your-bucket/keystore.jks - kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - kafkaReadSaslScramUsernameSecretId:
SASL_SCRAM認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - kafkaReadSaslScramPasswordSecretId:
SASL_SCRAM認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - kafkaReadSaslScramTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadSaslScramTruststorePasswordSecretId: Kafka SASL_SCRAM 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - schemaFormat: Kafka スキーマの形式。
SINGLE_SCHEMA_FILEまたはSCHEMA_REGISTRYとして指定できます。SINGLE_SCHEMA_FILEが指定されている場合は、すべてのメッセージに avro スキーマ ファイルで言及されているスキーマを使用します。SCHEMA_REGISTRYが指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。 - confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
- schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
- binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
- schemaRegistryAuthenticationMode: Schema Registry 認証モード。NONE、TLS、OAUTH のいずれかです。デフォルトは NONE です。
- schemaRegistryTruststoreLocation: スキーマ レジストリの認証用トラストストアが保存されている SSL 証明書の場所。例:
/your-bucket/truststore.jks - schemaRegistryTruststorePasswordSecretId: トラストストア内のシークレットへのアクセス パスワードが保存されている Secret Manager の SecretId。例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version - schemaRegistryKeystoreLocation: SSL 証明書と秘密鍵を含むキーストアの場所。例:
/your-bucket/keystore.jks - schemaRegistryKeystorePasswordSecretId: キーストア ファイルにアクセスするためのパスワードが保存されている Secret Manager の SecretId(例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。 - schemaRegistryKeyPasswordSecretId: キーストアに保存されているクライアントの秘密鍵にアクセスするために必要なパスワードの SecretId(例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。 - schemaRegistryOauthClientId: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるクライアント ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
- schemaRegistryOauthClientSecretId : OAUTH モードでスキーマ レジストリ クライアントの認証に使用するクライアント シークレットを含む Google Cloud Secret Manager のシークレット ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> - schemaRegistryOauthScope: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるアクセス トークン スコープ。このフィールドは省略可能であり、スコープ パラメータを渡さずにリクエストを実行できます。例:
openid - schemaRegistryOauthTokenEndpointUrl: OAUTH モードでスキーマ レジストリ クライアントの認証に使用される OAuth/OIDC ID プロバイダの HTTP(S) ベースの URL。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
- outputDeadletterTable: 失敗したメッセージの完全修飾 BigQuery テーブル名。さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)により出力テーブルに到達できなかったメッセージは、このテーブルに書き込まれます。このテーブルはテンプレートに基づいて作成されます。例:
your-project-id:your-dataset.your-table-name
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名REGION_NAME: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスとトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME - 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
STORAGE_BUCKET_NAME: 出力が書き込まれる Cloud Storage バケット
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
次のように置き換えます。
PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME: 一意の任意のジョブ名LOCATION: Dataflow ジョブをデプロイするリージョン(例:us-central1)VERSION: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスとトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME - 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
STORAGE_BUCKET_NAME: 出力が書き込まれる Cloud Storage バケット
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。