יצירת נושא לייבוא ב-Confluent Cloud

נושא ייבוא של Confluent Cloud מאפשר לכם להטמיע נתונים מ-Confluent Cloud כמקור חיצוני ב-Pub/Sub באופן רציף. אחר כך תוכלו להזרים את הנתונים לכל אחד מהיעדים ש-Pub/Sub תומך בהם.

במאמר הזה מוסבר איך ליצור ולנהל נושאים של ייבוא ב-Confluent Cloud. כדי ליצור נושא רגיל, ראו יצירת נושא רגיל.

מידע נוסף על ייבוא נושאים זמין במאמר מידע על ייבוא נושאים.

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות ליצירה ולניהול של נושאי ייבוא ב-Confluent Cloud, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עריכת Pub/Sub (roles/pubsub.editor) בנושא או בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות ליצירה ולניהול של נושאי ייבוא ב-Confluent Cloud. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי ליצור ולנהל נושאים של ייבוא ב-Confluent Cloud, נדרשות ההרשאות הבאות:

  • יוצרים נושא לייבוא: pubsub.topics.create
  • מחיקת נושא ייבוא: pubsub.topics.delete
  • קבלת נושא לייבוא: pubsub.topics.get
  • הצגת רשימה של נושא ייבוא: pubsub.topics.list
  • פרסום בנושא ייבוא: pubsub.topics.publish ו-pubsub.serviceAgent
  • עדכון נושא ייבוא: pubsub.topics.update
  • קבלת מדיניות IAM בנושא ייבוא: pubsub.topics.getIamPolicy
  • מגדירים את מדיניות ה-IAM לנושא ייבוא: pubsub.topics.setIamPolicy

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

אפשר להגדיר בקרת גישה ברמת הפרויקט וברמת המשאב הבודד.

הגדרה של זהות מאוחדת לגישה ל-Confluent Cloud

איחוד שירותי אימות הזהות של עומסי עבודה מאפשר לשירותים של Google Cloud לגשת לעומסי עבודה שפועלים מחוץ ל- Google Cloud. באמצעות איחוד שירותי אימות הזהות, לא צריך לשמור או להעביר פרטי כניסה ל- Google Cloud כדי לגשת למשאבים בעננים אחרים. במקום זאת, אפשר להשתמש בזהויות של עומסי העבודה עצמם כדי לבצע אימות ל- Google Cloud ולגשת למשאבים.

יצירת חשבון שירות ב- Google Cloud

זהו שלב אופציונלי. אם כבר יש לכם חשבון שירות, אתם יכולים להשתמש בו במקום ליצור חשבון שירות חדש. אם אתם משתמשים בחשבון שירות קיים, צריך לעבור אל תיעוד המזהה הייחודי של חשבון השירות כדי להמשיך לשלב הבא.

בנושאים של ייבוא מ-Confluent Cloud, ‏ Pub/Sub משתמש בחשבון השירות כזהות לגישה למשאבים מ-Confluent Cloud.

מידע נוסף על יצירת חשבון שירות, כולל דרישות מוקדמות, תפקידים והרשאות נדרשים והנחיות למתן שמות, זמין במאמר יצירת חשבונות שירות. אחרי שיוצרים חשבון שירות, יכול להיות שיהיה צריך להמתין 60 שניות לפחות לפני שאפשר להשתמש בו. הסיבה לכך היא מודל העקביות ההדרגתי: יכול להיות שייקח זמן עד שחשבון השירות החדש יופיע.

תיעוד המזהה הייחודי של חשבון השירות

כדי להגדיר את ספק הזהויות ואת מאגר הזהויות במסוף Confluent Cloud, צריך מזהה ייחודי של חשבון שירות.

  1. נכנסים לדף הפרטים חשבון שירות במסוף Google Cloud .

    כניסה לדף חשבון שירות

  2. לוחצים על חשבון השירות שיצרתם או על חשבון השירות שבו אתם מתכננים להשתמש.

  3. בדף פרטי חשבון השירות, רושמים את המזהה הייחודי.

    המזהה נדרש כחלק מתהליך העבודה להגדרת ספק הזהויות ומאגר הזהויות במסוף Confluent Cloud.

הוספת התפקיד 'יצירת אסימונים בחשבון שירות' לחשבון השירות של Pub/Sub

