קריאה מ-BigQuery ל-Dataflow

במאמר הזה מוסבר איך לקרוא נתונים מ-BigQuery ל-Dataflow.

סקירה כללית

ברוב תרחישי השימוש, מומלץ להשתמש בManaged I/O כדי לקרוא מ-BigQuery. התכונות של Managed I/O כוללות שדרוגים אוטומטיים וAPI עקבי להגדרות. כשקוראים מ-BigQuery, הממשק המנוהל לקלט/פלט מבצע קריאות ישירות של טבלאות, שמניבות את ביצועי הקריאה הכי טובים.

אם אתם צריכים כוונון ביצועים מתקדם יותר, כדאי להשתמש בBigQueryIOמחבר. המחבר BigQueryIO תומך בקריאות ישירות של טבלאות ובקריאה ממשימות ייצוא של BigQuery. הוא גם מאפשר שליטה מפורטת יותר בביטול הסריאליזציה של רשומות בטבלה. מידע נוסף מופיע בקטע שימוש במחבר BigQueryIO במאמר הזה.

הקרנה וסינון של עמודות

כדי להקטין את נפח הנתונים שצינור הנתונים קורא מ-BigQuery, אפשר להשתמש בטכניקות הבאות:

  • הקרנת עמודות מציינת קבוצת משנה של עמודות לקריאה מהטבלה. כדאי להשתמש בהקרנת עמודות אם בטבלה יש מספר גדול של עמודות ואתם צריכים לקרוא רק קבוצת משנה שלהן.
  • Row filtering מציין פרדיקט להחלה על הטבלה. פעולת הקריאה של BigQuery מחזירה רק שורות שתואמות למסנן, וכך אפשר לצמצם את כמות הנתונים הכוללת שמוטמעת דרך צינור העיבוד.

בדוגמה הבאה, המערכת קוראת את העמודות "user_name" ו-"age" מטבלה ומסננת שורות שלא תואמות לפרדיקט "age > 18". בדוגמה הזו נעשה שימוש ב-I/O מנוהל.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

הקראה מתוצאת שאילתה

בדוגמה הבאה נעשה שימוש ב-Managed I/O כדי לקרוא את התוצאה של שאילתת SQL. הוא מריץ שאילתה במערך נתונים ציבורי של BigQuery. אפשר גם להשתמש בשאילתות SQL כדי לקרוא מתצוגה או מתצוגה חומרית ב-BigQuery.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

שימוש במחבר BigQueryIO

מחבר BigQueryIO תומך בשיטות הסריאליזציה הבאות:

המחבר תומך בשתי אפשרויות לקריאת נתונים:

  • משימת ייצוא. כברירת מחדל, מחבר BigQueryIO מריץ משימת ייצוא של BigQuery שכותבת את נתוני הטבלה ל-Cloud Storage. לאחר מכן, המחבר קורא את הנתונים מ-Cloud Storage.
  • קריאות ישירות של טבלאות. האפשרות הזו מהירה יותר מעבודות ייצוא, כי היא משתמשת ב-BigQuery Storage Read API ומדלגת על שלב הייצוא. כדי להשתמש בקריאות ישירות של טבלאות, צריך להפעיל את הפונקציה withMethod(Method.DIRECT_READ) כשיוצרים את צינור הנתונים.

כשבוחרים באיזו אפשרות להשתמש, כדאי להביא בחשבון את הנקודות הבאות:

  • באופן כללי, מומלץ להשתמש בקריאות ישירות של טבלאות. ה-API לקריאת נתונים ב-Storage מתאים יותר לצינורות נתונים מאשר משימות ייצוא, כי הוא לא צריך את שלב הביניים של ייצוא הנתונים.

  • אם אתם משתמשים בקריאות ישירות, תחויבו על השימוש ב-Storage Read API. כאן תוכלו לראות את התמחור של חילוץ נתונים בדף התמחור של BigQuery.

  • לא חל תשלום נוסף על עבודות ייצוא. עם זאת, יש מגבלות על משימות ייצוא. אם מעבירים כמויות גדולות של נתונים, והמהירות היא בראש סדר העדיפויות והעלות ניתנת להתאמה, מומלץ להשתמש בקריאות ישירות.

  • ל-Storage Read API יש מגבלות מכסה. כדי לעקוב אחרי השימוש במכסה, אפשר להשתמש במדדים של Google Cloud Platform.

  • אם אתם משתמשים במשימות ייצוא, צריך להגדיר את --tempLocation אפשרות הצינור כדי לציין קטגוריה של Cloud Storage לקבצים המיוצאים.

  • כשמשתמשים ב-Storage Read API, יכול להיות שיופיעו ביומנים שגיאות של תפוגת חכירה וזמן קצוב לתפוגה של סשן, כמו:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    השגיאות האלה יכולות להתרחש כשפעולה נמשכת יותר זמן מהזמן הקצוב לתפוגה, בדרך כלל בצינורות שפועלים יותר מ-6 שעות. כדי לפתור את הבעיה, צריך לעבור לייצוא קבצים.

  • רמת המקביליות תלויה בשיטת הקריאה:

    • קריאות ישירות: מחבר ה-I/O יוצר מספר דינמי של זרמים, על סמך גודל בקשת הייצוא. היא קוראת את הזרמים האלה ישירות מ-BigQuery במקביל.

    • משימות ייצוא: מערכת BigQuery קובעת כמה קבצים ייכתבו ל-Cloud Storage. מספר הקבצים תלוי בשאילתה ובנפח הנתונים. מחבר ה-I/O קורא את הקבצים המיוצאים במקביל.

