יצירת נושא לייבוא של Amazon Kinesis Data Streams

נושא ייבוא של Amazon Kinesis Data Streams מאפשר לכם להטמיע נתונים באופן רציף מ-Amazon Kinesis Data Streams כמקור חיצוני אל Pub/Sub. אחר כך אפשר להזרים את הנתונים לכל אחד מהיעדים שנתמכים ב-Pub/Sub.

מידע נוסף על ייבוא נושאים זמין במאמר מידע על ייבוא נושאים.

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות ליצירה ולניהול של נושאי ייבוא של Amazon Kinesis Data Streams, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM‏ Pub/Sub Editor (roles/pubsub.editor) בנושא או בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות ליצירה ולניהול של נושאי ייבוא של Amazon Kinesis Data Streams. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי ליצור ולנהל נושאי ייבוא של Amazon Kinesis Data Streams, נדרשות ההרשאות הבאות:

  • יוצרים נושא לייבוא: pubsub.topics.create
  • מחיקת נושא ייבוא: pubsub.topics.delete
  • קבלת נושא לייבוא: pubsub.topics.get
  • הצגת רשימה של נושא ייבוא: pubsub.topics.list
  • פרסום בנושא ייבוא: pubsub.topics.publish ו-pubsub.serviceAgent
  • עדכון נושא ייבוא: pubsub.topics.update
  • קבלת מדיניות IAM בנושא ייבוא: pubsub.topics.getIamPolicy
  • מגדירים את מדיניות ה-IAM לנושא ייבוא: pubsub.topics.setIamPolicy

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

אפשר להגדיר בקרת גישה ברמת הפרויקט וברמת המשאב הבודד.

הגדרת זהות מאוחדת לגישה ל-Amazon Kinesis Data Streams

איחוד שירותי אימות הזהות של עומסי עבודה מאפשר לשירותים של Google Cloud לגשת לעומסי עבודה שפועלים מחוץ ל- Google Cloud. באמצעות איחוד שירותי אימות הזהות, לא צריך לשמור או להעביר פרטי כניסה ל- Google Cloud כדי לגשת למשאבים בעננים אחרים. במקום זאת, אפשר להשתמש בזהויות של עומסי העבודה עצמם כדי לבצע אימות ל- Google Cloud ולגשת למשאבים.

יצירת חשבון שירות ב- Google Cloud

זהו שלב אופציונלי. אם כבר יש לכם חשבון שירות, אתם יכולים להשתמש בו במקום ליצור חשבון שירות חדש. אם אתם משתמשים בחשבון שירות קיים, צריך לעבור אל תיעוד המזהה הייחודי של חשבון השירות כדי להמשיך לשלב הבא.

בנושאים של ייבוא מ-Amazon Kinesis Data Streams, ‏ Pub/Sub משתמש בחשבון השירות כזהות לגישה למשאבים מ-AWS.

מידע נוסף על יצירת חשבון שירות, כולל דרישות מוקדמות, תפקידים והרשאות נדרשים והנחיות למתן שמות, זמין במאמר יצירת חשבונות שירות. אחרי שיוצרים חשבון שירות, יכול להיות שיהיה צריך להמתין 60 שניות לפחות לפני שאפשר להשתמש בו. הסיבה לכך היא מודל העקביות ההדרגתי: יכול להיות שייקח זמן עד שחשבון השירות החדש יופיע.

תיעוד המזהה הייחודי של חשבון השירות

כדי להגדיר תפקיד ב-AWS, צריך מזהה ייחודי של חשבון שירות.

  1. נכנסים לדף הפרטים חשבון שירות במסוף Google Cloud .

    כניסה לדף חשבון שירות

  2. לוחצים על חשבון השירות שיצרתם או על חשבון השירות שבו אתם מתכננים להשתמש.

  3. בדף פרטי חשבון השירות, רושמים את המזהה הייחודי.

    תצטרכו את המזהה כחלק מתהליך העבודה כדי להגדיר תפקיד ב-AWS.

הוספת התפקיד 'יצירת אסימונים בחשבון שירות' לחשבון השירות של Pub/Sub

התפקיד 'יצירת אסימונים בחשבון שירות' (roles/iam.serviceAccountTokenCreator) מאפשר לחשבונות משתמשים ליצור פרטי כניסה לטווח קצר לחשבון שירות. האסימונים או פרטי הכניסה האלה משמשים להתחזות לחשבון השירות.