התפקיד 'יצירת אסימונים בחשבון שירות' (roles/iam.serviceAccountTokenCreator) מאפשר לחשבונות משתמשים ליצור פרטי כניסה לטווח קצר לחשבון שירות. האסימונים או פרטי הכניסה האלה משמשים להתחזות לחשבון השירות.

מידע נוסף על התחזות לחשבון שירות זמין במאמר התחזות לחשבון שירות.

במהלך התהליך הזה אפשר גם להוסיף את התפקיד 'פרסום הודעות ב-Pub/Sub' (roles/pubsub.publisher). מידע נוסף על התפקיד והסיבה להוספתו זמין במאמר הוספת התפקיד 'פרסום הודעות ב-Pub/Sub' לחשבון השירות של Pub/Sub.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit Principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד 'יצירת אסימונים בחשבון שירות' (roles/iam.serviceAccountTokenCreator) ולוחצים עליו.

  7. לוחצים על Save.

יצירת ספק זהויות ב-Confluent Cloud

כדי לבצע אימות ב-Confluent Cloud, לחשבון השירות ב-Google Cloud צריך להיות מאגר זהויות. קודם צריך ליצור ספק זהויות ב-Confluent Cloud.

למידע נוסף על יצירת ספק זהויות ב-Confluent Cloud, אפשר לעיין בדף הוספה של ספק זהויות OAuth/OIDC.

  1. נכנסים למסוף Confluent Cloud.

  2. בתפריט, לוחצים על חשבונות וגישה.

  3. לוחצים על זהויות של עומסי עבודה.

  4. לוחצים על הוספת ספק.

  5. לוחצים על OAuth/OIDC ואז על הבא.

  6. לוחצים על ספק OIDC אחר ואז על הבא.

  7. מזינים שם ותיאור של המטרה של ספק הזהויות.

  8. לוחצים על הצגת הגדרות מתקדמות.

  9. בשדה Issuer URI, מזינים https://accounts.google.com.

  10. בשדה JWKS URI, מזינים https://www.googleapis.com/oauth2/v3/certs.

  11. לוחצים על אימות ושמירה.

יצירת מאגר זהויות והקצאת התפקידים המתאימים ב-Confluent Cloud

צריך ליצור מאגר זהויות בפרופיל הזהות ולהעניק את התפקידים הנדרשים כדי לאפשר לחשבון השירות של Pub/Sub לבצע אימות ולקרוא מנושאי Confluent Cloud Kafka.

לפני שיוצרים מאגר זהויות, צריך לוודא שהאשכול נוצר ב-Confluent Cloud.

מידע נוסף על יצירת מאגר זהויות זמין במאמר שימוש במאגרי זהויות עם ספק הזהויות שלכם ב-OAuth/OIDC.

  1. נכנסים למסוף Confluent Cloud.

  2. בתפריט, לוחצים על חשבונות וגישה.

  3. לוחצים על זהויות של עומסי עבודה.

  4. לוחצים על ספק הזהויות שיצרתם בקטע יצירת ספק זהויות ב-Confluent Cloud.

  5. לוחצים על הוספת מאגר.

  6. מזינים שם ותיאור למאגר הזהויות.

  7. מגדירים את Identity claim לערך claims.

  8. בקטע הגדרת מסננים, לוחצים על הכרטיסייה מתקדם. מזינים את הקוד הבא:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    מחליפים את <SERVICE_ACCOUNT_UNIQUE_ID> במזהה הייחודי של חשבון השירות שמופיע בקטע תיעוד המזהה הייחודי של חשבון השירות.

  9. לוחצים על הבא.

  10. לוחצים על הוספת הרשאה חדשה. ואז לוחצים על הבא.

  11. באשכול הרלוונטי, לוחצים על הוספת הקצאת תפקיד.

  12. לוחצים על התפקיד מפעיל ואז על הוספה.

    התפקיד הזה מעניק לחשבון השירות של Pub/Sub גישה לאשכול שמכיל את הנושא של Confluent Kafka שרוצים להעביר ל-Pub/Sub.

  13. מתחת לאשכול, לוחצים על נושאים. אחר כך לוחצים על הוספת הקצאת תפקיד.

  14. בוחרים בתפקיד DeveloperRead.

  15. לוחצים על האפשרות המתאימה ומציינים את הנושא או הקידומת. לדוגמה, נושא ספציפי, כלל קידומת או כל הנושאים.

  16. לוחצים על הוספה.

  17. לוחצים על הבא.

  18. לוחצים על אימות ושמירה.

הוספת התפקיד 'פרסום הודעות ב-Pub/Sub' לחשבון המשתמש ב-Pub/Sub

כדי להפעיל את הפרסום, צריך להקצות תפקיד של פרסום לחשבון השירות של Pub/Sub, כדי ש-Pub/Sub יוכל לפרסם בנושא הייבוא של Confluent Cloud.

הוספת התפקיד של סוכן שירות Pub/Sub לחשבון השירות של Pub/Sub

כדי לאפשר ל-Pub/Sub להשתמש במכסת הפרסום של פרויקט נושא הייבוא, לסוכן השירות של Pub/Sub נדרשת ההרשאה serviceusage.services.use בפרויקט של נושא הייבוא.

כדי לתת את ההרשאה הזו, מומלץ להוסיף את תפקיד סוכן השירות של Pub/Sub לחשבון השירות של Pub/Sub.

אם לחשבון השירות של Pub/Sub אין את התפקיד של סוכן שירות Pub/Sub, אפשר להקצות אותו באופן הבא:

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד של סוכן השירות Pub/Sub (roles/pubsub.serviceAgent) ולוחצים עליו.

  7. לוחצים על Save.

הפעלת פרסום מכל הנושאים

כדאי להשתמש בשיטה הזו אם לא יצרתם נושאים לייבוא ל-Confluent Cloud.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד 'פרסום הודעות ב-Pub/Sub' (roles/pubsub.publisher) ולוחצים עליו.

  7. לוחצים על Save.

הפעלת פרסום מנושא יחיד

משתמשים בשיטה הזו רק אם נושא הייבוא של Confluent Cloud כבר קיים.

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: מזהה הנושא של נושא הייבוא ב-Confluent Cloud.

    • PROJECT_NUMBER: מספר הפרויקט. במאמר זיהוי פרויקטים מוסבר איך לראות את מספר הפרויקט.

הוספת התפקיד 'משתמש בחשבון שירות' לחשבון השירות

התפקיד 'משתמש בחשבון השירות' (roles/iam.serviceAccountUser) כולל את ההרשאה iam.serviceAccounts.actAs שמאפשרת לישות מורשית לצרף חשבון שירות להגדרות הטמעת הנתונים של נושא הייבוא ב-Confluent Cloud ולהשתמש בחשבון השירות הזה לצורך זהות מאוחדת.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בשורה של חשבון המשתמש שמוציא את הקריאות ליצירה או לעדכון של נושא.

  3. אם צריך, לוחצים על הוספת תפקיד נוסף.

  4. מחפשים את התפקיד 'משתמש בחשבון שירות' (roles/iam.serviceAccountUser) ולוחצים עליו.

  5. לוחצים על Save.

שימוש בנושאים של ייבוא Confluent Cloud

אפשר ליצור נושא חדש לייבוא או לערוך נושא קיים.

לתשומת ליבכם

  • יצירת הנושא והמינוי בנפרד, גם אם היא נעשית ברצף מהיר, עלולה לגרום לאובדן נתונים. יש חלון זמן קצר שבו הנושא קיים ללא מינוי. אם נתונים יישלחו לנושא במהלך הזמן הזה, הם יאבדו. אם יוצרים את הנושא קודם, יוצרים את המינוי ואז ממירים את הנושא לנושא ייבוא, אפשר להבטיח שלא יפספסו הודעות במהלך תהליך הייבוא.

  • אם אתם צריכים ליצור מחדש את נושא Kafka של נושא קיים לייבוא עם אותו שם, אתם לא יכולים פשוט למחוק את נושא Kafka וליצור אותו מחדש. פעולה כזו עלולה לפגוע בניהול ההיסט של Pub/Sub, מה שעלול להוביל לאובדן נתונים. כדי למנוע זאת, צריך לפעול לפי השלבים הבאים:

    • מוחקים את נושא הייבוא ב-Pub/Sub.
    • מוחקים את נושא Kafka.
    • יוצרים את נושא Kafka.
    • יוצרים את נושא הייבוא ב-Pub/Sub.
  • הנתונים מנושא Confluent Cloud Kafka תמיד נקראים מההיסט המוקדם ביותר.

יצירת נושא לייבוא ב-Confluent Cloud

מידע נוסף על מאפיינים שמשויכים לנושא

