שינוי סוג הנושא

אפשר להמיר נושא מיובא לנושא רגיל, או להפך.

המרת נושא מיובא לנושא רגיל

כדי להמיר נושא מיובא לנושא רגיל, צריך לנקות את הגדרות ההטמעה. כך עושים את זה:

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על הנושא לייבוא.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. מבטלים את הסימון של האפשרות הפעלת הטמעה.

  5. לוחצים על עדכון.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    מחליפים את TOPIC_ID במזהה הנושא.

המרת נושא רגיל לנושא ייבוא של Amazon Kinesis Data Streams

כדי להמיר נושא רגיל לנושא ייבוא של Amazon Kinesis Data Streams, קודם צריך לוודא שאתם עומדים בכל הדרישות המוקדמות.

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על הנושא שרוצים להמיר לנושא מיובא.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. בוחרים באפשרות הפעלת ההעברה.

  5. בקטע 'מקור להעברה', בוחרים באפשרות Amazon Kinesis Data Streams.

  6. מזינים את הפרטים הבאים:

    • Kinesis Stream ARN: ה-ARN של Kinesis Data Stream שאתם מתכננים להעביר ל-Pub/Sub. הפורמט של ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • Kinesis Consumer ARN: ה-ARN של משאב הצרכן שרשום ב-AWS Kinesis Data Stream. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • AWS Role ARN: ה-ARN של התפקיד ב-AWS. הפורמט של ה-ARN של התפקיד הוא: arn:aws:iam::${Account}:role/${RoleName}.

    • חשבון שירות: חשבון השירות שיצרתם.

  7. לוחצים על עדכון.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics update עם כל הדגלים שמוזכרים בדוגמה הבאה:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID הוא המזהה או השם של הנושא. אי אפשר לעדכן את השדה הזה.

    • KINESIS_STREAM_ARN הוא ה-ARN של Kinesis Data Streams שאתם מתכננים להעביר ל-Pub/Sub. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN הוא ה-ARN של משאב הצרכן שנרשם ב-AWS Kinesis Data Streams. הפורמט של ה-ARN הוא: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN הוא ה-ARN של תפקיד ה-AWS. הפורמט של ה-ARN של התפקיד הוא: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT הוא חשבון השירות שיצרתם.

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של ה-API בשפת Python של Pub/Sub.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

מידע נוסף על שמות משאבים ב-Amazon (ARN) זמין במאמרים שמות משאבים ב-Amazon (ARN) ומזהים ב-IAM.

המרת נושא רגיל לנושא ייבוא של Cloud Storage

כדי להמיר נושא רגיל לנושא ייבוא של Cloud Storage, קודם צריך לוודא שאתם עומדים בכל הדרישות המוקדמות.

