ייצוא נתונים כעמודות Protobuf

במאמר הזה מוסבר איך אפשר לייצא נתונים מ-BigQuery כעמודות של Protocol Buffers ‏ (Protobuf) באמצעות פונקציות מוגדרות על ידי המשתמש (UDF) ב-BigQuery.

מתי כדאי להשתמש בעמודות Protobuf

‫BigQuery מציע מספר פונקציות מובנות לעיצוב נתונים נבחרים. אפשרות אחת היא למזג כמה ערכים של עמודות לערך Protobuf אחד, ויש לכך יתרונות:

  • מניעת שגיאות הקלדה באובייקט.
  • דחיסה משופרת, זמן העברת נתונים נמוך יותר ועלות נמוכה יותר בהשוואה ל-JSON.
  • גמישות, כי לרוב שפות התכנות יש ספריות לטיפול ב-Protobuf.
  • פחות תקורה כשקוראים מכמה עמודות ויוצרים אובייקט יחיד.

סוגי עמודות אחרים יכולים גם לספק מניעת שגיאות הקלדה, אבל שימוש בעמודות Protobuf מספק אובייקט עם הקלדה מלאה, שיכול לצמצם את כמות העבודה שצריך לבצע בשכבת האפליקציות או בחלק אחר של צינור הנתונים.

עם זאת, יש מגבלות על ייצוא נתונים מ-BigQuery כעמודות Protobuf:

  • הוספה לאינדקס או סינון של עמודות Protobuf לא מתבצעים בצורה טובה. חיפוש לפי התוכן של עמודות ה-Protobuf עשוי להיות פחות יעיל.
  • יכול להיות שיהיה קשה למיין נתונים בפורמט Protobuf.

אם המגבלות האלה חלות על תהליך העבודה של הייצוא, כדאי לשקול שיטות אחרות לייצוא נתונים מ-BigQuery:

  • אפשר להשתמש בשאילתות מתוזמנות עם הצהרות EXPORT DATA כדי למיין את הנתונים המיוצאים ב-BigQuery לפי תאריך או שעה, ולתזמן ייצוא על בסיס חוזר. ‫BigQuery תומך בייצוא נתונים לפורמטים Avro,‏ CSV,‏ JSON ו-Parquet.
  • אפשר להשתמש ב-Dataflow כדי לייצא נתונים מ-BigQuery בפורמט קובץ Avro או CSV.

התפקידים הנדרשים

כדי לקבל את ההרשאות שדרושות לייצוא נתוני BigQuery כעמודות Protobuf, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בפרויקט:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

יצירת פונקציית UDF

יצירת פונקציה מוגדרת על ידי המשתמש (UDF) שממירה סוג נתונים של BigQuery‏ STRUCTלעמודת Protobuf:

  1. בשורת פקודה, משכפלים את מאגר bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. עוברים לתיקיית הייצוא של Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. משתמשים בפקודה cp או במנהל הקבצים והתיקיות של מערכת ההפעלה כדי להעתיק את קובץ ה-proto לתיקיית הצאצא ./protos.

    כבר קיים קובץ proto לדוגמה בשם dummy.proto בתיקייה ./protos.

  4. מתקינים את החבילות הנדרשות ממאגר GitHub:

    npm install
    
  5. יוצרים חבילה של הקבצים באמצעות webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. מאתרים את הקובץ pbwrapper.js בתיקיית הצאצא ./dist ואז מעלים את הקובץ לקטגוריה של Cloud Storage.

  7. עוברים לדף BigQuery.

    כניסה ל-BigQuery

  8. בעזרת עורך השאילתות, יוצרים UDF בשם toMyProtoMessage שבונה עמודת Protobuf מעמודות קיימות בטבלה ב-BigQuery:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    מחליפים את מה שכתוב בשדות הבאים:

    • DATASET_ID: המזהה של מערך הנתונים שאמור להכיל את הפונקציה המוגדרת על ידי המשתמש.
    • INPUT_FIELDS: השדות שמשמשים בסוג הודעת הפרוטו של קובץ הפרוטו, בפורמט field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      צריך לתרגם את כל השדות של סוג ההודעה שמשתמשים בקו תחתון, כך שבמקום זאת ישתמשו באותיות רישיות לסירוגין. לדוגמה, אם סוג ההודעה נראה כך, ערך שדות הקלט צריך להיות itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: שם הקטגוריה ב-Cloud Storage שמכילה את הקובץ pbwrapper.js.

    • PROTO_PACKAGE: החבילה של קובץ ה-proto.

    • PROTO_MESSAGE: סוג ההודעה בקובץ הפרוטו.

    לדוגמה, אם משתמשים בקובץ dummy.proto שסופק, ההצהרה CREATE FUNCTION נראית כך:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

עיצוב עמודות כערכי Protobuf

מריצים את הפונקציה toMyProtoMessage UDF כדי לעצב את העמודות בטבלת BigQuery כערכי Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

מחליפים את מה שכתוב בשדות הבאים:

  • UDF_DATASET_ID: המזהה של מערך הנתונים שמכיל את פונקציית ה-UDF.
  • INPUT_COLUMNS: שמות העמודות שרוצים לעצב כערך Protobuf, בפורמט column_name_1 [, column_name_2, ...]. העמודות יכולות להיות מכל סוג ערך סקלרי נתמך או מסוג לא סקלרי, כולל ARRAY ו-STRUCT. עמודות הקלט צריכות להתאים לסוג ולמספר של השדות בסוג הודעת הפרוטו.
  • PROJECT_ID: מזהה הפרויקט שמכיל את הטבלה. אפשר לדלג על זיהוי הפרויקט אם מערך הנתונים נמצא בפרויקט הנוכחי.
  • DATASET_ID: המזהה של מערך הנתונים שמכיל את הטבלה.
  • TABLE_NAME: שם הטבלה שמכילה את העמודות שרוצים לעצב.

לדוגמה, אם משתמשים ב-toMyProtoMessage UDF שמבוסס על dummy.proto, אפשר להשתמש בהצהרת SELECT הבאה:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

עבודה עם ערכי Protobuf

אחרי ייצוא הנתונים בפורמט Protobuf ל-BigQuery, אפשר לעבוד עם הנתונים כאובייקט או כמבנה עם הקלדה מלאה.

בדוגמאות הקוד הבאות אפשר לראות כמה דרכים לעיבוד הנתונים המיוצאים או לעבודה איתם:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )