שימוש ב-Legacy streaming API

במאמר הזה מוסבר איך להזרים נתונים ל-BigQuery באמצעות השיטה הקודמת של tabledata.insertAll.

בפרויקטים חדשים מומלץ להשתמש ב-BigQuery Storage Write API במקום בשיטה tabledata.insertAll. המחיר של Storage Write API נמוך יותר והוא כולל תכונות חזקות יותר, כולל סמנטיקה של מסירה חד-פעמית. אם אתם מעבירים פרויקט קיים מהשיטה tabledata.insertAll אל Storage Write API, מומלץ לבחור בזרם ברירת המחדל. השיטה tabledata.insertAll עדיין נתמכת באופן מלא.

לפני שמתחילים

  1. מוודאים שיש לכם הרשאת כתיבה למערך הנתונים שמכיל את טבלת היעד. הטבלה צריכה להתקיים לפני שמתחילים לכתוב בה נתונים, אלא אם משתמשים בטבלאות תבנית. מידע נוסף על טבלאות תבנית זמין במאמר יצירת טבלאות באופן אוטומטי באמצעות טבלאות תבנית.

  2. כדאי לעיין במדיניות בנושא מכסות לנתונים שמוזרמים.

  3. Verify that billing is enabled for your Google Cloud project.

  4. הסטרימינג לא זמין דרך התוכנית ללא תשלום. אם תנסו להשתמש בסטרימינג בלי להפעיל את החיוב, תקבלו את השגיאה הבאה: BigQuery: Streaming insert is not allowed in the free tier.

  5. מקצים תפקידים של ניהול זהויות והרשאות גישה (IAM) שנותנים למשתמשים את ההרשאות הנדרשות לביצוע כל משימה במסמך הזה.

ההרשאות הנדרשות

כדי להזרים נתונים ל-BigQuery, אתם צריכים את הרשאות ה-IAM הבאות:

  • bigquery.tables.updateData (מאפשר להוסיף נתונים לטבלה)
  • bigquery.tables.get (מאפשר לקבל מטא-נתונים של טבלה)
  • bigquery.datasets.get (מאפשר לקבל מטא-נתונים של מערך נתונים)
  • bigquery.tables.create (חובה אם משתמשים בטבלת תבנית כדי ליצור את הטבלה באופן אוטומטי)

כל אחד מהתפקידים הבאים שמוגדרים מראש ב-IAM כולל את ההרשאות שצריך כדי להזרים נתונים ל-BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

במאמר תפקידים והרשאות מוגדרים מראש יש מידע נוסף על תפקידים והרשאות ב-IAM ב-BigQuery.

הזרמת נתונים ל-BigQuery

C#

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי C#הוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery C# API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Goהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Go API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Javaהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Java API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Node.jsהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Node.js API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי PHPהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery PHP API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Pythonהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Python API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Rubyהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Ruby API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

כשמוסיפים שורות, לא צריך למלא את השדה insertID. בדוגמה הבאה אפשר לראות איך להימנע משליחה של insertID לכל שורה כשמבצעים סטרימינג.

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Javaהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Java API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי Pythonהוראות ההגדרה שבמדריך למתחילים של BigQuery באמצעות ספריות לקוח. מידע נוסף מופיע במאמרי העזרה של BigQuery Python API.

כדי לבצע אימות ב-BigQuery, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לספריות לקוח.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

שליחת נתוני תאריך ושעה

בשדות של תאריך ושעה, הפורמט של הנתונים בשיטה tabledata.insertAll הוא כדלקמן:

סוג פורמט
DATE מחרוזת בפורמט "YYYY-MM-DD"
DATETIME מחרוזת בפורמט "YYYY-MM-DD [HH:MM:SS]"
TIME מחרוזת בפורמט "HH:MM:SS"
TIMESTAMP מספר השניות מאז 1970-01-01 (ראשית זמן יוניקס), או מחרוזת בפורמט "YYYY-MM-DD HH:MM[:SS]"

שליחת נתוני טווח

