כתיבה מ-Dataflow ל-BigQuery

במאמר הזה מוסבר איך לכתוב נתונים מ-Dataflow ל-BigQuery.

סקירה כללית

ברוב תרחישי השימוש, מומלץ להשתמש ב-Managed I/O כדי לכתוב ל-BigQuery. התכונות של Managed I/O כוללות שדרוגים אוטומטיים וAPI עקבי להגדרות. כשכותבים ל-BigQuery, מערכת Managed I/O בוחרת באופן אוטומטי את שיטת הכתיבה הטובה ביותר למשימות אצווה או למשימות סטרימינג.

אם אתם צריכים כוונון ביצועים מתקדם יותר, כדאי להשתמש במחבר BigQueryIO. מידע נוסף מופיע בקטע שימוש במחבר BigQueryIO במסמך הזה.

ביצועים

בטבלה הבאה מוצגים מדדי ביצועים של עומסי עבודה שונים. עומסי העבודה האלה הופעלו על e2-standard2 עובד אחד, באמצעות Apache Beam SDK 2.49.0 ל-Java. הם לא השתמשו ב-Runner v2.

‫100 מיליון רשומות | 1KB | עמודה אחת תפוקה (בייטים) תפוקה (אלמנטים)
Storage Write ‫55MBps ‫54,000 רכיבים לשנייה
Avro Load ‫78MBps ‫77,000 רכיבים בשנייה
Json Load ‫54 MBps ‫53,000 רכיבים לשנייה

המדדים האלה מבוססים על צינורות פשוטים של עיבוד נתונים באצווה. הם נועדו להשוות בין הביצועים של מחברי קלט/פלט, ולא בהכרח מייצגים צינורות נתונים בעולם האמיתי. הביצועים של צינורות Dataflow הם מורכבים, והם פונקציה של סוג המכונה הווירטואלית, הנתונים שעוברים עיבוד, הביצועים של מקורות ויעדים חיצוניים וקוד המשתמש. המדדים מבוססים על הפעלת Java SDK, והם לא מייצגים את מאפייני הביצועים של ערכות SDK בשפות אחרות. מידע נוסף זמין במאמר בנושא ביצועים של Beam IO.

שימוש במחבר BigQueryIO

מחבר BigQuery I/O תומך בשיטות הבאות לכתיבה ב-BigQuery:

  • STORAGE_WRITE_API. במצב הזה, המחבר מבצע כתיבה ישירה לאחסון ב-BigQuery באמצעות BigQuery Storage Write API. ‫Storage Write API משלב הטמעת עדכונים בזמן אמת וטעינה של נתונים בכמות גדולה בממשק API יחיד בעל ביצועים משופרים. המצב הזה מבטיח סמנטיקה של בדיוק פעם אחת.
  • STORAGE_API_AT_LEAST_ONCE. במצב הזה נעשה שימוש גם ב-Storage Write API, אבל הוא מספק סמנטיקה של 'לפחות פעם אחת'. במצב הזה, זמן האחזור ברוב צינורות העיבוד קצר יותר. עם זאת, יכול להיות שיהיו כפילויות בכתיבה.
  • FILE_LOADS. במצב הזה, המחבר כותב את נתוני הקלט לקובצי ביניים ב-Cloud Storage. לאחר מכן, המערכת מריצה משימת טעינה ב-BigQuery כדי לטעון את הנתונים ל-BigQuery. המודל הזה מוגדר כברירת מחדל עבור צינורות מוגבלים PCollections, שהם הנפוצים ביותר בצינורות לעיבוד נתונים באצווה.
  • STREAMING_INSERTS. במצב הזה, המחבר משתמש ב-Legacy Streaming API. ההגדרה הזו היא ברירת המחדל עבור PCollections ללא הגבלה, אבל לא מומלצת לפרויקטים חדשים.