חשוב לוודא שהשלמתם את הפעולות הבאות:

כדי ליצור נושא ייבוא ב-Confluent Cloud:

המסוף

  1. במסוף Google Cloud, עוברים לדף Topics.

    לדף Topics

  2. לוחצים על יצירת נושא.
  3. בשדה Topic ID (מזהה הנושא), מזינים מזהה לנושא הייבוא. מידע נוסף על מתן שמות לנושאים זמין בהנחיות למתן שמות.
  4. בוחרים באפשרות הוספת מינוי שמוגדר כברירת מחדל.
  5. בוחרים באפשרות הפעלת הטמעה.
  6. בקטע 'מקור ההטמעה', בוחרים באפשרות Confluent Cloud.
  7. ממלאים את הפרטים הבאים:
    1. שרת Bootstrap: שרת ה-bootstrap של האשכול שמכיל את נושא Kafka שאתם מטמיעים ב-Pub/Sub. הפורמט הוא: hostname:port.
    2. מזהה האשכול: המזהה של האשכול שמכיל את נושא Kafka שאתם מבצעים לו העברה ל-Pub/Sub.
    3. Topic: השם של נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub.
    4. מזהה מאגר הזהויות: מזהה המאגר של מאגר הזהויות שמשמש לאימות ב-Confluent Cloud.
    5. חשבון שירות: חשבון השירות שיצרתם במאמר יצירת חשבון שירות ב-Google Cloud.
  8. לוחצים על יצירת נושא.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics create:
    gcloud pubsub topics create TOPIC_ID 
    --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
    --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
    --confluent-cloud-ingestion-topic CONFLUENT_TOPIC
    --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
    --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: השם או המזהה של נושא ה-Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: שרת ה-bootstrap של האשכול שמכיל את נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub. הפורמט הוא: hostname:port.
    • CONFLUENT_CLUSTER_ID: המזהה של האשכול שמכיל את נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub.
    • CONFLUENT_TOPIC: השם של נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: מזהה מאגר הזהויות שמשמש לאימות ב-Confluent Cloud.
    • PUBSUB_SERVICE_ACCOUNT: חשבון השירות שיצרתם בשלב יצירת חשבון שירות ב-Google Cloud.

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה בנושא Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topicpb := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud_{
				ConfluentCloud: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud{
					BootstrapServer:   bootstrapServer,
					ClusterId:         clusterID,
					Topic:             confluentTopic,
					IdentityPoolId:    poolID,
					GcpServiceAccount: gcpSA,
				},
			},
		},
	}
	topic, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", topic)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Node.ts

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub Python API.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

נתקלתם בבעיות? היעזרו במאמר בנושא פתרון בעיות בייבוא נושא ב-Confluent Cloud.

עריכה של נושא ייבוא ב-Confluent Cloud Hubs

כדי לערוך את הגדרות מקור הנתונים של נושא ייבוא ב-Confluent Cloud, פועלים לפי השלבים הבאים:

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על הנושא של הייבוא ב-Confluent Cloud.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. מעדכנים את השדות שרוצים לשנות.

  5. לוחצים על עדכון.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

    כדי לא לאבד את ההגדרות של נושא הייבוא, חשוב לכלול את כולן בכל פעם שמעדכנים את הנושא. אם משמיטים משהו, ההגדרה ב-Pub/Sub מתאפסת לערך ברירת המחדל המקורי שלה.

  2. מריצים את הפקודה gcloud pubsub topics update עם כל הדגלים שמוזכרים בדוגמה הבאה:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: השם או המזהה של נושא Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: שרת האתחול של האשכול שמכיל את נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub. הפורמט הוא: hostname:port.
    • CONFLUENT_CLUSTER_ID: המזהה של האשכול שמכיל את נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub
    • CONFLUENT_TOPIC: השם של נושא Kafka שמתבצעת ממנו העברה ל-Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: מזהה מאגר הזהויות שמשמש לאימות ב-Confluent Cloud.
    • CONFLUENT_IDENTITY_POOL_ID: חשבון השירות שיצרתם בשלב יצירת חשבון שירות ב-Google Cloud.

מכסות ומגבלות

התפוקה של המוציא לאור בנושאים של ייבוא מוגבלת על ידי מכסת הפרסום של הנושא. מידע נוסף זמין במאמר מכסות ומגבלות ב-Pub/Sub.

המאמרים הבאים

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.