בשדות עם סוג RANGE<T>, צריך לעצב את הנתונים בשיטה tabledata.insertAll כאובייקט JSON עם שני שדות, start ו-end. ערכים חסרים או NULL בשדות start ו-end מייצגים גבולות לא מוגבלים. השדות האלה צריכים להיות בפורמט JSON הנתמך מסוג T, כאשר T יכול להיות אחד מהערכים DATE, DATETIME ו-TIMESTAMP.

בדוגמה הבאה, השדה f_range_date מייצג עמודה RANGE<DATE> בטבלה. שורות מוכנסות לעמודה הזו באמצעות ה-API‏ tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

זמינות של נתונים בזמן אמת

הנתונים זמינים לניתוח בזמן אמת באמצעות שאילתות GoogleSQL מיד אחרי ש-BigQuery מאשר בהצלחה בקשת tabledata.insertAll. כשמריצים שאילתה על נתונים במאגר הזמני של נתונים זורמים, לא מחויבים על בייטים שעובדו ממאגר זמני של נתונים זורמים אם משתמשים בתמחור על פי דרישה של משאבי מחשוב. אם אתם משתמשים בתמחור לפי קיבולת, ההזמנות שלכם צורכות משבצות לעיבוד נתונים במאגר זמני של נתונים שמוזרמים.

לשורות שנוספו לאחרונה לטבלה מחולקת למחיצות (Partitions) לפי זמני כתיבת הנתונים יש באופן זמני ערך NULL בעמודה הווירטואלית _PARTITIONTIME. בשביל שורות כאלה, BigQuery מקצה את הערך הסופי שאינו NULL של העמודה PARTITIONTIME ברקע, בדרך כלל תוך כמה דקות. במקרים נדירים, התהליך הזה יכול להימשך עד 90 דקות.

יכול להיות שחלק מהשורות שנוספו לאחרונה לסטרימינג לא יהיו זמינות להעתקת הטבלה בדרך כלל למשך כמה דקות. במקרים נדירים, התהליך הזה יכול להימשך עד 90 דקות. כדי לראות אם יש נתונים להעתקת הטבלה, בודקים את התשובה tables.get בקטע שנקרא streamingBuffer. אם הקטע streamingBuffer לא מופיע, הנתונים שלכם זמינים להעתקה. אפשר גם להשתמש בשדה streamingBuffer.oldestEntryTime כדי לזהות את גיל הרשומות במאגר הזמני של הסטרימינג.

הסרת כפילויות בהקדם האפשרי

כשמספקים את הערך insertId לשורה שמוסיפים, מערכת BigQuery משתמשת במזהה הזה כדי לתמוך בהסרת כפילויות במידת האפשר למשך דקה אחת. כלומר, אם אתם מעבירים את אותה שורה עם אותו insertId יותר מפעם אחת בתוך פרק הזמן הזה לאותה טבלה, יכול להיות ש-BigQuery יסיר את הכפילויות של השורה הזו וישאיר רק אחת מהן.

המערכת מצפה שהשורות שמופיעות עם ערכי insertId זהים יהיו גם זהות. אם לשתי שורות יש ערכים זהים ב-insertId, לא ניתן לקבוע איזו שורה BigQuery ישמור.

הסרת כפילויות מיועדת בדרך כלל לתרחישי ניסיון חוזר במערכת מבוזרת, שבה אין אפשרות לקבוע את מצב הזנת הזרם בתנאי שגיאה מסוימים, כמו שגיאות ברשת בין המערכת שלכם לבין BigQuery או שגיאות פנימיות ב-BigQuery. אם מנסים שוב להוסיף נתונים, צריך להשתמש באותו insertId לאותה קבוצת שורות כדי ש-BigQuery יוכל לנסות להסיר כפילויות מהנתונים. מידע נוסף זמין במאמר בנושא פתרון בעיות בהוספת נתונים בסטרימינג.

הסרת הכפילויות שמציעה BigQuery היא כמיטב היכולת, ולא מומלץ להסתמך עליה כמנגנון שמבטיח שלא יהיו כפילויות בנתונים. בנוסף, יכול להיות שב-BigQuery תהיה ירידה באיכות של ביטול הכפילויות בשיטת 'המאמץ הכי טוב' בכל שלב, כדי להבטיח מהימנות וזמינות גבוהות יותר של הנתונים.

