כתיבה מ-Dataflow ל-Cloud Storage

במאמר הזה מוסבר איך לכתוב נתוני טקסט מ-Dataflow ל-Cloud Storage באמצעות מחבר ה-I/O של Apache Beam TextIO.

הוספת תלות בספרייה של Google Cloud Platform

כדי להשתמש במחבר TextIO עם Cloud Storage, צריך לכלול את התלות הבאה. הספרייה הזו מספקת רכיב handler של סכימה לשמות קבצים מסוג "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

מידע נוסף זמין במאמר בנושא התקנת Apache Beam SDK.

הפעלת gRPC במחבר קלט/פלט של Apache Beam ב-Dataflow

אפשר להתחבר ל-Cloud Storage באמצעות gRPC דרך מחבר הקלט/פלט של Apache Beam ב-Dataflow. ‏gRPC היא מסגרת קוד פתוח של קריאה לפרוצדורה מרוחקת (RPC) עם ביצועים גבוהים שפותחה על ידי Google, ואפשר להשתמש בה כדי ליצור אינטראקציה עם Cloud Storage.

כדי להאיץ את בקשות הכתיבה של משימת Dataflow ל-Cloud Storage, אפשר להפעיל את מחבר ה-I/O של Apache Beam ב-Dataflow כדי להשתמש ב-gRPC.

שורת הפקודה

  1. חשוב לוודא שאתם משתמשים בגרסה 2.55.0 ואילך של Apache Beam SDK.
  2. כדי להריץ משימת Dataflow, משתמשים באפשרות --additional-experiments=use_grpc_for_gcs pipeline. מידע על האפשרויות השונות של צינורות העברת נתונים מופיע במאמר בנושא דגלים אופציונליים.

‫Apache Beam SDK

  1. חשוב לוודא שאתם משתמשים בגרסה 2.55.0 ואילך של Apache Beam SDK.
  2. כדי להריץ משימת Dataflow, משתמשים באפשרות --experiments=use_grpc_for_gcs pipeline. מידע על האפשרויות השונות של צינורות מופיע במאמר בנושא אפשרויות בסיסיות.

אפשר להגדיר מחבר קלט/פלט של Apache Beam ב-Dataflow כדי ליצור מדדים שקשורים ל-gRPC ב-Cloud Monitoring. המדדים שקשורים ל-gRPC יכולים לעזור לכם:

  • מעקב אחרי הביצועים של בקשות gRPC ל-Cloud Storage ואופטימיזציה שלהם.
  • לפתור בעיות ולנפות באגים.
  • קבלת תובנות לגבי השימוש באפליקציה וההתנהגות שלה.

מידע על הגדרת מחבר Apache Beam I/O ב-Dataflow כדי ליצור מדדים שקשורים ל-gRPC זמין במאמר שימוש במדדים בצד הלקוח. אם איסוף מדדים לא נחוץ לתרחיש השימוש שלכם, אתם יכולים לבטל את ההסכמה לאיסוף מדדים. הוראות מפורטות מופיעות במאמר בנושא ביטול ההסכמה לשיתוף מדדים בצד הלקוח.

מקביליות

רמת המקביליות נקבעת בעיקר לפי מספר הרסיסים. כברירת מחדל, הרץ מגדיר את הערך הזה באופן אוטומטי. ברוב צינורות הנתונים, מומלץ להשתמש בהתנהגות ברירת המחדל. שיטות מומלצות

ביצועים

בטבלה הבאה מוצגים מדדי הביצועים של כתיבה ל-Cloud Storage. עומסי העבודה הופעלו על e2-standard2 עובד אחד, באמצעות Apache Beam SDK 2.49.0 ל-Java. הם לא השתמשו ב-Runner v2.

‫100 מיליון רשומות | 1KB | עמודה אחת תפוקה (בייטים) תפוקה (אלמנטים)
כתיבה ‫130 MBps ‫130,000 רכיבים בשנייה

המדדים האלה מבוססים על צינורות פשוטים של עיבוד נתונים באצווה. הם נועדו להשוות בין הביצועים של מחברי קלט/פלט, ולא בהכרח מייצגים צינורות נתונים בעולם האמיתי. הביצועים של צינורות Dataflow הם מורכבים, והם פונקציה של סוג המכונה הווירטואלית, הנתונים שעוברים עיבוד, הביצועים של מקורות ויעדים חיצוניים וקוד המשתמש. המדדים מבוססים על הפעלת Java SDK, והם לא מייצגים את מאפייני הביצועים של ערכות SDK בשפות אחרות. מידע נוסף זמין במאמר בנושא ביצועים של Beam IO.

שיטות מומלצות

  • באופן כללי, מומלץ להימנע מהגדרת מספר ספציפי של רסיסים. כך המפעיל יכול לבחור ערך מתאים לסולם. כדי להפעיל חלוקה אוטומטית, קוראים לפונקציה .withAutoSharding() ולא לפונקציה .withNumShards(0). אם משנים את מספר הרסיסים, מומלץ לכתוב בין 100MB ל-1GB לכל רסיס. עם זאת, יכול להיות שהערך האופטימלי יהיה תלוי בעומס העבודה.

  • ‫Cloud Storage יכול להתרחב למספר גדול מאוד של בקשות לשנייה. עם זאת, אם יש בצינור שלכם עליות חדות בנפח הכתיבה, כדאי לכתוב לכמה קטגוריות כדי להימנע מעומס יתר זמני על קטגוריה אחת של Cloud Storage.

  • באופן כללי, כתיבה ל-Cloud Storage יעילה יותר כשכל פעולת כתיבה גדולה יותר (1kb ומעלה). כתיבת רשומות קטנות למספר גדול של קבצים עלולה להוביל לביצועים גרועים יותר לכל בייט.

  • כשיוצרים שמות של קבצים, מומלץ להשתמש בשמות לא עוקבים כדי לפזר את העומס. מידע נוסף זמין במאמר בנושא שימוש במוסכמה למתן שמות שמפזרת את העומס באופן שווה בין מפתחות.

  • כשנותנים שמות לקבצים, אל תשתמשו בסימן @ ואחריו מספר או כוכבית (*). למידע נוסף, אפשר לעיין במאמר בנושא "@*" ו-"@N" הם מפרטים שמורים של חלוקה לשברי מידע.

דוגמה: כתיבת קובצי טקסט ל-Cloud Storage

בדוגמה הבאה נוצר צינור עיבוד נתונים של אצווה שכותב קובצי טקסט באמצעות דחיסת GZIP:

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

אם הקלט PCollection הוא בלתי מוגבל, צריך להגדיר חלון או טריגר באוסף, ואז לציין כתיבות בחלון על ידי קריאה ל-TextIO.Write.withWindowedWrites.

Python

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

בנתיב הפלט, מציינים נתיב Cloud Storage שכולל את שם הקטגוריה ואת הקידומת של שם הקובץ. לדוגמה, אם מציינים gs://my_bucket/output/file, המחבר TextIO כותב לקטגוריה של Cloud Storage בשם my_bucket, ולקובצי הפלט יש את הקידומת output/file*.

כברירת מחדל, מחבר TextIO מפצל את קובצי הפלט, באמצעות מוסכמת שמות כמו: <file-prefix>-00000-of-00001. אפשר גם לציין סיומת שם קובץ וסכמת דחיסה, כמו בדוגמה.

כדי להבטיח כתיבה אידמפוטנטית, Dataflow כותב לקובץ זמני ואז מעתיק את הקובץ הזמני המלא לקובץ הסופי. כדי לקבוע איפה הקבצים הזמניים האלה מאוחסנים, משתמשים בשיטה withTempDirectory.

המאמרים הבאים