יצירה וצריכה של הודעות באמצעות כלי שורת הפקודה של Kafka

במאמר הזה מוסבר איך להשתמש בכלים של שורת הפקודה של Kafka כדי להתחבר לאשכול של שירות מנוהל ל-Apache Kafka, ליצור הודעות ולצרוך הודעות.

לפני שמתחילים

לפני שמתחילים את המדריך הזה, צריך ליצור אשכול חדש של שירות מנוהל ל-Apache Kafka. אם כבר יש לכם אשכול, אתם יכולים לדלג על השלב הזה.

איך יוצרים אשכול

המסוף

  1. עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.

    מעבר אל Clusters

  2. לוחצים על יצירה.
  3. בתיבה שם האשכול, מזינים שם לאשכול.
  4. ברשימה Region, בוחרים מיקום לאשכול.
  5. בקטע Network configuration (תצורת רשת), מגדירים את רשת המשנה שבה אפשר לגשת לאשכול:
    1. בקטע Project (פרויקט), בוחרים את הפרויקט.
    2. בקטע רשת, בוחרים את רשת ה-VPC.
    3. בקטע רשת משנה, בוחרים את רשת המשנה.
    4. לוחצים על סיום.
  6. לוחצים על יצירה.

אחרי שלוחצים על יצירה, מצב האשכול הוא Creating. כשהאשכול מוכן, הסטטוס שלו הוא Active.

gcloud

כדי ליצור אשכול Kafka, מריצים את הפקודה managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

מחליפים את מה שכתוב בשדות הבאים:

  • KAFKA_CLUSTER: שם לאשכול Kafka
  • REGION: המיקום של האשכול
  • PROJECT_ID: מזהה הפרויקט
  • SUBNET_NAME: תת-הרשת שבה רוצים ליצור את האשכול, לדוגמה default

מידע על מיקומים נתמכים זמין במאמר בנושא מיקומים של שירות מנוהל ל-Apache Kafka.

הפקודה פועלת באופן אסינכרוני ומחזירה מזהה פעולה:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

כדי לעקוב אחרי ההתקדמות של פעולת היצירה, משתמשים בפקודה gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

כשהאשכול מוכן, הפלט מהפקודה הזו כולל את הרשומה state: ACTIVE. מידע נוסף זמין במאמר בנושא מעקב אחרי פעולת יצירת האשכול.

התפקידים הנדרשים

כדי לקבל את ההרשאות שדרושות ליצירה ולהגדרה של מכונת VM של לקוח, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

יצירת מכונה וירטואלית של לקוח

יוצרים מכונה וירטואלית (VM) של Linux ב-Compute Engine שיכולה לגשת לאשכול Kafka. כשמגדירים את המכונה הווירטואלית, צריך להגדיר את האפשרויות הבאות:

  • אזור. יוצרים את המכונה הווירטואלית באותו אזור שבו נמצא אשכול Kafka.

  • Subnet. יוצרים את המכונה הווירטואלית באותה רשת VPC כמו רשת המשנה שבה השתמשתם בהגדרת אשכול Kafka. מידע נוסף זמין במאמר בנושא הצגת רשתות המשנה של אשכול.

  • היקפי הרשאות. מקצים את https://www.googleapis.com/auth/cloud-platform היקף הגישה למכונה הווירטואלית. היקף ההרשאות הזה מאשר למכונה הווירטואלית לשלוח בקשות אל Managed Kafka API.

בשלבים הבאים מוסבר איך מגדירים את האפשרויות האלה.