מידע נוסף על התחזות לחשבון שירות זמין במאמר התחזות לחשבון שירות.

במהלך התהליך הזה אפשר גם להוסיף את התפקיד 'פרסום הודעות ב-Pub/Sub' (roles/pubsub.publisher). מידע נוסף על התפקיד והסיבה להוספתו זמין במאמר הוספת התפקיד 'פרסום הודעות ב-Pub/Sub' לחשבון השירות של Pub/Sub.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit Principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד 'יצירת אסימונים בחשבון שירות' (roles/iam.serviceAccountTokenCreator) ולוחצים עליו.

  7. לוחצים על Save.

יצירת מדיניות ב-AWS

אתם צריכים מדיניות ב-AWS שתאפשר ל-Pub/Sub לבצע אימות ב-AWS כדי ש-Pub/Sub יוכל להטמיע נתונים מ-Amazon Kinesis Data Streams.

כדי ליצור מדיניות ב-AWS:

  1. נכנסים אל AWS Management Console ופותחים את IAM console.

  2. בחלונית הניווט של המסוף IAM, לוחצים על Access Management ‏ > Policies.

  3. לוחצים על יצירת מדיניות.

  4. בקטע Select a service (בחירת שירות), לוחצים על Kinesis.

  5. בקטע פעולה מותרת, לוחצים על אחת מהאפשרויות הבאות:

    • List‏ > ListShards.

      הפעולה הזו מעניקה הרשאה לרשום את הרסיסים בזרם ומספקת מידע על כל רסיס.

    • Read >‏ SubscribeToShard.

      הפעולה הזו מעניקה הרשאה להאזין לשבר ספציפי עם fan-out משופר.

    • Read‏ > DescribeStreamConsumer.

      הפעולה הזו מעניקה הרשאה לקבל את התיאור של צרכן רשום של זרם.

    ההרשאות האלה כוללות קריאה מהשידור. ‫Pub/Sub תומך רק בקריאה מזרם Kinesis עם Enhanced Fan-Out באמצעות streaming SubscribeToShard API.

  6. בקטע Resources, אם רוצים להגביל את המדיניות לזרם או לצרכן ספציפיים (מומלץ), צריך לציין את מספר ה-ARN של הצרכן ואת מספר ה-ARN של הזרם.

  7. לוחצים על הוספת הרשאות נוספות.

  8. בקטע Select a service (בחירת שירות), לוחצים על STS.

  9. בקטע Action allowed (הפעולה מותרת), לוחצים על Write (כתיבה) > AssumeRoleWithWebIdentity (קבלת תפקיד עם זהות אינטרנט).

    הפעולה הזו מעניקה הרשאה לקבל קבוצה של פרטי כניסה זמניים לאבטחה עבור Pub/Sub כדי לבצע אימות ל-Amazon Kinesis Data Streams באמצעות איחוד שירותי אימות הזהות.

  10. לוחצים על הבא.

  11. מזינים שם ותיאור למדיניות.

  12. לוחצים על יצירת מדיניות.

יצירת תפקיד ב-AWS באמצעות מדיניות אמון מותאמת אישית

צריך ליצור תפקיד ב-AWS כדי ש-Pub/Sub יוכל לבצע אימות ב-AWS כדי להטמיע נתונים מ-Amazon Kinesis Data Streams.

  1. נכנסים אל AWS Management Console ופותחים את IAM console.

  2. בחלונית הניווט של המסוף, בקטע IAM, לוחצים על Roles (תפקידים).

  3. לוחצים על יצירת תפקיד.

  4. בקטע Select trusted entity (בחירת ישות מהימנה), לוחצים על Custom trust policy (מדיניות מהימנות בהתאמה אישית).

  5. בקטע Custom trust policy, מזינים או מדביקים את הטקסט הבא:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    מחליפים את <SERVICE_ACCOUNT_UNIQUE_ID> במזהה הייחודי של חשבון השירות שרשמתם בשלב רשימת המזהה הייחודי של חשבון השירות.

  6. לוחצים על הבא.

  7. בקטע Add permissions (הוספת הרשאות), מחפשים את המדיניות המותאמת אישית שיצרתם ולוחצים עליה.

  8. לוחצים על הבא.

  9. מזינים שם ותיאור לתפקיד.

  10. לוחצים על יצירת תפקיד.

הוספת התפקיד 'פרסום הודעות ב-Pub/Sub' לחשבון המשתמש ב-Pub/Sub

כדי להפעיל את הפרסום, צריך להקצות תפקיד של פרסום לחשבון השירות של Pub/Sub, כדי ש-Pub/Sub יוכל לפרסם בנושא הייבוא של Amazon Kinesis Data Streams.

הוספת התפקיד של סוכן שירות Pub/Sub לחשבון השירות של Pub/Sub

כדי לאפשר ל-Pub/Sub להשתמש במכסת הפרסום של פרויקט נושא הייבוא, לסוכן השירות של Pub/Sub נדרשת ההרשאה serviceusage.services.use בפרויקט של נושא הייבוא.

כדי לתת את ההרשאה הזו, מומלץ להוסיף את תפקיד סוכן השירות של Pub/Sub לחשבון השירות של Pub/Sub.

אם לחשבון השירות של Pub/Sub אין את התפקיד של סוכן שירות Pub/Sub, אפשר להקצות אותו באופן הבא:

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד של סוכן השירות Pub/Sub (roles/pubsub.serviceAgent) ולוחצים עליו.

  7. לוחצים על Save.

הפעלת פרסום מכל הנושאים

משתמשים בשיטה הזו אם לא יצרתם נושאים לייבוא של Amazon Kinesis Data Streams.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. מסמנים את התיבה Include Google-provided role grants.

  3. מחפשים את חשבון השירות בפורמט service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים את התפקיד 'פרסום הודעות ב-Pub/Sub' (roles/pubsub.publisher) ולוחצים עליו.

  7. לוחצים על Save.

הפעלת פרסום מנושא יחיד

משתמשים בשיטה הזו רק אם נושא הייבוא של Amazon Kinesis Data Streams כבר קיים.

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: מזהה הנושא של נושא הייבוא של Amazon Kinesis Data Streams.

    • PROJECT_NUMBER: מספר הפרויקט. במאמר זיהוי פרויקטים מוסבר איך לראות את מספר הפרויקט.

הוספת התפקיד 'משתמש בחשבון שירות' לחשבון השירות

התפקיד 'משתמש בחשבון השירות' (roles/iam.serviceAccountUser) כולל את ההרשאה iam.serviceAccounts.actAs שמאפשרת לישות מורשית לצרף חשבון שירות להגדרות הטמעת הנתונים של נושא הייבוא של Amazon Kinesis Data Streams ולהשתמש בחשבון השירות הזה לצורך זהות מאוחדת.

  1. נכנסים לדף IAM במסוף Google Cloud .

    כניסה לדף IAM

  2. לוחצים על הלחצן Edit principal (עריכת חשבון המשתמש) בשורה של חשבון המשתמש שמוציא את הקריאות ליצירה או לעדכון של נושא.

  3. אם צריך, לוחצים על הוספת תפקיד נוסף.

  4. מחפשים את התפקיד 'משתמש בחשבון שירות' (roles/iam.serviceAccountUser) ולוחצים עליו.

  5. לוחצים על Save.

שימוש בנושאים של Amazon Kinesis Data Streams

אפשר ליצור נושא חדש לייבוא או לערוך נושא קיים.

לתשומת ליבכם

  • יצירת הנושא והמינוי בנפרד, גם אם היא נעשית ברצף מהיר, עלולה לגרום לאובדן נתונים. יש חלון זמן קצר שבו הנושא קיים בלי מינוי. אם נתונים כלשהם יישלחו לנושא במהלך הזמן הזה, הם יאבדו. אם יוצרים את הנושא קודם, יוצרים את המינוי ואז ממירים את הנושא לנושא ייבוא, אפשר להבטיח שלא יפספסו הודעות במהלך תהליך הייבוא.

יצירת נושא לייבוא של Amazon Kinesis Data Streams

מידע נוסף על מאפיינים שמשויכים לנושא

חשוב לוודא שהשלמתם את הפעולות הבאות:

כדי ליצור נושא לייבוא של Amazon Kinesis Data Streams, פועלים לפי השלבים הבאים:

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על יצירת נושא.

  3. בשדה Topic ID (מזהה הנושא), מזינים מזהה לנושא הייבוא של Amazon Kinesis Data Streams.

    מידע נוסף על מתן שמות לנושאים זמין בהנחיות למתן שמות.

  4. בוחרים באפשרות הוספת מינוי שמוגדר כברירת מחדל.

  5. בוחרים באפשרות הפעלת הטמעה.

  6. בקטע 'מקור להעברה', בוחרים באפשרות Amazon Kinesis Data Streams.

  7. מזינים את הפרטים הבאים:

    • Kinesis Stream ARN: ה-ARN של Kinesis Data Stream שאתם מתכננים להעביר ל-Pub/Sub. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • Kinesis Consumer ARN: ה-ARN של משאב הצרכן שרשום ב-AWS Kinesis Data Stream. הפורמט של ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

    • AWS Role ARN: ה-ARN של התפקיד ב-AWS. הפורמט של ה-ARN של התפקיד הוא כדלקמן: arn:aws:iam::${Account}:role/${RoleName}

    • חשבון שירות: חשבון השירות שיצרתם במאמר יצירת חשבון שירות ב- Google Cloud.

  8. לוחצים על יצירת נושא.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
        --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN \
        --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN \
        --kinesis-ingestion-role-arn KINESIS_ROLE_ARN \
        --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
    

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: מזהה הנושא.
    • KINESIS_STREAM_ARN: מספר ה-ARN של מקורות Kinesis Data Streams שאתם מתכננים להעביר ל-Pub/Sub. הפורמט של ה-ARN הוא כדלקמן: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.
    • KINESIS_CONSUMER_ARN: ה-ARN של משאב הצרכן שרשום ב-AWS Kinesis Data Streams. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.
    • KINESIS_ROLE_ARN: ה-ARN של תפקיד AWS. פורמט ה-ARN של התפקיד הוא: arn:aws:iam::${Account}:role/${RoleName}.
    • PUBSUB_SERVICE_ACCOUNT: חשבון השירות שיצרתם במאמר יצירת חשבון שירות ב-Google Cloud.

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה בנושא Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	pubsub "cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topicID := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topicpb := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("failed to create topic with kinesis: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Node.ts

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמר Pub/Sub Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub Python API.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

מידע נוסף על שמות משאבים ב-Amazon (ARN) זמין במאמרים שמות משאבים ב-Amazon (ARN) ומזהים ב-IAM.

נתקלתם בבעיות? כך פותרים בעיות בייבוא של נושאים מ-Amazon Kinesis Data Streams.

עריכת נושא ייבוא של Amazon Kinesis Data Streams

אפשר לערוך את ההגדרות של מקור הנתונים לייבוא של נושא ב-Amazon Kinesis Data Streams. כך עושים את זה:

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על נושא הייבוא של Amazon Kinesis Data Streams.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. מעדכנים את השדות שרוצים לשנות.

  5. לוחצים על עדכון.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. כדי לא לאבד את ההגדרות של נושא הייבוא, חשוב לכלול את כל ההגדרות בכל פעם שמעדכנים את הנושא. אם משמיטים משהו, Pub/Sub מאפס את ההגדרה לערך ברירת המחדל המקורי שלה.

    מריצים את הפקודה gcloud pubsub topics update עם כל הדגלים שמוזכרים בדוגמה הבאה:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID הוא מזהה הנושא. אי אפשר לעדכן את השדה הזה.

    • KINESIS_STREAM_ARN הוא ה-ARN של Kinesis Data Streams שאתם מתכננים להעביר ל-Pub/Sub. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN הוא ה-ARN של משאב הצרכן שנרשם ב-AWS Kinesis Data Streams. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

    • KINESIS_ROLE_ARN הוא ה-ARN של תפקיד ה-AWS. הפורמט של ה-ARN של התפקיד הוא: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT הוא חשבון השירות שיצרתם בקטע יצירת חשבון שירות ב- Google Cloud.

מכסות ומגבלות בנושאים של ייבוא מ-Amazon Kinesis Data Streams

התפוקה של המוציא לאור בנושאים של ייבוא מוגבלת על ידי מכסת הפרסום של הנושא. מידע נוסף זמין במאמר מכסות ומגבלות ב-Pub/Sub.

המאמרים הבאים

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.