שימוש ב-Dataflow עם שירות מנוהל ל-Apache Kafka

בדף הזה מוסבר איך להשתמש בשירות המנוהל של Google Cloud ל-Apache Kafka כמקור או כיעד בצינור Dataflow.

אפשר להשתמש באחת מהגישות הבאות:

דרישות

  • מפעילים את ממשקי ה-API של Cloud Storage,‏ Dataflow והשירות המנוהל ל-Apache Kafka בפרויקט. אפשר לעיין במאמר בנושא הפעלת ממשקי API או להריץ את הפקודה הבאה ב-Google Cloud CLI:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • לחשבון השירות של העובד ב-Dataflow צריכות להיות הרשאות של התפקיד Managed Kafka Client ‏ (roles/managedkafka.client) בניהול הזהויות והרשאות הגישה (IAM).

  • למכונות הווירטואליות של העובדים ב-Dataflow צריכה להיות גישה לרשת לשרת ה-bootstrap של Kafka. מידע נוסף זמין במאמר הגדרת רשת בשירות מנוהל ל-Apache Kafka.

איך מוצאים את כתובת השרת של Bootstrap

כדי להפעיל צינור שמתחבר לאשכול של שירות מנוהל ל-Apache Kafka, צריך קודם לקבל את כתובת שרת האתחול של האשכול. תצטרכו את הכתובת הזו כשמגדירים את צינור עיבוד הנתונים.

אפשר להשתמש במסוף Google Cloud או ב-Google Cloud CLI, באופן הבא:

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    כניסה לדף Clusters

  2. לוחצים על שם האשכול.

  3. לוחצים על הכרטיסייה Configurations.

  4. מעתיקים את כתובת השרת של ה-bootstrap מהשדה Bootstrap URL.

gcloud

משתמשים בפקודה managed-kafka clusters describe.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_ID: המזהה או השם של האשכול
  • LOCATION: המיקום של האשכול

מידע נוסף זמין במאמר בנושא הצגת אשכול של שירות מנוהל ל-Apache Kafka.

שימוש בשירות מנוהל ל-Apache Kafka עם תבנית Dataflow

‫Google מספקת כמה תבניות Dataflow שקוראות מ-Apache Kafka:

אפשר להשתמש בתבניות האלה עם שירות מנוהל ל-Apache Kafka. אם אחד מהם מתאים לתרחיש לדוגמה שלכם, כדאי להשתמש בו במקום לכתוב קוד מותאם אישית של צינור.

המסוף

  1. עוברים לדף Dataflow > משימות.

    מעבר לדף Jobs

  2. לוחצים על יצירת עבודה מתבנית.

  3. בשדה שם המשרה, מזינים שם למשרה.

  4. בתפריט הנפתח של תבנית Dataflow, בוחרים את התבנית להפעלה.

  5. בתיבה Kafka bootstrap server (שרת אתחול של Kafka), מזינים את כתובת שרת האתחול.

  6. בתיבה Kafka topic (נושא ב-Kafka), מזינים את שם הנושא.

  7. בקטע Kafka authentication mode (מצב אימות של Kafka), בוחרים באפשרות APPLICATION_DEFAULT_CREDENTIALS (אישורי ברירת מחדל של האפליקציה).

  8. בקטע פורמט ההודעה של Kafka, בוחרים את הפורמט של ההודעות ב-Apache Kafka.

  9. מזינים פרמטרים אחרים לפי הצורך. הפרמטרים הנתמכים מתועדים עבור כל תבנית.

  10. הפעלת משימה.

gcloud

משתמשים בפקודה gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם למשימה
  • TEMPLATE_FILE: המיקום של קובץ התבנית ב-Cloud Storage
  • REGION_NAME: האזור שבו רוצים לפרוס את המשימה
  • PROJECT_NAME: השם של הפרויקט שלכם ב-Google Cloud Platform
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: המזהה או השם של האשכול
  • TOPIC: השם של נושא Kafka

שימוש בשירות מנוהל ל-Apache Kafka עם צינור עיבוד נתונים של Beam

בקטע הזה מוסבר איך להשתמש ב-Apache Beam SDK כדי ליצור ולהפעיל צינור Dataflow שמתחבר לשירות מנוהל ל-Apache Kafka.

ברוב התרחישים, כדאי להשתמש בטרנספורמציה מנוהלת של קלט/פלט כמקור או כיעד של Kafka. אם אתם צריכים כוונון מתקדם יותר של הביצועים, כדאי להשתמש במחבר KafkaIO. מידע נוסף על היתרונות של שימוש ב-I/O מנוהל זמין במאמר Dataflow managed I/O.

דרישות

  • גרסה 3.6.0 ואילך של Kafka Client.

  • ‫Apache Beam SDK בגרסה 2.61.0 ואילך.

  • למכונה שבה מתחילים את משימת Dataflow צריכה להיות גישה לרשת לשרת ה-bootstrap של Apache Kafka. לדוגמה, מפעילים את העבודה ממכונה של Compute Engine שיכולה לגשת ל-VPC שבו אפשר להגיע לאשכול.

  • למשתמש הראשי שיוצר את העבודה צריכים להיות התפקידים הבאים ב-IAM:

    • לקוח Managed Kafka ‏ (roles/managedkafka.client) כדי לגשת לאשכול Apache Kafka.
    • משתמש בחשבון שירות (roles/iam.serviceAccountUser) כדי לפעול בתור חשבון השירות של עובד Dataflow.
    • אדמין לניהול נפח האחסון (roles/storage.admin) כדי להעלות קבצים של משימות ל-Cloud Storage.
    • אדמין של Dataflow‏ (roles/dataflow.admin) כדי ליצור את העבודה.

    אם מפעילים את העבודה מתוך מכונה וירטואלית של Compute Engine, אפשר להעניק את התפקידים האלה לחשבון שירות שמצורף למכונה הווירטואלית. מידע נוסף זמין במאמר בנושא יצירת מכונה וירטואלית שמשתמשת בחשבון שירות שמנוהל על ידי משתמש.

    אתם יכולים גם להשתמש ב-Application Default Credentials ‏ (ADC) עם התחזות לחשבון שירות כשאתם יוצרים את העבודה.

הגדרת קלט/פלט מנוהל

אם צינור הנתונים משתמש בקלט/פלט מנוהל ל-Apache Kafka, צריך להגדיר את אפשרויות ההגדרה הבאות כדי לבצע אימות באמצעות השירות המנוהל ל-Apache Kafka:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

בדוגמאות הבאות מוצגות דרכים להגדיר קלט/פלט מנוהל בשירות מנוהל ל-Apache Kafka:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

הגדרת המחבר KafkaIO

בדוגמאות הבאות מוסבר איך להגדיר את מחבר KafkaIO לשירות מנוהל ל-Apache Kafka:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

המאמרים הבאים