כשבוחרים שיטת כתיבה, כדאי להתייחס לנקודות הבאות:

  • למשימות של סטרימינג, כדאי להשתמש ב-STORAGE_WRITE_API או ב-STORAGE_API_AT_LEAST_ONCE, כי במצבים האלה המערכת כותבת ישירות לאחסון ב-BigQuery, בלי להשתמש בקבצים זמניים של מחסן ביניים.
  • אם מריצים את צינור הנתונים באמצעות מצב סטרימינג של לפחות פעם אחת, צריך להגדיר את מצב הכתיבה ל-STORAGE_API_AT_LEAST_ONCE. ההגדרה הזו יעילה יותר ומתאימה לסמנטיקה של מצב סטרימינג של לפחות פעם אחת.
  • לפעולות טעינת קבצים ול-Storage Write API יש מכסות ומגבלות שונות.
  • משימות טעינה משתמשות במאגר יחידות הקיבולת המשותף של BigQuery או ביחידות קיבולת שמורות. כדי להשתמש במשבצות שמורות, מריצים את עבודת הטעינה בפרויקט עם הקצאת הזמנה מסוג PIPELINE. משימות טעינה הן בחינם אם משתמשים במאגר המשותף של יחידות הקיבולת של BigQuery. עם זאת, BigQuery לא מתחייב לגבי הקיבולת הזמינה של המאגר המשותף. מידע נוסף זמין במאמר מבוא להזמנות.

מקביליות

  • ב-FILE_LOADS וב-STORAGE_WRITE_API בצינורות להזרמת נתונים, המחבר מפצל את הנתונים למספר קבצים או מקורות נתונים. באופן כללי, מומלץ להפעיל את הפיצול האוטומטי על ידי קריאה ל-withAutoSharding.

  • ב-FILE_LOADS בצינורות להעברת נתונים באצווה, המחבר כותב נתונים לקבצים מחולקים, ואז הם נטענים ל-BigQuery במקביל.

  • בצינורות של STORAGE_WRITE_API, כל עובד יוצר זרם אחד או יותר כדי לכתוב ל-BigQuery, בהתאם למספר הכולל של הרסיסים.

  • ל-STORAGE_API_AT_LEAST_ONCE יש שידור כתיבה אחד שמוגדר כברירת מחדל. כמה תהליכי עבודה מוסיפים נתונים לזרם הזה.

שיטות מומלצות

  • ל-Storage Write API יש מגבלות מכסה. המחבר מטפל במגבלות האלה ברוב צינורות הנתונים. עם זאת, בתרחישים מסוימים יכול להיות שייגמרו הזרמים הזמינים של Storage Write API. לדוגמה, הבעיה הזו יכולה לקרות בצינור (pipeline) שמשתמש בחלוקה אוטומטית (auto-sharding) ובהתאמה אוטומטית לעומס (autoscaling) עם מספר גדול של יעדים, במיוחד במשימות ארוכות טווח עם עומסי עבודה משתנים מאוד. אם הבעיה הזו מתרחשת, כדאי להשתמש ב-STORAGE_WRITE_API_AT_LEAST_ONCE כדי להימנע ממנה.

  • כדי לעקוב אחרי השימוש במכסת Storage Write API, אפשר להשתמש במדדים של Google Cloud Platform.

  • כשמשתמשים בהעלאות קבצים, בדרך כלל Avro עדיף על JSON. כדי להשתמש ב-Avro, קוראים ל-withAvroFormatFunction.

  • כברירת מחדל, עבודות טעינה מופעלות באותו פרויקט כמו עבודת Dataflow. כדי לציין פרויקט אחר, צריך לבצע קריאה ל-method‏ withLoadJobProjectId.

  • כשמשתמשים ב-Java SDK, מומלץ ליצור מחלקה שמייצגת את הסכימה של טבלה ב-BigQuery. אחר כך קוראים לפונקציה useBeamSchema בצינור כדי להמיר אוטומטית בין סוגי Row Apache Beam וסוגי TableRow BigQuery. דוגמה למחלקת סכימה זמינה בקישור ExampleModel.java.

  • אם אתם טוענים טבלאות עם סכימות מורכבות שמכילות אלפי שדות, כדאי להשתמש בפונקציה withMaxBytesPerPartition כדי להגדיר גודל מקסימלי קטן יותר לכל עבודת טעינה.

  • כברירת מחדל, BigQueryIO משתמש בהגדרות של Storage Write API שמתאימות לרוב צינורות הנתונים. עם זאת, אם נתקלים בבעיות בביצועים, אפשר להגדיר אפשרויות של צינורות כדי לשנות את ההגדרות האלה. מידע נוסף מופיע במאמר Tune the Storage Write API (התאמה של Storage Write API) במסמכי התיעוד של Apache Beam.

