Dataflow mit Managed Service for Apache Kafka verwenden

Auf dieser Seite wird beschrieben, wie Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Senke in einer Dataflow-Pipeline verwenden.

Sie haben zwei Möglichkeiten:

Voraussetzungen

  • Aktivieren Sie die Cloud Storage API, die Dataflow API und die Managed Service for Apache Kafka API in Ihrem Projekt. Weitere Informationen finden Sie unter APIs aktivieren oder führen Sie den folgenden Google Cloud CLI-Befehl aus:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Das Dataflow Worker-Dienstkonto muss die IAM-Rolle (Identity and Access Management) Managed Kafka Client (roles/managedkafka.client) haben.

  • Die Dataflow-Worker-VMs müssen Netzwerkzugriff auf den Kafka-Bootstrap-Server haben. Weitere Informationen finden Sie unter Netzwerk für Managed Service for Apache Kafka konfigurieren.

Adresse des Bootstrap-Servers abrufen

Wenn Sie eine Pipeline ausführen möchten, die eine Verbindung zu einem Managed Service for Apache Kafka-Cluster herstellt, müssen Sie zuerst die Adresse des Bootstrap-Servers des Clusters abrufen. Sie benötigen diese Adresse, wenn Sie die Pipeline konfigurieren.

Sie können die Google Cloud console oder die Google Cloud CLI verwenden:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Clusternamen.

  3. Klicken Sie auf den Tab Konfigurationen.

  4. Kopieren Sie die Adresse des Bootstrap-Servers aus Bootstrap-URL.

gcloud

Verwenden Sie den managed-kafka clusters describe Befehl.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Ersetzen Sie Folgendes:

  • CLUSTER_ID: die ID oder der Name des Clusters
  • LOCATION: der Standort des Clusters

Weitere Informationen finden Sie unter Managed Service for Apache Kafka-Cluster ansehen.

Managed Service for Apache Kafka mit einer Dataflow-Vorlage verwenden

Google bietet mehrere Dataflow-Vorlagen, die Daten aus Apache Kafka lesen:

Diese Vorlagen können mit Managed Service for Apache Kafka verwendet werden. Wenn eine davon Ihrem Anwendungsfall entspricht, sollten Sie sie verwenden, anstatt benutzerdefinierten Pipelinecode zu schreiben.

Console

  1. Rufen Sie die Seite Dataflow > Jobs auf.

    Zu Jobs

  2. Klicken Sie auf Job aus Vorlage erstellen.

  3. Geben Sie unter Jobname einen Namen für den Job ein.

  4. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die auszuführende Vorlage aus.

  5. Geben Sie im Feld Kafka-Bootstrap-Server die Adresse des Bootstrap-Servers ein.

  6. Geben Sie im Feld Kafka-Thema den Namen des Themas ein.

  7. Wählen Sie für Kafka-Authentifizierungsmodus die Option APPLICATION_DEFAULT_CREDENTIALS aus.

  8. Wählen Sie für Kafka-Nachrichtenformat das Format der Apache Kafka Nachrichten aus.

  9. Geben Sie nach Bedarf weitere Parameter ein. Die unterstützten Parameter sind für jede Vorlage dokumentiert.

  10. Klicken Sie auf Job ausführen.

gcloud

Verwenden Sie den gcloud dataflow jobs run Befehl.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Ersetzen Sie Folgendes:

  • JOB_NAME: ein Name für den Job
  • TEMPLATE_FILE: der Speicherort der Vorlagendatei in Cloud Storage
  • REGION_NAME: die Region, in der Sie den Job bereitstellen möchten
  • PROJECT_NAME: der Name Ihres Google Cloud Projekts
  • LOCATION: der Standort des Clusters
  • CLUSTER_ID: die ID oder der Name des Clusters
  • TOPIC: der Name des Kafka-Themas

Bei anderen Vorlagen, z. B. der YAML-Vorlage „Kafka zu Apache Iceberg“, müssen Sie den bootstrapServers Parameter auf die Bootstrap-Adresse des Clusters festlegen. Außerdem müssen Sie die Authentifizierung konfigurieren, indem Sie den Parameter consumerConfigUpdates festlegen, wie in der folgenden gekürzten Befehlszeile gezeigt:

gcloud

gcloud dataflow flex-template run "kafka-to-iceberg-yaml-job" \
  --project "PROJECT_NAME" \
  --region "REGION_NAME" \
  --template-file-gcs-location "gs://REGION_NAME/templates/flex/Kafka_To_Iceberg_Yaml" \
  --parameters "bootstrapServers=BOOTSTRAP_ADDRESS" \
  --parameters "consumerConfigUpdates='{\"security.protocol\": \"SASL_SSL\",\"sasl.mechanism\": \"OAUTHBEARER\",\"sasl.login.callback.handler.class\": \"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler\",\"sasl.jaas.config\": \"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;\"}'"

Ersetzen Sie BOOTSTRAP_ADDRESS durch die Bootstrap-Adresse des Kafka-Clusters. Eine vollständige Befehlszeile finden Sie in der Vorlage README-Datei.

Managed Service for Apache Kafka mit einer Beam-Pipeline verwenden

In diesem Abschnitt wird beschrieben, wie Sie mit dem Apache Beam SDK eine Dataflow-Pipeline erstellen und ausführen, die eine Verbindung zu Managed Service for Apache Kafka herstellt.

In den meisten Fällen verwenden Sie die verwaltete E/A Transformation als Kafka-Quelle oder -Senke. Wenn Sie eine erweiterte Leistungs optimierung benötigen, können Sie den KafkaIO-Connector verwenden. Weitere Informationen zu den Vorteilen der Verwendung von verwalteter E/A finden Sie unter Von Dataflow verwaltete E/A.

Voraussetzungen

  • Kafka-Client-Version 3.6.0 oder höher.

  • Apache Beam SDK-Version 2.61.0 oder höher.

  • Die Maschine, auf der Sie den Dataflow-Job starten, muss Netzwerkzugriff auf den Apache Kafka-Bootstrap-Server haben. Starten Sie den Job beispielsweise von einer Compute Engine-Instanz aus, die auf die VPC zugreifen kann, in der der Cluster erreichbar ist.

  • Das Prinzipal, das den Job erstellt, muss die folgenden IAM-Rollen haben:

    • Managed Kafka Client (roles/managedkafka.client) für den Zugriff auf den Apache Kafka-Cluster.
    • Dienstkontonutzer (roles/iam.serviceAccountUser) als Dataflow-Worker-Dienstkonto.
    • Storage-Administrator (roles/storage.admin) zum Hochladen von Jobdateien in Cloud Storage.
    • Dataflow-Administrator (roles/dataflow.admin) zum Erstellen des Jobs.

    Wenn Sie den Job von einer Compute Engine-Instanz aus starten, können Sie diese Rollen einem Dienstkonto gewähren, das an die VM angehängt ist. Weitere Informationen finden Sie unter VM mit vom Nutzer verwalteten Dienstkonto erstellen.

    Sie können beim Erstellen des Jobs auch Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC) mit Identitätsübernahme des Dienstkontos verwenden.

Verwaltete E/A konfigurieren

Wenn Ihre Pipeline verwaltete E/A für Apache Kafkaverwendet, legen Sie die folgenden Konfigurationsoptionen fest, um sich bei Managed Service for Apache Kafka zu authentifizieren:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

In den folgenden Beispielen wird gezeigt, wie Sie verwaltete E/A für Managed Service for Apache Kafka konfigurieren:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

Den KafkaIO-Connector konfigurieren

In den folgenden Beispielen wird gezeigt, wie Sie den KafkaIO-Connector für Managed Service for Apache Kafka konfigurieren:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

Nächste Schritte