Trasferire flussi di dati utilizzando l'API Storage Write
Questo documento descrive come utilizzare l' API BigQuery Storage Write per inserire flussi di dati in BigQuery.
Negli scenari di streaming, i dati arrivano continuamente e devono essere disponibili per le letture con una latenza minima. Quando utilizzi l'API BigQuery Storage Write per i carichi di lavoro di streaming, considera le garanzie di cui hai bisogno:
- Se la tua applicazione ha bisogno solo della semantica "almeno una volta", utilizza il flusso predefinito.
- Se hai bisogno della semantica "esattamente una volta", crea uno o più flussi di tipo committed e utilizza gli offset del flusso per garantire le scritture "esattamente una volta".
Nel tipo committed, i dati scritti nel flusso sono disponibili per le query non appena il server riconosce la richiesta di scrittura. Il flusso predefinito utilizza anche il tipo committed, ma non fornisce garanzie "esattamente una volta".
Utilizzare il flusso predefinito per la semantica "almeno una volta"
Se la tua applicazione può accettare la possibilità che nella tabella di destinazione vengano visualizzati record duplicati, ti consigliamo di utilizzare il flusso predefinito per gli scenari di streaming.
Il seguente codice mostra come scrivere i dati nel flusso predefinito:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per saperne di più, consulta la documentazione di riferimento dell'API Java BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura l'autenticazione per le librerie client.
Python
Questo esempio mostra come inserire un record con due campi utilizzando il flusso predefinito:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Questo esempio di codice dipende dal modulo del protocollo compilato sample_data_pb2.py. Per creare il modulo compilato, esegui il
protoc --python_out=. sample_data.proto comando, dove protoc è il
compilatore del buffer di protocollo. Il file sample_data.proto definisce il formato dei messaggi utilizzati nell'esempio Python. Per installare il compilatore protoc, segui le istruzioni riportate in Protocol Buffers - Google's data interchange format.
Ecco i contenuti del file sample_data.proto:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Questo script utilizza il file entries.json, che contiene dati di riga di esempio da inserire nella tabella BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Utilizzare il multiplexing
Puoi attivare
il multiplexing
solo a livello di writer di flusso per il flusso predefinito. Per attivare il multiplexing in
Java, chiama il setEnableConnectionPool metodo quando crei un
StreamWriter o JsonStreamWriter oggetto.
Dopo aver attivato il pool di connessioni, la libreria client Java gestisce le connessioni in background, aumentando il numero di connessioni se quelle esistenti sono considerate troppo occupate. Per la scalabilità automatica, ti consigliamo di ridurre il limite maxInflightRequests.
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
Per attivare il multiplexing in Go, consulta Condivisione delle connessioni (multiplexing).
Utilizzare il tipo committed per la semantica "esattamente una volta"
Se hai bisogno della semantica di scrittura "esattamente una volta", crea un flusso di scrittura di tipo committed. Nel tipo committed, i record sono disponibili per le query non appena il client riceve la conferma dal backend.
Il tipo committed fornisce la consegna "esattamente una volta" all'interno di un flusso tramite l'utilizzo degli offset dei record. Utilizzando gli offset dei record, l'applicazione specifica l'offset di accodamento successivo in ogni chiamata a AppendRows. L'operazione di scrittura viene eseguita solo se il valore dell'offset corrisponde all'offset di accodamento successivo. Per saperne di più, consulta
Gestire gli offset dei flussi per ottenere la semantica "esattamente una volta".
Se non fornisci un offset, i record vengono aggiunti alla fine corrente del flusso. In questo caso, se una richiesta di accodamento restituisce un errore, il nuovo tentativo potrebbe comportare la visualizzazione del record più di una volta nel flusso.
Per utilizzare il tipo committed, segui questi passaggi:
Java
- Chiama
CreateWriteStreamper creare uno o più flussi di tipo committed. - Per ogni flusso, chiama
AppendRowsin un loop per scrivere batch di record. - Chiama
FinalizeWriteStreamper ogni flusso per rilasciarlo. Dopo aver chiamato questo metodo, non puoi scrivere altre righe nel flusso. Questo passaggio è facoltativo nel tipo committed, ma aiuta a evitare di superare il limite dei flussi attivi. Per saperne di più, consulta Limitare la frequenza di creazione dei flussi.
Node.js
- Chiama
createWriteStreamFullResponseper creare uno o più flussi di tipo committed. - Per ogni flusso, chiama
appendRowsin un loop per scrivere batch di record. - Chiama
finalizeper ogni flusso per rilasciarlo. Dopo aver chiamato questo metodo, non puoi scrivere altre righe nel flusso. Questo passaggio è facoltativo nel tipo committed, ma aiuta a evitare di superare il limite dei flussi attivi. Per saperne di più, consulta Limitare la frequenza di creazione dei flussi.
Non puoi eliminare un flusso in modo esplicito. I flussi seguono la durata (TTL) definita dal sistema:
- Un flusso committed ha una TTL di tre giorni se non è presente traffico sul flusso.
- Per impostazione predefinita, un flusso con buffer ha una TTL di sette giorni se non è presente traffico sul flusso.
Il seguente codice mostra come utilizzare il tipo committed:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per saperne di più, consulta la documentazione di riferimento dell'API Java BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura l'autenticazione per le librerie client.
Utilizzare il formato Apache Arrow per importare i dati
Il seguente codice mostra come importare i dati utilizzando il formato Apache Arrow.
Python
Questo esempio mostra come importare una tabella PyArrow serializzata utilizzando il flusso predefinito. Per un esempio end-to-end più dettagliato, consulta l'esempio PyArrow su GitHub.
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per saperne di più, consulta la documentazione di riferimento dell'API Java BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura l'autenticazione per le librerie client.