בטבלה הבאה מוצגים מדדי ביצועים של אפשרויות קריאה שונות של BigQuery I/O. עומסי העבודה הופעלו על e2-standard2worker אחד, באמצעות Apache Beam SDK 2.49.0 ל-Java. הם לא השתמשו ב-Runner v2.

‫100 מיליון רשומות | 1KB | עמודה אחת תפוקה (בייטים) תפוקה (אלמנטים)
קריאת נתונים מהאחסון ‫120MBps ‫88,000 רכיבים לשנייה
ייצוא Avro ‫105MBps ‫78,000 רכיבים לשנייה
ייצוא JSON ‫110MBps ‫81,000 רכיבים לשנייה

המדדים האלה מבוססים על צינורות פשוטים של עיבוד נתונים באצווה. הם נועדו להשוות בין הביצועים של מחברי קלט/פלט, ולא בהכרח מייצגים צינורות נתונים בעולם האמיתי. הביצועים של צינורות Dataflow הם מורכבים, והם פונקציה של סוג המכונה הווירטואלית, הנתונים שעוברים עיבוד, הביצועים של מקורות ויעדים חיצוניים וקוד המשתמש. המדדים מבוססים על הפעלת Java SDK, והם לא מייצגים את מאפייני הביצועים של ערכות SDK בשפות אחרות. מידע נוסף זמין במאמר בנושא ביצועים של Beam IO.

דוגמאות

בדוגמאות הקוד הבאות נעשה שימוש במחבר BigQueryIO עם קריאות ישירות של טבלאות. כדי להשתמש במקום זאת במשימת ייצוא, משמיטים את הקריאה ל-withMethod.

קריאת רשומות בפורמט Avro

בדוגמה הזו מוסבר איך להשתמש במחבר BigQueryIO כדי לקרוא רשומות בפורמט Avro.

כדי לקרוא נתונים מ-BigQuery לרשומות בפורמט Avro, משתמשים בשיטה read(SerializableFunction). השיטה הזו מקבלת פונקציה שמוגדרת על ידי האפליקציה, שמנתחת אובייקטים של SchemaAndRecord ומחזירה סוג נתונים מותאם אישית. הפלט מהמחבר הוא PCollection של סוג הנתונים המותאם אישית.

הקוד הבא קורא PCollection<MyData> מטבלה ב-BigQuery, כאשר MyData הוא מחלקה שמוגדרת על ידי האפליקציה.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

השיטה read מקבלת ממשק SerializableFunction<SchemaAndRecord, T> שמגדיר פונקציה להמרה מרשומות Avro למחלקת נתונים בהתאמה אישית. בדוגמת הקוד הקודמת, ה-method‏ MyData.apply מטמיע את פונקציית ההמרה הזו. הפונקציה לדוגמה מנתחת את השדות name ו-age מרשומה של Avro ומחזירה מופע של MyData.

כדי לציין איזו טבלה ב-BigQuery לקרוא, קוראים לשיטה from, כמו בדוגמה הקודמת. מידע נוסף זמין במאמר בנושא שמות טבלאות במאמרי העזרה של מחבר BigQuery I/O.

קריאת אובייקטים של TableRow

בדוגמה הזו מוסבר איך להשתמש במחבר BigQueryIO כדי לקרוא אובייקטים של TableRow.

השיטה readTableRows קוראת נתונים מ-BigQuery לתוך PCollection של אובייקטים מסוג TableRow. כל TableRow הוא מיפוי של צמדי מפתח/ערך שמכיל שורה אחת של נתוני טבלה. מציינים את הטבלה ב-BigQuery לקריאה על ידי קריאה לשיטה from.

הקוד הבא קורא PCollection<TableRows> מטבלת BigQuery.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

בדוגמה הזו אפשר לראות גם איך לגשת לערכים מהמילון TableRow. ערכים מסוג Integer מקודדים כמחרוזות כדי להתאים לפורמט JSON המיוצא של BigQuery.

המאמרים הבאים