כתיבה מ-Dataflow ל-BigQuery

במאמר הזה מוסבר איך לכתוב נתונים מ-Dataflow ל-BigQuery.

סקירה כללית

ברוב תרחישי השימוש, מומלץ להשתמש בקלט/פלט מנוהל כדי לכתוב ל-BigQuery. ‫Managed I/O מספק תכונות כמו שדרוגים אוטומטיים וAPI עקבי להגדרות. כשכותבים ל-BigQuery, כלי ה-I/O המנוהל בוחר באופן אוטומטי את שיטת הכתיבה הטובה ביותר למשימות אצווה או סטרימינג.

אם אתם צריכים כוונון ביצועים מתקדם יותר, כדאי להשתמש במחבר BigQueryIO. מידע נוסף מופיע בקטע שימוש במחבר BigQueryIO במסמך הזה.

ביצועים

בטבלה הבאה מוצגים מדדי ביצועים של עומסי עבודה שונים. עומסי העבודה האלה הופעלו על e2-standard2 עובד אחד, באמצעות Apache Beam SDK 2.49.0 ל-Java. הם לא השתמשו ב-Runner v2.

‫100 מיליון רשומות | 1KB | עמודה אחת תפוקה (בייטים) תפוקה (אלמנטים)
Storage Write ‫55MBps ‫54,000 רכיבים לשנייה
Avro Load ‫78MBps ‫77,000 רכיבים בשנייה
Json Load ‫54 MBps ‫53,000 רכיבים לשנייה

המדדים האלה מבוססים על צינורות פשוטים של עיבוד נתונים באצווה. הם נועדו להשוות בין הביצועים של מחברי קלט/פלט, ולא בהכרח מייצגים צינורות נתונים בעולם האמיתי. הביצועים של צינורות Dataflow הם מורכבים, והם פונקציה של סוג המכונה הווירטואלית, הנתונים שעוברים עיבוד, הביצועים של מקורות ויעדים חיצוניים וקוד המשתמש. המדדים מבוססים על הפעלת Java SDK, והם לא מייצגים את מאפייני הביצועים של ערכות SDK בשפות אחרות. מידע נוסף זמין במאמר בנושא ביצועים של Beam IO.

שימוש במחבר BigQueryIO

מחבר BigQuery I/O תומך בשיטות הבאות לכתיבה ב-BigQuery:

  • STORAGE_WRITE_API. במצב הזה, המחבר מבצע כתיבה ישירה לאחסון ב-BigQuery באמצעות BigQuery Storage Write API. ‫Storage Write API משלב הטמעת עדכונים בזמן אמת וטעינת נתונים בכמות גדולה בממשק API יחיד בעל ביצועים משופרים. במצב הזה מובטחת סמנטיקה של בדיוק פעם אחת.
  • STORAGE_API_AT_LEAST_ONCE. במצב הזה נעשה שימוש גם ב-Storage Write API, אבל הוא מספק סמנטיקה של 'לפחות פעם אחת'. במצב הזה, זמן האחזור נמוך יותר ברוב צינורות הנתונים. עם זאת, יכול להיות שיהיו כתיבות כפולות.
  • FILE_LOADS. במצב הזה, המחבר כותב את נתוני הקלט לקובצי ביניים ב-Cloud Storage. לאחר מכן, המערכת מריצה משימת טעינה ב-BigQuery כדי לטעון את הנתונים ל-BigQuery. המודל הזה מוגדר כברירת מחדל עבור PCollections,0x0A>שנפוצים בעיקר בצינורות להעברת נתונים באצווה.
  • STREAMING_INSERTS. במצב הזה, המחבר משתמש ב-Legacy Streaming API. ההגדרה הזו היא ברירת המחדל עבור PCollections ללא הגבלה, אבל לא מומלצת לפרויקטים חדשים.