אם יש לכם דרישות מחמירות לביטול כפילויות בנתונים, Google Cloud Datastore הוא שירות חלופי שתומך בטרנזקציות.

השבתה של ביטול כפילויות בהקדם האפשרי

כדי להשבית את הסרת הכפילויות בשיטת הכי טוב שאפשר, לא צריך למלא את השדה insertId בכל שורה שמוסיפים. זו הדרך המומלצת להוספת נתונים.

‫Apache Beam ו-Dataflow

כדי להשבית את ההסרה של רשומות כפולות בשיטת הכי טוב שאפשר כשמשתמשים במחבר BigQuery I/O של Apache Beam ל-Java, צריך להשתמש ב-method‏ ignoreInsertIds().

הסרה ידנית של כפילויות

כדי לוודא שלא יהיו שורות כפולות אחרי שתסיימו את הסטרימינג, אפשר להשתמש בתהליך הידני הבא:

  1. מוסיפים את המאפיין insertId כעמודה בסכימת הטבלה וכוללים את הערך insertId בנתונים של כל שורה.
  2. אחרי שהסטרימינג מפסיק, מריצים את השאילתה הבאה כדי לבדוק אם יש כפילויות:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    אם התוצאה גדולה מ-1, יש כפילויות.
  3. כדי להסיר כפילויות, מריצים את השאילתה הבאה. מציינים טבלת יעד, מאפשרים תוצאות גדולות ומשביתים את השטחת התוצאות.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

הערות לגבי שאילתת ההסרה של תוכן כפול:

  • השיטה הבטוחה יותר להסרת כפילויות היא להגדיר יעד לטבלה חדשה. לחלופין, אפשר לטרגט את טבלת המקור באמצעות מאפיין של פעולת כתיבה WRITE_TRUNCATE.
  • שאילתת הסרת הכפילויות מוסיפה עמודה row_number עם הערך 1 לסוף סכימת הטבלה. השאילתה משתמשת בהצהרה SELECT * EXCEPT מ-GoogleSQL כדי להחריג את העמודה row_number מטבלת היעד. הקידומת #standardSQL מפעילה את GoogleSQL עבור השאילתה הזו. לחלופין, אפשר לבחור עמודות ספציפיות כדי להשמיט את העמודה הזו.
  • כדי לשלוח שאילתות לנתונים פעילים בלי כפילויות, אפשר גם ליצור תצוגה מעל הטבלה באמצעות שאילתת הסרת הכפילויות. חשוב לדעת שעלויות השאילתות שמופעלות על התצוגה מחושבות על סמך העמודות שנבחרו בתצוגה, ולכן יכולות להיות גדולות.

הזרמה לטבלאות מחולקות למחיצות (Partitions) לפי זמן

כשמבצעים סטרימינג של נתונים לטבלה עם חלוקה למחיצות לפי זמן, לכל מחיצה יש מאגר זמני לסטרימינג. מאגר הנתונים הזמני של הסטרימינג נשמר כשמבצעים טעינה, שאילתה או העתקה של משימה שדורסת מחיצה על ידי הגדרת המאפיין writeDisposition לערך WRITE_TRUNCATE. כדי להסיר את מאגר הנתונים הזמני של הסטרימינג, צריך לוודא שהוא ריק על ידי קריאה ל-tables.get במחיצה.

חלוקה למחיצות (partitioning) לפי זמני כתיבת הנתונים

כשמבצעים סטרימינג לטבלה מחולקת למחיצות (Partitions) לפי זמני כתיבת הנתונים, מערכת BigQuery מסיקה את מחיצת היעד מהשעה הנוכחית ב-UTC.