המסוף

  1. נכנסים לדף Create an instance במסוף Google Cloud .

    יצירת מופע

  2. בחלונית Machine configuration:

    1. בשדה Name, מציינים שם למכונה. מידע נוסף זמין במאמר מוסכמות למתן שמות למשאבים.

    2. ברשימה Region, בוחרים את אותו אזור כמו אשכול Kafka.

    3. ברשימה Zone, בוחרים אזור.

  3. בתפריט הניווט, לוחצים על Networking (רשת). בחלונית Networking שמופיעה, מבצעים את הפעולות הבאות:

    1. עוברים לקטע Network interfaces.

    2. כדי להרחיב את ממשק ברירת המחדל של הרשת, לוחצים על החץ .

    3. בשדה Network, בוחרים את רשת ה-VPC.

    4. ברשימה Subnetwork, בוחרים את רשת המשנה.

    5. לוחצים על סיום.

  4. בתפריט הניווט, לוחצים על אבטחה. בחלונית Security שמופיעה, מבצעים את הפעולות הבאות:

    1. בקטע Access scopes בוחרים באפשרות Set access for each API.

    2. ברשימת היקפי הגישה, מחפשים את הרשימה הנפתחת Cloud Platform ובוחרים באפשרות Enabled.

  5. לוחצים על יצירה כדי ליצור את המכונה הווירטואלית.

gcloud

כדי ליצור את מכונת ה-VM, משתמשים בפקודה gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

מחליפים את מה שכתוב בשדות הבאים:

  • VM_NAME: השם של ה-VM
  • PROJECT_ID: מזהה הפרויקט
  • REGION: האזור שבו יצרתם את אשכול Kafka, לדוגמה us-central1
  • SUBNET: תת-רשת באותה רשת VPC כמו תת-הרשת שבה השתמשתם בתצורת האשכול
  • ZONE: אזור באזור שבו יצרתם את האשכול, למשל us-central1-c

מידע נוסף על יצירת מכונה וירטואלית זמין במאמר בנושא יצירת מופע של מכונה וירטואלית ברשת משנה ספציפית.

מתן תפקידים ב-IAM

מקצים את התפקידים הבאים בניהול זהויות והרשאות גישה (IAM) לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine:

  • Managed Kafka Client‏ (roles/managedkafka.client)
  • יצירת אסימונים בחשבון שירות (roles/iam.serviceAccountTokenCreator)
  • יצירת אסימונים מסוג OpenID בחשבון שירות (roles/iam.serviceAccountOpenIdTokenCreator)

המסוף

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מחפשים את השורה של חשבון השירות של Compute Engine שמוגדר כברירת מחדל ולוחצים על עריכת החשבון הראשי.

  3. לוחצים על Add another role ובוחרים את התפקיד Managed Kafka Client. חוזרים על השלב הזה עבור התפקידים יצירת אסימונים בחשבון שירות ויצירת אסימונים מסוג OpenID בחשבון שירות.

  4. לוחצים על Save.

gcloud

כדי להקצות תפקידים ב-IAM, משתמשים בפקודה gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט

  • PROJECT_NUMBER: מספר הפרויקט

כדי לקבל את מספר הפרויקט, מריצים את הפקודה gcloud projects describe:

gcloud projects describe PROJECT_ID

מידע נוסף זמין במאמר איך מוצאים את השם, המספר והמזהה של הפרויקט.

התחברות למכונה הווירטואלית

משתמשים ב-SSH כדי להתחבר למופע של המכונה הווירטואלית.

המסוף

  1. נכנסים לדף VM instances.

    כניסה לדף VM instances

  2. ברשימת המכונות הווירטואליות, מוצאים את שם המכונה הווירטואלית ולוחצים על SSH.

gcloud

כדי להתחבר למכונה הווירטואלית, משתמשים בפקודה gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

מחליפים את מה שכתוב בשדות הבאים:

  • VM_NAME: השם של ה-VM
  • PROJECT_ID: מזהה הפרויקט
  • ZONE: האזור שבו יצרתם את המכונה הווירטואלית

יכול להיות שיהיה צורך בהגדרה נוספת כדי להשתמש ב-SSH בפעם הראשונה. מידע נוסף זמין במאמר מידע על חיבורי SSH.

התקנה של כלי שורת הפקודה של Kafka