כשבוחרים שיטת כתיבה, כדאי להתייחס לנקודות הבאות:

  • לגבי משימות של סטרימינג, כדאי להשתמש ב-STORAGE_WRITE_API או ב-STORAGE_API_AT_LEAST_ONCE, כי במצבים האלה הנתונים נכתבים ישירות לאחסון ב-BigQuery, בלי להשתמש בקבצים זמניים של מחסן ביניים (Stage).
  • אם מריצים את צינור הנתונים באמצעות מצב סטרימינג של לפחות פעם אחת, צריך להגדיר את מצב הכתיבה ל-STORAGE_API_AT_LEAST_ONCE. ההגדרה הזו יעילה יותר ומתאימה לסמנטיקה של מצב הסטרימינג 'לפחות פעם אחת'.
  • ל-File loads ול-Storage Write API יש מכסות ומגבלות שונות.
  • משימות טעינה משתמשות במאגר המשבצות המשותף של BigQuery או במשבצות שמורות. כדי להשתמש במשבצות שמורות, מריצים את עבודת הטעינה בפרויקט עם הקצאת הזמנה מסוג PIPELINE. משימות טעינה לא כרוכות בתשלום אם משתמשים במאגר המשותף של יחידות הקיבולת של BigQuery. עם זאת, BigQuery לא מתחייב לגבי הקיבולת הזמינה של המאגר המשותף. מידע נוסף מופיע במאמר מבוא להזמנות.

מקביליות

  • במחברים של FILE_LOADS ו-STORAGE_WRITE_API בצינורות עיבוד נתונים בסטרימינג, הנתונים מחולקים למספר קבצים או מקורות נתונים. באופן כללי, מומלץ להפעיל את הפיצול האוטומטי על ידי קריאה ל-withAutoSharding.

  • ב-FILE_LOADS בצינורות להעברת נתונים באצווה, המחבר כותב נתונים לקבצים מחולקים, ואז הם נטענים ל-BigQuery במקביל.

  • בצינורות של משימות אצווה, כל עובד יוצר זרם אחד או יותר לכתיבה ב-BigQuery, בהתאם למספר הכולל של הרסיסים.STORAGE_WRITE_API

  • ב-STORAGE_API_AT_LEAST_ONCE יש שידור כתיבה אחד כברירת מחדל. כמה עובדים מוסיפים נתונים לזרם הזה.

שיטות מומלצות

  • ל-Storage Write API יש מגבלות מכסה. המחבר מטפל במגבלות האלה ברוב צינורות הנתונים. עם זאת, בתרחישים מסוימים יכול להיות שלא יהיו יותר זרמי Storage Write API זמינים. לדוגמה, הבעיה הזו יכולה לקרות בצינור (pipeline) שמשתמש בחלוקה אוטומטית (auto-sharding) ובהתאמה אוטומטית לעומס (autoscaling) עם מספר גדול של יעדים, במיוחד במשימות ארוכות טווח עם עומסי עבודה משתנים מאוד. אם הבעיה הזו מתרחשת, כדאי להשתמש ב-STORAGE_WRITE_API_AT_LEAST_ONCE כדי להימנע ממנה.

  • אפשר להשתמש בGoogle Cloud מדדים כדי לעקוב אחרי השימוש במכסה של Storage Write API.

  • כשמשתמשים בהעלאות קבצים, בדרך כלל Avro עדיף על JSON. כדי להשתמש ב-Avro, מתקשרים אל withAvroFormatFunction.

  • כברירת מחדל, עבודות טעינה מופעלות באותו פרויקט כמו עבודת Dataflow. כדי לציין פרויקט אחר, צריך לבצע קריאה ל-withLoadJobProjectId.

  • כשמשתמשים ב-Java SDK, כדאי ליצור מחלקה שמייצגת את הסכימה של טבלה ב-BigQuery. אחר כך קוראים ל-useBeamSchema בצינור כדי להמיר אוטומטית בין סוגי Row Apache Beam וסוגי TableRow BigQuery. דוגמה למחלקת סכימה זמינה בקישור ExampleModel.java.

  • אם אתם טוענים טבלאות עם סכימות מורכבות שמכילות אלפי שדות, כדאי להשתמש בפונקציה withMaxBytesPerPartition כדי להגדיר גודל מקסימלי קטן יותר לכל עבודת טעינה.

  • כברירת מחדל, BigQueryIO משתמש בהגדרות של Storage Write API שמתאימות לרוב צינורות הנתונים. עם זאת, אם נתקלים בבעיות בביצועים, אפשר להגדיר אפשרויות של צינורות כדי לשנות את ההגדרות האלה. מידע נוסף מופיע במאמר Tune the Storage Write API (התאמה של Storage Write API) במסמכי התיעוד של Apache Beam.