צינורות עיבוד נתונים בסטרימינג

ההמלצות הבאות רלוונטיות לצינורות להעברת נתונים בסטרימינג.

  • לצינורות להזרמת נתונים מומלץ להשתמש ב-Storage Write API‏ (STORAGE_WRITE_API או STORAGE_API_AT_LEAST_ONCE).

  • בצינורות להזרמת נתונים אפשר להשתמש בטעינות של קבצים, אבל לגישה הזו יש חסרונות:

    • כדי לכתוב את הקבצים, צריך עיבוד החלק הנצפה בלבד. אי אפשר להשתמש בחלון הגלובלי.
    • כשמשתמשים במאגר המשבצות המשותף, BigQuery טוען קבצים על בסיס המאמץ המרבי. יכול להיות עיכוב משמעותי בין מועד הכתיבה של רשומה לבין המועד שבו היא זמינה ב-BigQuery.
    • אם עבודת טעינה נכשלת – לדוגמה, בגלל נתונים פגומים או אי התאמה לסכימה – כל צינור הנתונים נכשל.
  • כדאי להשתמש ב-STORAGE_WRITE_API_AT_LEAST_ONCE כשזה אפשרי. השיטה הזו עלולה לגרום לכתיבה של רשומות כפולות ב-BigQuery, אבל היא זולה יותר וניתנת להרחבה יותר מאשר שיטה STORAGE_WRITE_API.

  • באופן כללי, מומלץ להימנע משימוש ב-STREAMING_INSERTS. הוספות בסטרימינג יקרות יותר מ-Storage Write API, והביצועים שלהן פחות טובים.

  • חלוקת נתונים (Data sharding) יכולה לשפר את הביצועים בצינורות סטרימינג. ברוב צינורות הנתונים, פיצול אוטומטי הוא נקודת התחלה טובה. עם זאת, אפשר לכוונן את ה-sharding באופן הבא:

    • במקרה של STORAGE_WRITE_API, מתקשרים אל withNumStorageWriteApiStreams כדי להגדיר את מספר זרמי הכתיבה.
    • במקרה של FILE_LOADS, מתקשרים אל withNumFileShards כדי להגדיר את מספר רסיסי הקבצים.
  • אם אתם משתמשים בהוספות לסטרימינג, מומלץ להגדיר את retryTransientErrors כמדיניות הניסיון החוזר.

צינורות עיבוד נתונים של אצווה

ההמלצות הבאות רלוונטיות לצינורות להעברת נתונים באצווה.

  • ברוב צינורות העיבוד של אצווה גדולה, מומלץ לנסות קודם את FILE_LOADS. צינור עיבוד נתונים של אצווה יכול להשתמש ב-STORAGE_WRITE_API, אבל סביר להניח שהוא יחרוג ממגבלות המכסה בקנה מידה גדול (1,000+ vCPU) או אם מופעלים צינורות עיבוד נתונים בו-זמנית. ‫Apache Beam לא מגביל את המספר המקסימלי של זרמי כתיבה למשימות באצווה STORAGE_WRITE_API, ולכן המשימה מגיעה בסופו של דבר למגבלות של BigQuery Storage API.

  • כשמשתמשים ב-FILE_LOADS, יכול להיות שתנצלו את כל יחידות הקיבולת המשותפות של BigQuery או את כל יחידות הקיבולת השמורות. אם נתקלתם בכשל כזה, נסו את הפתרונות הבאים:

    • צריך להקטין את המספר המקסימלי של העובדים או את גודל העובד של המשימה.
    • קונים עוד משבצות שמורות.
    • כדאי להשתמש ב-STORAGE_WRITE_API.
  • יכול להיות שצינורות קטנים עד בינוניים (פחות מ-1,000 vCPU) ירוויחו משימוש ב-STORAGE_WRITE_API. למשימות קטנות יותר, מומלץ להשתמש ב-STORAGE_WRITE_API אם רוצים תור של הודעות שלא ניתן להעביר או אם מאגר המשבצות המשותף של FILE_LOADS לא מספיק.

  • אם אתם יכולים להסתדר עם נתונים כפולים, כדאי להשתמש בשיטה STORAGE_WRITE_API_AT_LEAST_ONCE. במצב הזה יכול להיות שרשומות כפולות ייכתבו ל-BigQuery, אבל יכול להיות שהעלות תהיה נמוכה יותר מאשר באפשרות STORAGE_WRITE_API.

  • אופן הפעולה של מצבי כתיבה שונים עשוי להיות שונה בהתאם למאפיינים של צינור הנתונים. מומלץ לערוך ניסויים כדי למצוא את מצב הכתיבה הכי טוב לעומס העבודה שלכם.