המסוף

  1. נכנסים לדף Topics במסוף Google Cloud .

    לדף Topics

  2. לוחצים על הנושא שרוצים להמיר לנושא ייבוא של Cloud Storage.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. בוחרים באפשרות הפעלת ההעברה.

  5. בקטע 'מקור ההטמעה', בוחרים באפשרות Google Cloud Storage.

  6. בקטע קטגוריה של Cloud Storage, לוחצים על Browse.

    נפתח הדף Select bucket. בוחרים באחת מהאפשרויות הבאות:

    • בוחרים דלי קיים מכל פרויקט מתאים.

    • לוחצים על סמל היצירה ופועלים לפי ההוראות במסך כדי ליצור מאגר חדש. אחרי שיוצרים את הקטגוריה, בוחרים אותה בנושא הייבוא של Cloud Storage.

  7. כשמציינים את הקטגוריה, שירות Pub/Sub בודק אם לחשבון השירות של Pub/Sub יש את ההרשאות המתאימות בקטגוריה. אם יש בעיות בהרשאות, תוצג הודעת שגיאה שקשורה להרשאות.

    אם יש בעיות בהרשאות, לוחצים על הגדרת הרשאות. מידע נוסף זמין במאמר מתן הרשאות Cloud Storage לחשבון השירות של Pub/Sub.

  8. בשדה Object format, בוחרים באפשרות Text,‏ Avro או Pub/Sub Avro.

    אם בוחרים באפשרות טקסט, אפשר לציין תו מפריד שבאמצעותו יפוצלו האובייקטים להודעות.

    מידע נוסף על האפשרויות האלה זמין במאמר בנושא פורמט קלט.

  9. זה שינוי אופציונלי. אפשר לציין זמן מינימלי ליצירת אובייקט לנושא. אם ההגדרה הזו מוגדרת, המערכת מבצעת המרה רק לאובייקטים שנוצרו אחרי הזמן המינימלי ליצירת אובייקט.

    מידע נוסף זמין במאמר בנושא זמן מינימלי ליצירת אובייקט.

  10. צריך לציין תבנית Glob. כדי להטמיע את כל האובייקטים בקטגוריה, משתמשים ב-** כדפוס glob. רק אובייקטים שתואמים לתבנית הנתונה ייכללו.

    מידע נוסף זמין במאמר בנושא התאמה לתבנית glob.

  11. משאירים את שאר הגדרות ברירת המחדל.
  12. לוחצים על עדכון הנושא.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. כדי לא לאבד את ההגדרות של נושא הייבוא, חשוב לכלול את כל ההגדרות בכל פעם שמעדכנים את הנושא. אם משמיטים משהו, Pub/Sub מאפס את ההגדרה לערך ברירת המחדל המקורי שלה.

    מריצים את הפקודה gcloud pubsub topics update עם כל הדגלים שמוזכרים בדוגמה הבאה:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID הוא המזהה או השם של הנושא. אי אפשר לעדכן את השדה הזה.

    • BUCKET_NAME: ציון השם של קטגוריה קיימת. לדוגמה, prod_bucket. שם הקטגוריה לא יכול לכלול את מזהה הפרויקט. במאמר יצירת קטגוריות מוסבר איך ליצור קטגוריה.

    • INPUT_FORMAT: מציין את הפורמט של האובייקטים שמועברים. הערך יכול להיות text, avro או pubsub_avro. מידע נוסף על האפשרויות האלה זמין במאמר בנושא פורמט קלט.

    • TEXT_DELIMITER: מציין את התו המפריד שבו יש לפצל אובייקטים של טקסט להודעות Pub/Sub. התו המפריד חייב להיות תו יחיד, וצריך להגדיר אותו רק אם הערך של INPUT_FORMAT הוא text. ברירת המחדל היא תו השורה החדשה (\n).

      כשמשתמשים ב-CLI של gcloud כדי לציין את התו שמפריד בין הערכים, צריך לשים לב במיוחד לטיפול בתווים מיוחדים כמו מעבר שורה \n. כדי לוודא שהתו המפריד יפורש בצורה נכונה, צריך להשתמש בפורמט '\n'. שימוש ב-\n בלי מרכאות או בלי escape יגרום להגדרת התו "n" כתו מפריד.

    • MINIMUM_OBJECT_CREATE_TIME: מציין את הזמן המינימלי שחלף מאז שאובייקט נוצר, כדי שהוא ייכלל בתהליך ההטמעה. הערך צריך להיות בפורמט UTC‏ YYYY-MM-DDThh:mm:ssZ. לדוגמה, 2024-10-14T08:30:30Z.

      כל תאריך, בעבר או בעתיד, מ-0001-01-01T00:00:00Z עד 9999-12-31T23:59:59Z כולל, הוא תקין.

    • MATCH_GLOB: מציין את תבנית ה-glob להתאמה כדי שאובייקט ייקלט. כשמשתמשים ב-CLI של gcloud, אם מחפשים התאמה באמצעות glob עם התווים *, צריך להוסיף לפני התו * את התו * כדי לבטל את המשמעות המיוחדת שלו, כלומר להשתמש בפורמט \*\*.txt, או להוסיף את כל ה-glob של ההתאמה במירכאות "**.txt" או '**.txt'. מידע על התחביר הנתמך של תבניות glob מופיע ב מאמרי העזרה של Cloud Storage.