צינורות עיבוד נתונים בסטרימינג

ההמלצות הבאות רלוונטיות לצינורות סטרימינג.

  • לצינורות להזרמת נתונים, מומלץ להשתמש ב-Storage Write API‏ (STORAGE_WRITE_API או STORAGE_API_AT_LEAST_ONCE).

  • בצינורות להעברת נתונים בסטרימינג אפשר להשתמש בהעלאות של קבצים, אבל לגישה הזו יש חסרונות:

    • כדי לכתוב את הקבצים, צריך חלונות. אי אפשר להשתמש בחלון הגלובלי.
    • כשמשתמשים במאגר המשבצות המשותף, BigQuery טוען קבצים על בסיס המאמץ המרבי. יכול להיות עיכוב משמעותי בין מועד הכתיבה של רשומה לבין המועד שבו היא זמינה ב-BigQuery.
    • אם משימת טעינה נכשלת – לדוגמה, בגלל נתונים פגומים או חוסר התאמה לסכימה – כל הצינור נכשל.
  • כדאי להשתמש ב-STORAGE_WRITE_API_AT_LEAST_ONCE כשזה אפשרי. השיטה הזו עלולה לגרום לכתיבה של רשומות כפולות ב-BigQuery, אבל היא זולה יותר וניתנת להרחבה יותר מאשר STORAGE_WRITE_API.

  • באופן כללי, מומלץ להימנע משימוש ב-STREAMING_INSERTS. הוספות בסטרימינג יקרות יותר מ-Storage Write API, והביצועים שלהן פחות טובים.

  • חלוקת נתונים (Data sharding) יכולה לשפר את הביצועים בצינורות להזרמת נתונים. ברוב צינורות הנתונים, פיצול אוטומטי הוא נקודת התחלה טובה. עם זאת, אפשר לשנות את הגדרות השארדינג באופן הבא:

    • במקרה של STORAGE_WRITE_API, צריך להתקשר אל withNumStorageWriteApiStreams כדי להגדיר את מספר זרמי הכתיבה.
    • במקרה של FILE_LOADS, מתקשרים אל withNumFileShards כדי להגדיר את מספר רסיסי הקבצים.
  • אם אתם משתמשים בהוספות לסטרימינג, מומלץ להגדיר את retryTransientErrors כמדיניות הניסיון החוזר.

צינורות עיבוד נתונים של אצווה

ההמלצות הבאות רלוונטיות לצינורות להעברת נתונים באצווה.

  • ברוב צינורות העיבוד של קבוצות גדולות של נתונים, מומלץ לנסות קודם את FILE_LOADS. בצינורות (pipeline) של עיבוד באצווה אפשר להשתמש ב-STORAGE_WRITE_API, אבל סביר להניח שתחרגו ממגבלות המכסה בקנה מידה גדול (1,000+ vCPU) או אם צינורות מקבילים פועלים. ‫Apache Beam לא מגביל את המספר המקסימלי של זרמי כתיבה למשימות באצווה STORAGE_WRITE_API, ולכן המשימה מגיעה בסופו של דבר למגבלות של BigQuery Storage API.

  • כשמשתמשים ב-FILE_LOADS, יכול להיות שתנצלו את כל יחידות הקיבולת המשותפות של BigQuery או את כל יחידות הקיבולת השמורות. אם נתקלתם בכשל כזה, נסו את הגישות הבאות:

    • מצמצמים את המספר המקסימלי של העובדים או את גודל העובד במשימה.
    • קונים עוד משבצות שמורות.
    • כדאי להשתמש ב-STORAGE_WRITE_API.
  • בצינורות קטנים עד בינוניים (פחות מ-1,000 vCPU) כדאי להשתמש ב-STORAGE_WRITE_API. למשימות קטנות יותר, מומלץ להשתמש ב-STORAGE_WRITE_API אם רוצים תור של הודעות שלא ניתן להעביר או אם מאגר המשבצות המשותף של FILE_LOADS לא מספיק.

  • אם אתם יכולים להסתדר עם נתונים כפולים, כדאי להשתמש בשיטה STORAGE_WRITE_API_AT_LEAST_ONCE. במצב הזה יכול להיות שרשומות כפולות ייכתבו ל-BigQuery, אבל יכול להיות שהעלות תהיה נמוכה יותר מאשר באפשרות STORAGE_WRITE_API.

  • אופן הפעולה של מצבי כתיבה שונים עשוי להיות שונה בהתאם למאפיינים של צינור הנתונים. מומלץ לערוך ניסויים כדי למצוא את מצב הכתיבה הכי טוב לעומס העבודה שלכם.