נתונים חדשים שמגיעים ממוקמים באופן זמני במחיצה __UNPARTITIONED__ בזמן שהם במאגר הזמני של הנתונים. אם יש מספיק נתונים לא מחולקים, BigQuery מחלק את הנתונים למחיצה הנכונה. עם זאת, אין הסכם רמת שירות (SLA) לגבי משך הזמן שנדרש להעברת נתונים ממחיצת __UNPARTITIONED__. אפשר להחריג נתונים ממאגר הנתונים הזמני של הסטרימינג משאילתה על ידי סינון הערכים של NULL מהמחיצה __UNPARTITIONED__ באמצעות אחד מהעמודות הווירטואליות (_PARTITIONTIME או _PARTITIONDATE, בהתאם לסוג הנתונים המועדף).

אם אתם מזרימים נתונים לטבלה מחולקת למחיצות לפי יום, אתם יכולים לבטל את ההיסק של התאריך על ידי ציון קישוט של מחיצה כחלק מהבקשה.insertAll כוללים את ה-decorator בפרמטר tableId. לדוגמה, אפשר לשדר למחיצה שמתאימה לתאריך 2021-03-01 בטבלה table1 באמצעות קישוט המחיצה:

table1$20210301

כשמבצעים סטרימינג באמצעות תוסף חלוקה, אפשר לבצע סטרימינג לחלוקות בטווח של 31 ימים אחורה ו-16 ימים קדימה ביחס לתאריך הנוכחי, על סמך השעה הנוכחית ב-UTC. כדי לכתוב למחיצות של תאריכים שחורגים מהגבולות המותרים האלה, צריך להשתמש במקום זאת בעבודת טעינה או שאילתה, כמו שמתואר במאמר הוספה לנתונים של טבלה עם מחיצות והחלפתם.

הזרמת נתונים באמצעות כלי לקישוט מחיצות נתמכת רק בטבלאות עם מחיצות יומיות. התכונה לא נתמכת בטבלאות מחולקות לפי שעה, חודש או שנה.

לצורך בדיקה, אפשר להשתמש בכלי שורת הפקודה של BigQuery, בפקודת ה-CLI‏ bq insert. לדוגמה, הפקודה הבאה מעבירה שורה אחת בסטרימינג למחיצה של התאריך 1 בינואר 2017 ($20170101) לטבלה מחולקת למחיצות בשם mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

חלוקה למחיצות לפי עמודה של יחידת זמן

אפשר להזרים נתונים לטבלה עם חלוקה למחיצות לפי עמודה של DATE, DATETIME או TIMESTAMP, שבה התאריך הוא בין 10 שנים בעבר לשנה אחת בעתיד. נתונים מחוץ לטווח הזה נדחים.

כשנתונים מועברים בסטרימינג, הם מוצבים בהתחלה במחיצה __UNPARTITIONED__. כשיש מספיק נתונים לא מחולקים, BigQuery מחלק מחדש את הנתונים באופן אוטומטי וממקם אותם במחיצה המתאימה. עם זאת, אין הסכם רמת שירות (SLA) לגבי משך הזמן שנדרש להעברת נתונים ממחיצת __UNPARTITIONED__.

  • הערה: מחיצות יומיות עוברות עיבוד שונה ממחיצות שעתיות, חודשיות ושנתיות. רק נתונים מחוץ לטווח התאריכים (מ-7 הימים האחרונים עד 3 הימים הבאים) מחולצים למחיצה UNPARTITIONED, וממתינים לחלוקה מחדש למחיצות. לעומת זאת, בטבלה שמחולקת למחיצות לפי שעה, הנתונים תמיד מחולצים למחיצה UNPARTITIONED, ואחר כך מחולקים מחדש למחיצות.

יצירת טבלאות באופן אוטומטי באמצעות טבלאות תבנית

טבלאות תבנית מספקות מנגנון לפיצול טבלה לוגית למספר טבלאות קטנות יותר כדי ליצור קבוצות קטנות יותר של נתונים (לדוגמה, לפי מזהה משתמש). יש כמה מגבלות על טבלאות של תבניות, והן מפורטות בהמשך. במקום זאת, מומלץ להשתמש בטבלאות מחולקות למחיצות ובטבלאות מקובצות לאשכולות כדי להשיג את ההתנהגות הזו.

כדי להשתמש בטבלת תבנית דרך BigQuery API, מוסיפים פרמטר templateSuffix לבקשת insertAll. בכלי שורת הפקודה של BigQuery, מוסיפים את הדגל template_suffix לפקודה insert. אם BigQuery מזהה פרמטר templateSuffix או את הדגל template_suffix, הוא מתייחס לטבלת היעד כתבנית בסיס. הפקודה יוצרת טבלה חדשה עם אותה סכימה כמו של טבלת היעד, ושם שכולל את הסיומת שצוינה:

<targeted_table_name> + <templateSuffix>

שימוש בטבלת תבנית מאפשר לכם להימנע מתקורת יצירת כל טבלה בנפרד וציון הסכימה לכל טבלה. צריך ליצור רק תבנית אחת ולספק סיומות שונות כדי ש-BigQuery יוכל ליצור את הטבלאות החדשות בשבילכם. מערכת BigQuery ממקמת את הטבלאות באותו פרויקט ובאותו מערך נתונים.

טבלאות שנוצרות באמצעות טבלאות תבנית זמינות בדרך כלל תוך כמה שניות. במקרים נדירים, יכול להיות שיעבור יותר זמן עד שהם יהיו זמינים.

שינוי סכימת הטבלה של התבנית

אם משנים את סכימת הטבלה של תבנית, כל הטבלאות שייווצרו בהמשך ישתמשו בסכימה המעודכנת. טבלאות שנוצרו בעבר לא מושפעות, אלא אם עדיין יש לטבלה הקיימת מאגר זמני של נתונים להזרמה.

אם יש טבלאות קיימות שעדיין יש להן מאגר זמני של נתונים בהזרמה, ואתם משנים את סכימת הטבלה של התבנית באופן שתואם לאחור, הסכימה של הטבלאות שנוצרות בהזרמה פעילה מתעדכנת גם כן. עם זאת, אם משנים את סכימת טבלת התבנית באופן שלא תואם לאחור, כל הנתונים שנשמרו במאגר הזמני שמשתמשים בסכימה הישנה יאבדו. בנוסף, אי אפשר להזרים נתונים חדשים לטבלאות קיימות שנוצרו באמצעות הסכימה הישנה, שכבר לא תואמת.

אחרי שמשנים את סכימת הטבלה של תבנית, צריך לחכות עד שהשינויים יתעדכנו לפני שמנסים להוסיף נתונים חדשים או לבצע שאילתה בטבלאות שנוצרו. בקשות להוספת שדות חדשים אמורות להצליח תוך כמה דקות. ניסיונות לשאילתות של השדות החדשים עשויים לדרוש המתנה ארוכה יותר של עד 90 דקות.

אם רוצים לשנות את הסכימה של טבלה שנוצרה, לא משנים את הסכימה עד שהסטרימינג דרך טבלת התבנית נפסק וקטע סטטיסטיקות הסטרימינג של הטבלה שנוצרה לא מופיע בתגובה tables.get(), מה שמצביע על כך שלא מתבצעת שמירת נתונים בטבלה.

טבלאות מחולקות למחיצות וטבלאות מקובצות לאשכולות לא סובלות מהמגבלות שצוינו למעלה, והן המנגנון המומלץ.

פרטי טבלת התבנית

ערך הסיומת של התבנית
הערך של templateSuffix (או --template_suffix) חייב להכיל רק אותיות (a-z, A-Z), ספרות (0-9) או קווים תחתונים (_). האורך המקסימלי המשולב של שם הטבלה והסיומת של הטבלה הוא 1,024 תווים.
מכסה

הטבלאות בתבניות כפופות למגבלות של מכסת הסטרימינג. בכל פרויקט אפשר ליצור עד 10 טבלאות בשנייה באמצעות טבלאות תבנית, בדומה ל-API‏ tables.insert. המכסה הזו חלה רק על טבלאות שנוצרות, ולא על טבלאות שמשתנות.

אם האפליקציה שלכם צריכה ליצור יותר מ-10 טבלאות בשנייה, מומלץ להשתמש בטבלאות מקובצות. לדוגמה, אפשר להזין את מזהה הטבלה עם הקרדינליות הגבוהה בעמודת המפתח של טבלת אשכול אחת.

משך החיים (TTL)