טיפול בשגיאות ברמת השורה

בקטע הזה מתואר איך לטפל בשגיאות שעלולות להתרחש ברמת השורה, למשל בגלל נתוני קלט לא תקינים או אי התאמות בסכימה.

ב-Storage Write API, כל השורות שלא ניתן לכתוב ממוקמות ב-PCollection נפרד. כדי לקבל את האוסף הזה, צריך להתקשר אל getFailedStorageApiInserts באובייקט WriteResult. דוגמה לגישה הזו מופיעה במאמר הזרמת נתונים ל-BigQuery.

מומלץ לשלוח את השגיאות לתור או לטבלה של הודעות שלא ניתן למסור, כדי לעבד אותן בהמשך. מידע נוסף על התבנית הזו זמין במאמר בנושא BigQueryIO תבנית של הודעות שלא ניתן להעביר.

ב-FILE_LOADS, אם מתרחשת שגיאה בזמן טעינת הנתונים, משימת הטעינה נכשלת והצינור יוצר חריגה בזמן הריצה. אפשר לראות את השגיאה ביומנים של Dataflow או בהיסטוריית המשימות של BigQuery. מחבר ה-I/O לא מחזיר מידע על שורות נפרדות שנכשלו.

מידע נוסף על פתרון בעיות שגיאות זמין במאמר שגיאות במחבר BigQuery.

דוגמאות

בדוגמאות הבאות מוסבר איך להשתמש ב-Dataflow כדי לכתוב ל-BigQuery. בדוגמאות האלה אנחנו משתמשים במחבר BigQueryIO.

כתיבה לטבלה קיימת

בדוגמה הבאה נוצר צינור באצווה שכותב PCollection<MyData> ל-BigQuery, כאשר MyData הוא סוג נתונים מותאם אישית.

השיטה BigQueryIO.write() מחזירה סוג BigQueryIO.Write<T>, שמשמש להגדרת פעולת הכתיבה. מידע נוסף זמין במאמר בנושא כתיבה לטבלה במסמכי התיעוד של Apache Beam. בדוגמת הקוד הזו מתבצעת כתיבה לטבלה קיימת (CREATE_NEVER) והוספה של השורות החדשות לטבלה (WRITE_APPEND).

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

כתיבה לטבלה חדשה או קיימת

בדוגמה הבאה נוצרת טבלה חדשה אם טבלת היעד לא קיימת, על ידי הגדרת הפעולה ליצירה לערך CREATE_IF_NEEDED. אם בוחרים באפשרות הזו, צריך לספק סכימת טבלה. המחבר משתמש בסכימה הזו אם הוא יוצר טבלה חדשה.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

הזרמת נתונים ל-BigQuery

בדוגמה הבאה מוצג איך להזרים נתונים באמצעות סמנטיקה של בדיוק פעם אחת, על ידי הגדרת מצב הכתיבה ל-STORAGE_WRITE_API

לא כל צינורות הנתונים של סטרימינג דורשים סמנטיקה של 'פעם אחת בדיוק'. לדוגמה, יכול להיות שתוכלו להסיר כפילויות באופן ידני מטבלת היעד. אם האפשרות של רשומות כפולות מקובלת בתרחיש שלכם, כדאי להשתמש בסמנטיקה של 'לפחות פעם אחת' על ידי הגדרת שיטת הכתיבה לערך STORAGE_API_AT_LEAST_ONCE. השיטה הזו בדרך כלל יעילה יותר ומובילה לזמן אחזור נמוך יותר ברוב צינורות הנתונים.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

המאמרים הבאים