בסשן ה-SSH, מריצים את הפקודות הבאות כדי להתקין את כלי שורת הפקודה של Kafka.

  1. מתקינים את Java, שנדרשת להרצת כלי שורת הפקודה של Kafka, וגם את wget כדי להוריד יחסי תלות. הפקודות הבאות מניחות שאתם משתמשים בסביבת Debian Linux.

    sudo apt-get install default-jre wget
    
  2. מתקינים את כלי שורת הפקודה של Kafka.

    wget -O kafka_2.13-3.7.2.tgz https://archive.apache.org/dist/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. מגדירים את משתני הסביבה הבאים:

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
    

מגדירים אימות

במהלך סשן SSH, מבצעים את השלבים הבאים כדי להגדיר את ספריית האימות של השירות המנוהל ל-Apache Kafka.

  1. מורידים את הספרייה ומתקינים אותה באופן מקומי.

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    

    הפקודה הזו מתקינה את הספרייה בספרייה lib של ספריית ההתקנה של Kafka. כלי שורת הפקודה של Kafka מחפשים יחסי תלות של Java בספרייה הזו.

  2. באמצעות כלי לעריכת טקסט, יוצרים קובץ בשם client.properties ומדביקים בו את התוכן הבא:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

    שומרים את הקובץ. הקובץ הזה מגדיר לקוח Kafka עם ההגדרות הבאות:

    • משתמשים ב-SASL_SSL לתקשורת מאובטחת עם אשכול Kafka.

    • שימוש באסימוני Bearer של OAuth 2.0 לצורך אימות.

    • משתמשים במחלקה GcpLoginCallbackHandler שסופקה על ידי הספרייה כגורם מטפל בקריאה חוזרת של הכניסה כדי לקבל אסימוני OAuth 2.0.

יצירה וצריכה של הודעות

בסשן ה-SSH, מריצים את הפקודות הבאות כדי ליצור הודעות Kafka ולצרוך אותן.

  1. מגדירים את כתובת ה-bootstrap כמשתנה סביבה.

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

    מחליפים את מה שכתוב בשדות הבאים:

    • CLUSTER_ID: השם של האשכול
    • REGION: המקום שבו יצרתם את האשכול
    • PROJECT_ID: מזהה הפרויקט

    מידע נוסף זמין במאמר בנושא קבלת כתובת ה-bootstrap.

  2. מציגים את הנושאים באשכול.

    kafka-topics.sh --list \
      --bootstrap-server $BOOTSTRAP \
      --command-config client.properties
    
  3. כתיבת הודעה בנושא.

    echo "hello world" | kafka-console-producer.sh \
      --topic KAFKA_TOPIC_NAME \
      --bootstrap-server $BOOTSTRAP \
      --producer.config client.properties
    

    מחליפים את KAFKA_TOPIC_NAME בשם הנושא.

  4. צריכת הודעות מהנושא.

    kafka-console-consumer.sh \
      --topic KAFKA_TOPIC_NAME \
      --from-beginning \
      --bootstrap-server $BOOTSTRAP \
      --consumer.config client.properties
    

    כדי להפסיק את צריכת ההודעות, מקישים על Ctrl+C.

  5. מריצים בדיקת ביצועים של המפיק.

    kafka-producer-perf-test.sh \
      --topic KAFKA_TOPIC_NAME \
      --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \
      --producer-props bootstrap.servers=$BOOTSTRAP \
      --producer.config client.properties
    

הסרת המשאבים

כדי לא לצבור חיובים לחשבון Google Cloud על המשאבים שבהם השתמשתם בדף הזה, פועלים לפי השלבים הבאים:

המסוף

  1. מוחקים את מכונת ה-VM.

    1. נכנסים לדף VM instances.

      כניסה לדף VM instances

    2. בוחרים את המכונה הווירטואלית ולוחצים על מחיקה.

  2. מוחקים את אשכול Kafka.

    1. עוברים לדף שירות מנוהל ל-Apache Kafka > Clusters (אשכולות).

      מעבר אל Clusters

    2. בוחרים את אשכול Kafka ולוחצים על מחיקה.

gcloud

  1. כדי למחוק את מכונת ה-VM, משתמשים בפקודה gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. כדי למחוק את אשכול Kafka, משתמשים בפקודה gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

המאמרים הבאים

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.