טיפול בשגיאות ברמת השורה

בקטע הזה מתואר איך לטפל בשגיאות שעלולות להתרחש ברמת השורה, למשל בגלל נתוני קלט לא תקינים או אי התאמות בסכימה.

ב-Storage Write API, כל השורות שלא ניתן לכתוב ממוקמות ב-PCollection נפרד. כדי לקבל את האוסף הזה, צריך להתקשר אל getFailedStorageApiInserts באובייקט WriteResult. דוגמה לגישה הזו מופיעה במאמר הזרמת נתונים ל-BigQuery.

מומלץ לשלוח את השגיאות לתור או לטבלה של הודעות שלא ניתן למסור, כדי לעבד אותן בהמשך. מידע נוסף על התבנית הזו זמין במאמר בנושא BigQueryIO תבנית של הודעות שלא נמסרו.

ב-FILE_LOADS, אם מתרחשת שגיאה בזמן טעינת הנתונים, משימת הטעינה נכשלת וצינור הנתונים יוצר חריגה בזמן ריצה. אפשר לראות את השגיאה ביומנים של Dataflow או בהיסטוריית המשימות של BigQuery. מחבר ה-I/O לא מחזיר מידע על שורות נפרדות שנכשלו.

מידע נוסף על פתרון בעיות שקשורות לשגיאות זמין במאמר שגיאות במחבר BigQuery.

דוגמאות

בדוגמאות הבאות מוסבר איך להשתמש ב-Dataflow כדי לכתוב ל-BigQuery. בדוגמאות האלה נעשה שימוש במחבר BigQueryIO.

כתיבה לטבלה קיימת

בדוגמה הבאה נוצר צינור באצווה שכותב PCollection<MyData> ל-BigQuery, כאשר MyData הוא סוג נתונים מותאם אישית.

השיטה BigQueryIO.write() מחזירה סוג BigQueryIO.Write<T>, שמשמש להגדרת פעולת הכתיבה. מידע נוסף זמין במאמר בנושא כתיבה לטבלה במסמכי התיעוד של Apache Beam. בדוגמת הקוד הזו מתבצעת כתיבה לטבלה קיימת (CREATE_NEVER) והוספה של השורות החדשות לטבלה (WRITE_APPEND).

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

כתיבה לטבלה חדשה או קיימת

בדוגמה הבאה נוצרת טבלה חדשה אם טבלת היעד לא קיימת, על ידי הגדרת הסדר הפעולות ליצירה לערך CREATE_IF_NEEDED. אם בוחרים באפשרות הזו, צריך לספק סכימת טבלה. המחבר משתמש בסכימה הזו אם הוא יוצר טבלה חדשה.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

הזרמת נתונים ל-BigQuery

בדוגמה הבאה מוצגות שתי דרכים להזרמת נתונים באמצעות סמנטיקה של בדיוק פעם אחת. הדרך הראשונה היא להגדיר את מצב הכתיבה ל-STORAGE_WRITE_API

לא כל צינורות הנתונים של סטרימינג מחייבים סמנטיקה של 'פעם אחת בדיוק'. לדוגמה, יכול להיות שתוכלו להסיר כפילויות באופן ידני מטבלת היעד. אם האפשרות של רשומות כפולות מקובלת בתרחיש שלכם, כדאי להשתמש בסמנטיקה של 'לפחות פעם אחת' על ידי הגדרת שיטת הכתיבה לערך STORAGE_API_AT_LEAST_ONCE. השיטה הזו בדרך כלל יעילה יותר ומובילה לזמן אחזור נמוך יותר ברוב צינורות הנתונים.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

המאמרים הבאים