הטבלה שנוצרת מקבלת בירושה את זמן התפוגה ממערך הנתונים. בדומה לנתוני סטרימינג רגילים, אי אפשר להעתיק את הטבלאות שנוצרות באופן מיידי.

ביטול כפילויות

ביטול כפילויות מתבצע רק בין הפניות אחידות לטבלת יעד. לדוגמה, אם אתם מבצעים סטרימינג בו-זמנית לטבלה שנוצרה באמצעות טבלאות תבנית ופקודת insertAll רגילה, לא מתבצע ביטול כפילויות בין השורות שנוספו באמצעות טבלאות התבנית לבין השורות שנוספו באמצעות פקודת insertAll רגילה.

תצוגות

טבלת התבנית והטבלאות שנוצרו לא יכולות להיות תצוגות.

פתרון בעיות שקשורות להוספת מודעות לסטרימינג

בקטעים הבאים מוסבר איך לפתור בעיות שמתרחשות כשמעבירים נתונים בסטרימינג ל-BigQuery באמצעות Legacy Streaming API. מידע נוסף על פתרון שגיאות שקשורות למכסות של הזנת זרם נתונים זמין במאמר בנושא שגיאות שקשורות למכסות של הזנת זרם נתונים.

קודי תגובת HTTP של כשל

אם מקבלים קוד תגובת HTTP שמעיד על כשל, כמו שגיאה בחיבור לרשת, אין דרך לדעת אם הזנת זרם הנתונים הצליחה. אם תנסו לשלוח מחדש את הבקשה, יכול להיות שיהיו שורות כפולות בטבלה. כדי להגן על הטבלה מפני כפילויות, צריך להגדיר את המאפיין insertId כששולחים את הבקשה. ‫BigQuery משתמש במאפיין insertId כדי לבטל כפילויות.

אם מתקבלת שגיאת הרשאה, שגיאה של שם טבלה לא תקין או שגיאה של חריגה מהמכסה, לא מוכנסות שורות והבקשה כולה נכשלת.

קודי תגובת HTTP של הצלחה

גם אם מקבלים קוד תגובת HTTP של הצלחה, צריך לבדוק את המאפיין insertErrors של התגובה כדי לדעת אם הוספת השורות הצליחה, כי יכול להיות ש-BigQuery הצליח להוסיף רק חלק מהשורות. יכול להיות שתיתקלו באחד מהתרחישים הבאים:

  • כל השורות נוספו בהצלחה. אם הנכס insertErrors הוא רשימה ריקה, כל השורות הוכנסו בהצלחה.
  • חלק מהשורות נוספו בהצלחה. חוץ ממקרים שבהם יש אי התאמה בסכימה באחת מהשורות, השורות שמצוינות במאפיין insertErrors לא מוכנסות, וכל שאר השורות מוכנסות בהצלחה. המאפיין errors מכיל מידע מפורט על הסיבה לכשל בכל שורה לא מוצלחת. המאפיין index מציין את אינדקס השורה של הבקשה (החל מ-0) שהשגיאה רלוונטית לגביה.
  • אף אחת מהשורות לא נוספה בהצלחה. אם ב-BigQuery מזוהה אי התאמה בסכימה בשורות נפרדות בבקשה, אף אחת מהשורות לא מוכנסת ומוחזרת רשומה insertErrors לכל שורה, גם לשורות שלא הייתה בהן אי התאמה בסכימה. בשורה שלא הייתה בה אי התאמה לסכימה, השגיאה כוללת את המאפיין reason שמוגדר לערך stopped, ואפשר לשלוח אותה מחדש כמו שהיא. בשורות שנכשלו מופיע מידע מפורט על אי ההתאמה לסכימה. מידע על סוגי מאגרי אחסון לפרוטוקולים נתמכים לכל סוג נתונים ב-BigQuery זמין במאמר סוגי נתונים נתמכים של מאגרי אחסון לפרוטוקולים ו-Arrow.

שגיאות במטא-נתונים של שידורים חיים עם הפסקות פרסום

ממשק ה-API של BigQuery להזנת זרם נתונים מיועד לשיעורי הוספה גבוהים, ולכן שינויים במטא-נתונים של הטבלה הבסיסית הם עקביים בסופו של דבר כשמבצעים אינטראקציה עם מערכת הזנת הזרם. ברוב המקרים, שינויים במטא-נתונים מועברים תוך דקות, אבל במהלך התקופה הזו יכול להיות שבתשובות של ה-API יופיע מצב לא עקבי של הטבלה.

דוגמאות לתרחישים:

  • שינויים בסכימה. שינוי הסכימה של טבלה שקיבלה לאחרונה הוספות של נתונים בזמן אמת עלול לגרום לתגובות עם שגיאות של חוסר התאמה בסכימה, כי יכול להיות שמערכת הסטרימינג לא תזהה את השינוי בסכימה באופן מיידי.
  • יצירה או מחיקה של טבלה. הזרמת נתונים לטבלה שלא קיימת מחזירה וריאציה של תגובת notFound. יכול להיות שטבלה שנוצרה בתגובה לא תזוהה מיד על ידי הוספות של נתונים בהמשך. באופן דומה, מחיקה או יצירה מחדש של טבלה יכולות ליצור תקופה שבה הוספות בסטרימינג מועברות בפועל לטבלה הישנה. יכול להיות שהוספות הנתונים בסטרימינג לא יופיעו בטבלה החדשה.
  • חיתוך טבלה. באופן דומה, חיתוך של נתוני טבלה (באמצעות עבודת שאילתה שמשתמשת ב-writeDisposition של WRITE_TRUNCATE) עלול לגרום להשמטה של הוספות עוקבות במהלך תקופת העקביות.

נתונים חסרים או לא זמינים

הוספות של נתונים לסטרימינג נמצאות באופן זמני באחסון שעבר אופטימיזציה לכתיבה, שיש לו מאפייני זמינות שונים מאלה של אחסון מנוהל. פעולות מסוימות ב-BigQuery לא מתבצעות עם האחסון שעבר אופטימיזציה לכתיבה, כמו משימות של העתקת טבלאות ושיטות API כמו tabledata.list. נתונים עדכניים של סטרימינג לא יופיעו בטבלת היעד או בפלט.

שגיאות שקשורות למכסת הזנת זרם הנתונים

בקטע הזה מפורטים טיפים לפתרון בעיות שקשורות לשגיאות במכסות של נתונים שמוזרמים ל-BigQuery.

באזורים מסוימים, המכסה של הוספת נתונים לסטרימינג גבוהה יותר אם לא מאכלסים את השדה insertId בכל שורה. מידע נוסף על מכסות להוספות בסטרימינג זמין במאמר הוספות בסטרימינג. השגיאות שקשורות למכסת השימוש ב-BigQuery Streaming תלויות בנוכחות או בהיעדר של insertId.

הודעת השגיאה

אם השדה insertId ריק, יכול להיות שתופיע שגיאת המכסה הבאה:

מגבלת מכסה הודעת השגיאה
בייטים לשנייה לכל פרויקט הישות שלך עם gaia_id: GAIA_ID, פרויקט: PROJECT_ID באזור: REGION חרגה מהמכסה של בייטים להוספה לשנייה.

אם השדה insertId מאוכלס, יכולות להופיע שגיאות לגבי מכסת השימוש הבאות:

מגבלת מכסה הודעת השגיאה
שורות לשנייה לכל פרויקט הפרויקט שלך: PROJECT_ID ב-REGION חרג מהמכסה של הוספת שורות לשנייה בסטרימינג.
שורות לשנייה לכל טבלה הטבלה: TABLE_ID חרגה מהמכסה של הזנת זרם נתונים לשנייה.
בייטים לשנייה לכל טבלה הטבלה: TABLE_ID חרגה מהמכסה של הזנת זרם נתונים בבייטים לשנייה.

המטרה של השדה insertId היא לבטל כפילויות בשורות שנוספו. אם מגיעים כמה ערכי insert עם אותו insertId תוך כמה דקות, BigQuery כותב גרסה אחת של הרשומה. עם זאת, לא מובטח שהכפילויות יוסרו אוטומטית. כדי להשיג את קצב העברת הנתונים המקסימלי בסטרימינג, מומלץ לא לכלול insertId ולהשתמש במקום זאת בביטול כפילויות ידני. מידע נוסף זמין במאמר איך מוודאים את עקביות הנתונים.

אם נתקלתם בשגיאה הזו, אבחון הבעיה ואז ביצוע השלבים המומלצים יעזרו לכם לפתור אותה.

אבחון

אפשר להשתמש בתצוגות STREAMING_TIMELINE_BY_* כדי לנתח את תנועת הגולשים בסטרימינג. התצוגות האלה מציגות נתונים סטטיסטיים מצטברים של סטרימינג במרווחי זמן של דקה אחת, בקיבוץ לפי error_code. שגיאות שקשורות למכסות מופיעות בתוצאות עם ערך של error_code ששווה ל-RATE_LIMIT_EXCEEDED או ל-QUOTA_EXCEEDED.

בהתאם למכסה הספציפית שהגעתם אליה, כדאי לעיין במאמר total_rows או במאמר total_input_bytes. אם השגיאה היא מכסת שימוש ברמת הטבלה, מסננים לפי table_id.

לדוגמה, השאילתה הבאה מציגה את מספר הבייטים הכולל שהועבר בכל דקה ואת המספר הכולל של שגיאות שקשורות למכסת השימוש:

SELECT
 start_timestamp,
 error_code,
 SUM(total_input_bytes) as sum_input_bytes,
 SUM(IF(error_code IN ('QUOTA_EXCEEDED', 'RATE_LIMIT_EXCEEDED'),
     total_requests, 0)) AS quota_error
FROM
 `region-REGION_NAME`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT
WHERE
  start_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP, INTERVAL 1 DAY)
GROUP BY
 start_timestamp,
 error_code
ORDER BY 1 DESC

רזולוציה

כדי לפתור את שגיאת המכסה הזו:

  • אם אתם משתמשים בשדה insertId לביטול כפילויות, והפרויקט שלכם נמצא באזור שתומך במכסת סטרימינג גבוהה יותר, מומלץ להסיר את השדה insertId. יכול להיות שיהיה צורך לבצע כמה שלבים נוספים כדי לבטל כפילויות מהנתונים באופן ידני. מידע נוסף מופיע במאמר הסרה ידנית של כפילויות.

  • אם אתם לא משתמשים ב-insertId, או אם אי אפשר להסיר אותו, כדאי לעקוב אחרי התנועה של הסטרימינג במשך 24 שעות ולנתח את שגיאות המכסה:

    • אם אתם רואים בעיקר שגיאות RATE_LIMIT_EXCEEDED ולא שגיאות QUOTA_EXCEEDED, ותנועת הגולשים הכוללת שלכם נמוכה מ-80% מהמכסה, סביר להניח שהשגיאות מצביעות על עליות זמניות. כדי לטפל בשגיאות האלה, צריך לנסות שוב את הפעולה באמצעות השהיה מעריכית לפני ניסיון חוזר בין הניסיונות החוזרים.

    • אם אתם משתמשים במשימת Dataflow כדי להוסיף נתונים, כדאי להשתמש במשימות טעינה במקום בהוספות של נתונים בזמן אמת. מידע נוסף זמין במאמר בנושא הגדרת שיטת ההוספה. אם אתם משתמשים ב-Dataflow עם מחבר קלט/פלט בהתאמה אישית, כדאי להשתמש במקום זאת במחבר קלט/פלט מובנה. מידע נוסף זמין במאמר בנושא דפוסי קלט/פלט מותאמים אישית.

    • אם אתם רואים QUOTA_EXCEEDED שגיאות או שתנועת הגולשים הכוללת חורגת באופן עקבי מ-80% מהמכסה, שלחו בקשה להגדלת המכסה. מידע נוסף זמין במאמר בנושא שליחת בקשה לשינוי המכסות.

    • אפשר גם להחליף את ההוספות של נתוני סטרימינג ב-Storage Write API החדש יותר, שכולל תפוקה גבוהה יותר, מחיר נמוך יותר ותכונות שימושיות רבות.