כתיבה מ-Dataflow ל-Pub/Sub

במאמר הזה מוסבר איך לכתוב נתוני טקסט מ-Dataflow ל-Pub/Sub באמצעות מחבר הקלט/פלט של Apache Beam PubSubIO.

סקירה כללית

כדי לכתוב נתונים ל-Pub/Sub, משתמשים במחבר PubSubIO. רכיבי הקלט יכולים להיות הודעות Pub/Sub או רק נתוני ההודעה. אם רכיבי הקלט הם הודעות Pub/Sub, אפשר להגדיר מאפיינים או מפתח סדר לכל הודעה.

אפשר להשתמש בגרסת Java,‏ Python או Go של המחבר PubSubIO, באופן הבא:

Java

כדי לכתוב לנושא אחד, צריך להפעיל את method‏ PubsubIO.writeMessages. השיטה הזו מקבלת אוסף קלט של אובייקטים מסוג PubsubMessage. המחבר מגדיר גם שיטות נוחות לכתיבת מחרוזות, הודעות Avro עם קידוד בינארי או הודעות protobuf עם קידוד בינארי. השיטות האלה ממירות את אוסף הקלט להודעות Pub/Sub.

כדי לכתוב למערך דינמי של נושאים על סמך נתוני הקלט, קוראים ל-writeMessagesDynamic. מציינים את נושא היעד לכל הודעה על ידי קריאה ל-PubsubMessage.withTopic בהודעה. לדוגמה, אתם יכולים לנתב הודעות לנושאים שונים על סמך הערך של שדה מסוים בנתוני הקלט.

מידע נוסף מופיע במאמרי העזרה של PubsubIO.

Python

מבצעים קריאה ל-method‏ pubsub.WriteToPubSub. כברירת מחדל, השיטה הזו מקבלת אוסף קלט מסוג bytes, שמייצג את המטען הייעודי (Payload) של ההודעה. אם הפרמטר with_attributes הוא True, השיטה מקבלת אוסף של אובייקטים מסוג PubsubMessage.

מידע נוסף מופיע במאמרי העזרה של מודול pubsub.

Go

כדי לכתוב נתונים ל-Pub/Sub, מפעילים את השיטה pubsubio.Write. השיטה הזו מקבלת אוסף קלט של אובייקטים מסוג PubSubMessage או פרוסות של בייטים שמכילות את מטעני הייעוד של ההודעה.

מידע נוסף מופיע במאמרי העזרה בנושא חבילת pubsubio.

מידע נוסף על הודעות Pub/Sub זמין במאמר פורמט הודעה במסמכי Pub/Sub.

חותמות זמן

ב-Pub/Sub מוגדרת חותמת זמן לכל הודעה. חותמת הזמן הזו מייצגת את השעה שבה ההודעה פורסמה ב-Pub/Sub. בתרחיש של עיבוד סטרימינג, יכול להיות שיהיה לכם חשוב גם חותמת הזמן של האירוע, כלומר הזמן שבו נוצרו נתוני ההודעה. אפשר להשתמש בחותמת הזמן של הרכיב ב-Apache Beam כדי לייצג את זמן האירוע. מקורות שיוצרים PCollection ללא הגבלה לרוב מקצים לכל רכיב חדש חותמת זמן שתואמת לזמן האירוע.

ב-Java וב-Python, מחבר ה-I/O של Pub/Sub יכול לכתוב את חותמת הזמן של כל רכיב כמאפיין של הודעת Pub/Sub. צרכני הודעות יכולים להשתמש במאפיין הזה כדי לקבל את חותמת הזמן של האירוע.

Java

קוראים ל-PubsubIO.Write<T>.withTimestampAttribute ומציינים את שם המאפיין.

Python

מציינים את הפרמטר timestamp_attribute כשמתקשרים אל WriteToPubSub.

מסירת הודעות

‫Dataflow תומך בעיבוד של הודעות בדיוק פעם אחת בצינור. עם זאת, מחבר הקלט/פלט של Pub/Sub לא יכול להבטיח מסירה של הודעות דרך Pub/Sub בדיוק פעם אחת.

ב-Java וב-Python, אפשר להגדיר את מחבר הקלט/פלט של Pub/Sub כך שיכתוב את המזהה הייחודי של כל רכיב כמאפיין של הודעה. צרכני ההודעות יכולים להשתמש במאפיין הזה כדי לבטל כפילויות של הודעות.

Java

קוראים ל-PubsubIO.Write<T>.withIdAttribute ומציינים את שם המאפיין.

Python

מציינים את הפרמטר id_label כשמתקשרים אל WriteToPubSub.

פלט ישיר

אם מפעילים מצב סטרימינג של לפחות פעם אחת בצינור, מחבר הקלט/פלט משתמש בפלט ישיר. במצב הזה, המחבר לא מבצע נקודות ביקורת בהודעות, מה שמאפשר כתיבה מהירה יותר. עם זאת, ניסיונות חוזרים במצב הזה עלולים לגרום לשכפול הודעות עם מזהי הודעות שונים, מה שיכול להקשות על צרכני ההודעות לבטל את הכפילות של ההודעות.

בצינורות שמשתמשים במצב 'פעם אחת בדיוק', אפשר להפעיל פלט ישיר על ידי הגדרת streaming_enable_pubsub_direct_output אפשרות השירות. פלט ישיר מפחית את זמן האחזור של הכתיבה ומוביל לעיבוד יעיל יותר. מומלץ להשתמש באפשרות הזו אם צרכני ההודעות יכולים לטפל בהודעות כפולות עם מזהי הודעות לא ייחודיים.

דוגמאות

בדוגמה הבאה נוצר PCollection של הודעות Pub/Sub והן נכתבות לנושא Pub/Sub. הנושא מצוין כאפשרות של צינור. כל הודעה מכילה נתוני מטען ייעודי (payload) וקבוצה של מאפיינים.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")