הזרמת נתונים באמצעות Storage Write API
במאמר הזה מוסבר איך להשתמש ב-BigQuery Storage Write API כדי להזרים נתונים ל-BigQuery.
בתרחישי סטרימינג, הנתונים מגיעים באופן רציף וצריכים להיות זמינים לקריאה עם השהיה מינימלית. כשמשתמשים ב-BigQuery Storage Write API לעומסי עבודה של סטרימינג, צריך להגדיר אילו ערבויות נדרשות:
- אם האפליקציה שלכם צריכה רק סמנטיקה של 'לפחות פעם אחת', אתם יכולים להשתמש בזרם ברירת המחדל.
- אם אתם צריכים סמנטיקה של 'פעם אחת בדיוק', אתם יכולים ליצור זרם אחד או יותר בסוג committed ולהשתמש בהיסטים של הזרם כדי להבטיח כתיבה של 'פעם אחת בדיוק'.
בסוג committed, הנתונים שנכתבים לזרם זמינים לשאילתה ברגע שהשרת מאשר את בקשת הכתיבה. זרם ברירת המחדל משתמש גם בסוג committed, אבל לא מספק הבטחות לגבי מסירה בדיוק פעם אחת.
שימוש בזרם ברירת המחדל לסמנטיקה של לפחות פעם אחת
אם האפליקציה יכולה לקבל את האפשרות שיופיעו רשומות כפולות בטבלת היעד, מומלץ להשתמש בזרם ברירת המחדל לתרחישי סטרימינג.
הקוד הבא מראה איך לכתוב נתונים לזרם ברירת המחדל:
Java
מידע על התקנת ספריית הלקוח של BigQuery ושימוש בה מופיע במאמר ספריות הלקוח של BigQuery. מידע נוסף מופיע במאמרי העזרה של BigQuery Java API.
כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.
Node.js
מידע על התקנת ספריית הלקוח של BigQuery ושימוש בה מופיע במאמר ספריות הלקוח של BigQuery.
כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.
Python
בדוגמה הזו מוצג אופן ההוספה של רשומה עם שני שדות באמצעות הזרם שמוגדר כברירת מחדל:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
דוגמת הקוד הזו תלויה במודול הפרוטוקול המהודר sample_data_pb2.py. כדי ליצור את המודול המהודר, מריצים את הפקודה protoc --python_out=. sample_data.proto, כאשר protoc הוא מהדר של מאגר אחסון לפרוטוקולים. הקובץ sample_data.proto מגדיר את הפורמט של ההודעות שמשמשות בדוגמה של Python. כדי להתקין את מהדר protoc, פועלים לפי ההוראות במאמר Protocol Buffers – פורמט להחלפת נתונים של Google.
זה התוכן של הקובץ sample_data.proto:
message SampleData {
required string name = 1;
required int64 age = 2;
}
הסקריפט הזה צורך את הקובץ entries.json, שמכיל נתוני שורות לדוגמה שיוכנסו לטבלה ב-BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
שימוש ב-multiplexing
אתם מפעילים ריבוב ברמת כותב הסטרימינג רק לסטרימינג שמוגדר כברירת מחדל. כדי להפעיל מולטיפלקסינג ב-Java, צריך להפעיל את השיטה setEnableConnectionPool כשיוצרים אובייקט StreamWriter או JsonStreamWriter.
אחרי שמפעילים את מאגר החיבורים, ספריית הלקוח של Java מנהלת את החיבורים ברקע, ומרחיבה את החיבורים אם החיבורים הקיימים נחשבים עמוסים מדי. כדי שהתאמה אוטומטית לעומס תהיה יעילה יותר, כדאי לשקול להקטין את maxInflightRequestsהמכסה.
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
כדי להפעיל ריבוב ב-Go, אפשר לעיין במאמר בנושא שיתוף חיבורים (ריבוב).
שימוש בסוג committed כדי להבטיח סמנטיקה של שליחה חד-פעמית
אם אתם צריכים סמנטיקה של כתיבה בדיוק פעם אחת, אתם יכולים ליצור זרם כתיבה מסוג committed. בסוג committed, הרשומות זמינות לשאילתה ברגע שהלקוח מקבל אישור מהקצה העורפי.
סוג ההתחייבות מספק מסירה של כל רשומה פעם אחת בדיוק בזרם באמצעות היסטים של רשומות. באמצעות היסטים של רשומות, האפליקציה מציינת את ההיסט הבא של הוספה בכל קריאה ל-AppendRows. פעולת הכתיבה מתבצעת רק אם ערך ההיסט תואם להיסט הבא של ההוספה. מידע נוסף זמין במאמר בנושא ניהול היסטים של זרמים כדי להשיג סמנטיקה של בדיוק פעם אחת.
אם לא מציינים היסט, הרשומות מצורפות לסוף הנוכחי של הזרם. במקרה כזה, אם בקשת הוספה מחזירה שגיאה, ניסיון חוזר של הבקשה עלול לגרום לכך שהרשומה תופיע יותר מפעם אחת בזרם.
כדי להשתמש בסוג התחייבות, פועלים לפי השלבים הבאים:
Java
- כדי ליצור זרם אחד או יותר מסוג Committed, צריך להתקשר למספר
CreateWriteStream. - עבור כל זרם, קוראים ל-
AppendRowsבלולאה כדי לכתוב קבוצות של רשומות. - מתקשרים אל
FinalizeWriteStreamלכל מקור נתונים כדי לשחרר אותו. אחרי שקוראים לשיטה הזו, אי אפשר לכתוב עוד שורות לזרם. השלב הזה הוא אופציונלי בסוגים של התחייבות, אבל הוא עוזר למנוע חריגה מהמגבלה על מספר הזרמים הפעילים. מידע נוסף זמין במאמר בנושא הגבלת קצב היצירה של מקורות נתונים.
Node.js
- כדי ליצור זרם אחד או יותר מסוג Committed, צריך להתקשר למספר
createWriteStreamFullResponse. - עבור כל זרם, קוראים ל-
appendRowsבלולאה כדי לכתוב קבוצות של רשומות. - מתקשרים אל
finalizeלכל מקור נתונים כדי לשחרר אותו. אחרי שקוראים לשיטה הזו, אי אפשר לכתוב עוד שורות לזרם. השלב הזה הוא אופציונלי בסוגים של התחייבות, אבל הוא עוזר למנוע חריגה מהמגבלה על מספר הזרמים הפעילים. מידע נוסף זמין במאמר בנושא הגבלת קצב היצירה של מקורות נתונים.
אי אפשר למחוק מקור נתונים באופן מפורש. הסטרימינג מתבצע בהתאם לאורך החיים (TTL) שמוגדר במערכת:
- לשידור עם התחייבות יש TTL של שלושה ימים אם אין תנועה בשידור.
- כברירת מחדל, לזרם עם מאגר זמני יש זמן חיים (TTL) של שבעה ימים אם אין תנועה בזרם.
בדוגמת הקוד הבאה אפשר לראות איך משתמשים בסוג committed:
Java
מידע על התקנת ספריית הלקוח של BigQuery ושימוש בה מופיע במאמר ספריות הלקוח של BigQuery. מידע נוסף מופיע במאמרי העזרה של BigQuery Java API.
כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.
Node.js
מידע על התקנת ספריית הלקוח של BigQuery ושימוש בה מופיע במאמר ספריות הלקוח של BigQuery.
כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.
שימוש בפורמט Apache Arrow להטמעת נתונים
בדוגמה הבאה אפשר לראות איך להטמיע נתונים באמצעות הפורמט Apache Arrow.
Python
בדוגמה הזו אפשר לראות איך להטמיע טבלה מסוג PyArrow שעברה סריאליזציה באמצעות הזרם שמוגדר כברירת מחדל. דוגמה מפורטת יותר מקצה לקצה מופיעה בדוגמה של PyArrow ב-GitHub.
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()
Java
מידע על התקנת ספריית הלקוח של BigQuery ושימוש בה מופיע במאמר ספריות הלקוח של BigQuery. מידע נוסף מופיע במאמרי העזרה של BigQuery Java API.
כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.