Questa pagina descrive le caratteristiche di rendimento dei job di streaming Dataflow che leggono da Pub/Sub e scrivono in BigQuery. Fornisce i risultati del test di benchmark per due tipi di pipeline di streaming:
Solo mappa (trasformazione per messaggio): pipeline che eseguono trasformazioni per messaggio, senza tenere traccia dello stato o raggruppare gli elementi nel flusso. Gli esempi includono ETL, convalida dei campi e mappatura dello schema.
Aggregazione in finestra (
GroupByKey): pipeline che eseguono operazioni con stato e raggruppano i dati in base a una chiave e a una finestra temporale. Alcuni esempi includono il conteggio degli eventi, il calcolo delle somme e la raccolta dei record per una sessione utente.
La maggior parte dei carichi di lavoro per l'integrazione dei dati di streaming rientra in queste due categorie. Se la tua pipeline segue un pattern simile, puoi utilizzare questi benchmark per valutare il tuo job Dataflow rispetto a una configurazione di riferimento con prestazioni ottimali.
Metodologia di test
I benchmark sono stati condotti utilizzando le seguenti risorse:
Un argomento Pub/Sub di cui è stato eseguito il provisioning preliminare con un carico di input costante. I messaggi sono stati generati utilizzando il modello Generatore di dati di streaming.
- Frequenza dei messaggi: circa 1.000.000 di messaggi al secondo
- Input Load: 1 GiB/s
- Formato del messaggio: testo JSON generato in modo casuale con uno schema fisso
- Dimensioni messaggio: circa 1 KiB per messaggio
Una tabella BigQuery standard.
Pipeline Dataflow in modalità flusso basate sul modello Da Pub/Sub a BigQuery. Queste pipeline eseguono l'analisi e la mappatura dello schema minime richieste. Non è stata utilizzata alcuna funzione definita dall'utente personalizzata.
Dopo che lo scaling orizzontale si è stabilizzato e la pipeline ha raggiunto lo stato stazionario, le pipeline sono state eseguite per circa un giorno, dopodiché i risultati sono stati raccolti e analizzati.
Pipeline Dataflow
Sono state testate due varianti della pipeline:
Pipeline solo mappa. Questa pipeline esegue una semplice mappatura e conversione dei messaggi JSON. Per questo test, è stato utilizzato il modello Da Pub/Sub a BigQuery senza modifiche.
- Semantica: la pipeline è stata testata utilizzando sia la modalità exactly-once sia la modalità at-least-once. L'elaborazione at-least-once offre una velocità effettiva migliore. Tuttavia, deve essere utilizzato solo quando i record duplicati sono accettabili o il sink downstream gestisce la deduplicazione.
Pipeline di aggregazione in finestra. Questa pipeline raggruppa i messaggi in base a una chiave specifica in finestre di dimensioni fisse e scrive i record aggregati in BigQuery. Per questo test è stata utilizzata una pipeline Apache Beam personalizzata basata sul modello Da Pub/Sub a BigQuery.
Logica di aggregazione: per ogni finestra fissa e non sovrapposta di 1 minuto, i messaggi con la stessa chiave sono stati raccolti e scritti come un unico record aggregato in BigQuery. Questo tipo di aggregazione viene comunemente utilizzato nell'elaborazione dei log per combinare eventi correlati, ad esempio l'attività di un utente, in un unico record per l'analisi downstream.
Parallelismo delle chiavi: il benchmark ha utilizzato 1.000.000 di chiavi distribuite in modo uniforme.
Semantica: la pipeline è stata testata utilizzando la modalità di esecuzione esatta. Le aggregazioni richiedono la semantica exactly-once per garantire la correttezza ed evitare il doppio conteggio all'interno di un gruppo e di una finestra.
Configurazione job
La tabella seguente mostra come sono stati configurati i job Dataflow.
| Impostazione | Mappa solo una volta | Solo mappa, almeno una volta | Aggregazione in finestra, esattamente una volta |
|---|---|---|---|
| Tipo di macchina worker | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| vCPU della macchina worker | 2 | 2 | 2 |
| RAM della macchina worker | 7,5 GiB | 7,5 GiB | 7,5 GiB |
| Persistent Disk della macchina worker | Disco permanente standard (HDD), 30 GB | Disco permanente standard (HDD), 30 GB | Disco permanente standard (HDD), 30 GB |
| Worker iniziali | 70 | 30 | 180 |
| Numero massimo di worker | 100 | 100 | 250 |
| Streaming Engine | Sì | Sì | Sì |
| Scalabilità automatica orizzontale | Sì | Sì | Sì |
| Modello di fatturazione | Fatturazione basata sulle risorse | Fatturazione basata sulle risorse | Fatturazione basata sulle risorse |
| L'API Storage Write è abilitata? | Sì | Sì | Sì |
| Stream dell'API Storage Write | 200 | Non applicabile | 500 |
| Frequenza di attivazione dell'API Storage Write | 5 secondi | Non applicabile | 5 secondi |
L'API BigQuery Storage Write è consigliata per le pipeline di streaming. Quando utilizzi la modalità exactly-once con l'API Storage Write, puoi modificare le seguenti impostazioni:
Numero di flussi di scrittura. Per garantire un parallelismo delle chiavi sufficiente nella fase di scrittura, imposta il numero di stream dell'API Storage Write su un valore maggiore del numero di CPU worker, mantenendo un livello ragionevole di velocità effettiva dello stream di scrittura BigQuery.
Frequenza di attivazione. Un valore di un secondo a una sola cifra è adatto per pipeline ad alto rendimento.
Per ulteriori informazioni, vedi Scrivere da Dataflow a BigQuery.
Risultati benchmark
Questa sezione descrive i risultati dei test di benchmark.
Utilizzo del throughput e delle risorse
La tabella seguente mostra i risultati del test per il throughput della pipeline e l'utilizzo delle risorse.
| Risultato | Mappa solo una volta | Solo mappa, almeno una volta | Aggregazione in finestra, esattamente una volta |
|---|---|---|---|
| Velocità effettiva di input per lavoratore | Media: 17 MBps, n=3 | Media: 21 MBps, n=3 | Media: 6 MBps, n=3 |
| Utilizzo medio della CPU in tutti i worker | Media: 65%, n=3 | Media: 69%, n=3 | Media: 80%, n=3 |
| Numero di nodi worker | Media: 57, n=3 | Media: 48, n=3 | Media: 169, n=3 |
| Unità di calcolo Streaming Engine all'ora | Media: 125, n=3 | Media: 46, n=3 | Media: 354, n=3 |
L'algoritmo di scalabilità automatica può influire sul livello di utilizzo della CPU target. Per ottenere un utilizzo della CPU di destinazione più alto o più basso, puoi impostare l'intervallo di scalabilità automatica o il suggerimento per l'utilizzo dei worker. Target di utilizzo più elevati possono comportare costi inferiori, ma anche una latenza finale peggiore, soprattutto per carichi variabili.
Per una pipeline di aggregazione a finestra, il tipo di aggregazione, le dimensioni della finestra e il parallelismo delle chiavi possono influire notevolmente sull'utilizzo delle risorse.
Latenza
La tabella seguente mostra i risultati del benchmark per la latenza della pipeline.
| Latenza end-to-end totale dello stage | Mappa solo una volta | Solo mappa, almeno una volta | Aggregazione in finestra, esattamente una volta |
|---|---|---|---|
| P50 | Media: 800 ms, n=3 | Media: 160 ms, n=3 | Media: 3400 ms, n=3 |
| P95 | Media: 2000 ms, n=3 | Media: 250 ms, n=3 | Media: 13.000 ms, n=3 |
| P99 | Media: 2800 ms, n=3 | Media: 410 ms, n=3 | Media: 25.000 ms, n=3 |
I test hanno misurato la latenza end-to-end per fase
(la metrica job/streaming_engine/stage_end_to_end_latencies) in tre esecuzioni di test a lunga esecuzione. Questa metrica misura il tempo
che Streaming Engine trascorre in ogni fase della pipeline. Comprende tutti i passaggi interni
della pipeline, ad esempio:
- Rimescolamento e accodamento dei messaggi per l'elaborazione
- Il tempo di elaborazione effettivo, ad esempio la conversione dei messaggi in oggetti riga
- Scrittura dello stato persistente, nonché tempo trascorso in coda per scrivere lo stato persistente
Un'altra metrica di latenza è l'aggiornamento dei dati. Tuttavia, l'aggiornamento dei dati è influenzato da fattori quali il raggruppamento in finestre definito dall'utente e i ritardi a monte nell'origine. La latenza di sistema fornisce una base di riferimento più oggettiva per l'efficienza e l'integrità dell'elaborazione interna di una pipeline sotto carico.
I dati sono stati misurati per circa un giorno per esecuzione, con i periodi di avvio iniziali scartati per riflettere prestazioni stabili e in regime stazionario. I risultati mostrano due fattori che introducono una latenza aggiuntiva:
Modalità "exactly-once". Per ottenere una semantica di esecuzione esatta, sono necessari lo shuffling deterministico e le ricerche di stato persistente per la deduplicazione. La modalità Almeno una volta è molto più veloce perché ignora questi passaggi.
Aggregazione in finestra. I messaggi devono essere completamente rimescolati, memorizzati nel buffer e scritti nello stato persistente prima della chiusura della finestra, il che aumenta la latenza end-to-end.
I benchmark mostrati qui rappresentano una base di riferimento. La latenza è molto sensibile alla complessità della pipeline. Le UDF personalizzate, le trasformazioni aggiuntive e la logica di finestre complesse possono aumentare la latenza. Le aggregazioni semplici e con un alto grado di riduzione, come somma e conteggio, tendono a generare una latenza inferiore rispetto alle operazioni con stato, come la raccolta di elementi in un elenco.
Stima i costi
Puoi stimare il costo di base della tua pipeline comparabile con la fatturazione basata sulle risorse utilizzando il Calcolatore prezzi di Google Cloud Platform, come segue:
- Apri il Calcolatore prezzi.
- Fai clic su Aggiungi alla stima.
- Seleziona Dataflow.
- Per Tipo di servizio, seleziona "Dataflow Classic".
- Seleziona Impostazioni avanzate per visualizzare l'insieme completo delle opzioni.
- Scegli la località in cui viene eseguito il job.
- Per Tipo di job, seleziona "Streaming".
- Seleziona Abilita Streaming Engine.
- Inserisci le informazioni relative alle ore di esecuzione del job, ai nodi worker, alle macchine worker e allo spazio di archiviazione su Persistent Disk.
- Inserisci il numero stimato di unità di calcolo Streaming Engine.
L'utilizzo delle risorse e i costi aumentano in modo approssimativamente lineare con la velocità effettiva di input, anche se per i job di piccole dimensioni con pochi worker, il costo totale è dominato dai costi fissi. Come punto di partenza, puoi estrapolare il numero di nodi worker e il consumo di risorse dai risultati del benchmark.
Ad esempio, supponiamo di eseguire una pipeline solo per la mappatura in modalità exactly-once, con una velocità dei dati di input di 100 MiB/s. In base ai risultati del benchmark per una pipeline da 1 GB/s, puoi stimare i requisiti delle risorse nel seguente modo:
- Fattore di scalabilità: (100 MiB/s) / (1 GiB/s) = 0,1
- Nodi worker previsti: 57 worker × 0,1 = 5,7 worker
- Numero previsto di unità di calcolo di Streaming Engine all'ora: 125 × 0,1 = 12,5 unità all'ora
Questo valore deve essere utilizzato solo come stima iniziale. Il throughput e il costo effettivi possono variare in modo significativo in base a fattori quali tipo di macchina, distribuzione delle dimensioni dei messaggi, codice utente, tipo di aggregazione, parallelismo delle chiavi e dimensione della finestra. Per ulteriori informazioni, consulta Best practice per l'ottimizzazione dei costi di Dataflow.
Esegui una pipeline di test
Questa sezione mostra i comandi
gcloud dataflow flex-template run
utilizzati per eseguire la pipeline solo per la mappatura.
Modalità "exactly-once"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Modalità at-least-once
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Sostituisci quanto segue:
JOB_ID: l'ID job DataflowPROJECT_ID: l'ID progettoSUBSCRIPTION_NAME: il nome dell'abbonamento Pub/SubDATASET: il nome del set di dati BigQueryTABLE_NAME: il nome della tabella BigQuery
Generare dati di test
Per generare dati di test, utilizza il seguente comando per eseguire il template Streaming Data Generator:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Sostituisci quanto segue:
JOB_ID: l'ID job DataflowPROJECT_ID: l'ID progettoTOPIC_NAME: il nome dell'argomento Pub/SubSCHEMA_LOCATION: il percorso di un file schema in Cloud Storage
Il modello Generatore di dati di streaming utilizza un file Generatore di dati JSON per definire lo schema del messaggio. I test di benchmark hanno utilizzato uno schema di messaggio simile al seguente:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Passaggi successivi
- Utilizzare l'interfaccia di monitoraggio dei job Dataflow
- Best practice per l'ottimizzazione dei costi di Dataflow
- Risolvere i problemi relativi ai job di streaming lenti o bloccati
- Leggi da Pub/Sub a Dataflow
- Scrivere da Dataflow a BigQuery