Questa pagina fornisce indicazioni sulle best practice da seguire quando crei ed esegui workflow Dataflow HPC altamente paralleli, tra cui come utilizzare codice esterno nelle pipeline, come eseguire la pipeline e come gestire la gestione degli errori.
Includere codice esterno nella pipeline
Un elemento di differenziazione chiave per le pipeline altamente parallele è che utilizzano
codice C++ all'interno di
DoFn
anziché uno dei linguaggi SDK Apache Beam standard. Per le pipeline Java, per semplificare l'utilizzo delle librerie C++ nella pipeline, ti consigliamo di utilizzare le chiamate di procedure esterne. Questa sezione descrive l'approccio generale utilizzato per l'esecuzione di codice esterno (C++) nelle pipeline Java.
Una definizione di pipeline Apache Beam ha diversi componenti chiave:
PCollectionssono raccolte immutabili di elementi omogenei.PTransformsvengono utilizzati per definire le trasformazioni in unPCollectionche genera un altroPCollection.- La pipeline è il costrutto che ti consente, tramite il codice, di dichiarare le interazioni tra
PTransformsePCollections. La pipeline è rappresentata come un grafo diretto aciclico (DAG).
Quando utilizzi codice da un linguaggio che non è uno dei linguaggi SDK Apache Beam standard, inserisci il codice in PTransform, che si trova all'interno di DoFn, e utilizza uno dei linguaggi SDK standard per definire la pipeline stessa.
Ti consigliamo di utilizzare l'SDK Apache Beam Python per definire la pipeline, perché l'SDK Python ha una classe di utilità che semplifica l'utilizzo di altro codice. Tuttavia, puoi utilizzare gli altri SDK Apache Beam.
Puoi utilizzare il codice per condurre esperimenti rapidi senza richiedere una build completa. Per un sistema di produzione, in genere crei i tuoi file binari, il che ti dà la libertà di ottimizzare il processo in base alle tue esigenze.
Il seguente diagramma illustra i due utilizzi dei dati della pipeline:
- I dati vengono utilizzati per guidare il processo.
- I dati vengono acquisiti durante l'elaborazione e uniti ai dati del driver.
In questa pagina, i dati primari (dall'origine) sono indicati come dati di guida e i dati secondari (dalla fase di elaborazione) sono indicati come dati di unione.
In un caso d'uso finanziario, i dati di guida potrebbero essere alcune centinaia di migliaia di transazioni. Ogni transazione deve essere elaborata insieme ai dati di mercato. In questo caso, i dati di mercato sono i dati di unione. In un caso d'uso multimediale, i dati di guida potrebbero essere file di immagini che richiedono l'elaborazione ma non necessitano di altre origini dati e, pertanto, non utilizzano i dati di unione.
Considerazioni sulle dimensioni per i dati di guida
Se la dimensione dell'elemento dei dati di guida è nell'intervallo dei megabyte bassi, trattala con il normale paradigma Apache Beam di creazione di un oggetto PCollection dall'origine e invio dell'oggetto alle trasformazioni Apache Beam per l'elaborazione.
Se la dimensione dell'elemento dei dati di guida è in megabyte elevati o in gigabyte, come è tipico per i media, puoi inserire i dati di guida in Cloud Storage. Poi, nell'oggetto PCollection iniziale, fai riferimento all'URI di archiviazione e utilizza solo un riferimento URI a questi dati.
Considerazioni sulle dimensioni per i dati di unione
Se i dati di unione sono di alcune centinaia di megabyte o meno, utilizza un input laterale per inviare questi dati alle trasformazioni Apache Beam. L'input laterale invia il pacchetto di dati a ogni worker che ne ha bisogno.
Se i dati di unione sono nell'intervallo di gigabyte o terabyte, utilizza Bigtable o Cloud Storage per unire i dati di unione ai dati di guida, a seconda della natura dei dati. Bigtable è ideale per gli scenari finanziari in cui i dati di mercato sono spesso accessibili come ricerche coppia chiave-valore da Bigtable. Per ulteriori informazioni sulla progettazione dello schema Bigtable, inclusi i consigli per l'utilizzo dei dati di serie temporali, consulta la seguente documentazione di Bigtable:
Eseguire il codice esterno
Puoi eseguire codice esterno in Apache Beam in molti modi.
Crea un processo chiamato da un oggetto
DoFnall'interno di una trasformazione Dataflow.Utilizza JNI con l'SDK Java.
Crea un sottoprocesso direttamente dall'oggetto
DoFn. Sebbene questo approccio non sia il più efficiente, è robusto e semplice da implementare. A causa dei potenziali problemi con l'utilizzo di JNI, questa pagina mostra l'utilizzo di una chiamata di sottoprocesso.
Quando progetti il workflow, considera la pipeline end-to-end completa. Eventuali inefficienze nel modo in cui viene eseguito il processo sono compensate dal fatto che lo spostamento dei dati dall'origine al sink viene eseguito con una singola pipeline. Se confronti questo approccio con altri, esamina i tempi end-to-end della pipeline e i costi end-to-end.
Inserire i file binari negli host
Quando utilizzi un linguaggio Apache Beam nativo, l'SDK Apache Beam sposta automaticamente tutto il codice richiesto nei worker. Tuttavia, quando effettui una chiamata a codice esterno, devi spostare il codice manualmente.
Per spostare il codice: L'esempio mostra i passaggi per l'SDK Apache Beam Java.
- Archivia il codice esterno compilato, insieme alle informazioni sul controllo delle versioni, in Cloud Storage.
- Nel
@Setupmetodo, crea un blocco sincronizzato per verificare se il file di codice è disponibile sulla risorsa locale. Anziché implementare un controllo fisico, puoi confermare la disponibilità utilizzando una variabile statica al termine del primo thread. - Se il file non è disponibile, utilizza la libreria client di Cloud Storage per estrarre il file dal bucket Cloud Storage al worker locale. Un
approccio consigliato è utilizzare la classe
FileSystemsdi Apache Beam per questa attività. - Dopo aver spostato il file, verifica che il bit di esecuzione sia impostato sul file di codice.
- In un sistema di produzione, controlla l'hash dei file binari per assicurarti che il file sia stato copiato correttamente.
L'utilizzo della funzione Apache Beam
filesToStage
è anche un'opzione, ma rimuove alcuni dei vantaggi della capacità del
runner di creare automaticamente pacchetti e spostare il codice Java. Inoltre, poiché la chiamata al sottoprocesso richiede una posizione assoluta del file, devi utilizzare il codice per determinare il percorso di classe e, di conseguenza, la posizione del file spostato da filesToStage. Non consigliamo questo approccio.
Eseguire i file binari esterni
Prima di poter eseguire codice esterno, devi creare un wrapper. Scrivi questo wrapper nello stesso linguaggio del codice esterno (ad esempio, C++) o come script shell. Il wrapper ti consente di passare gli handle dei file e implementare le ottimizzazioni come descritto nella sezione Progettare l'elaborazione per piccoli cicli di CPU in questa pagina. Il wrapper non deve essere sofisticato. Il seguente snippet mostra una struttura di un wrapper in C++.
int main(int argc, char* argv[])
{
if(argc < 3){
std::cerr << "Required return file and data to process" << '\n';
return 1;
}
std::string returnFile = argv[1];
std::string word = argv[2];
std::ofstream myfile;
myfile.open (returnFile);
myfile << word;
myfile.close();
return 0;
}
Questo codice legge due parametri dall'elenco degli argomenti. Il primo parametro è la posizione del file di restituzione in cui vengono inseriti i dati. Il secondo parametro sono i dati che il codice ripete all'utente. Nelle implementazioni reali, questo codice farebbe molto di più che ripetere "Hello, world"!
Dopo aver scritto il codice del wrapper, esegui il codice esterno:
- Trasmetti i dati ai file binari del codice esterno.
- Esegui i file binari, rileva eventuali errori e registra errori e risultati.
- Gestisci le informazioni di logging.
- Acquisisci i dati dall'elaborazione completata.
Trasmettere i dati ai file binari
Per avviare il processo di esecuzione della libreria, trasmetti i dati al codice C++. In questo passaggio puoi sfruttare l'integrazione di Dataflow con altri Google Cloud strumenti. Uno strumento come Bigtable può gestire set di dati molto grandi e gestire l'accesso a bassa latenza e ad alta concorrenza, consentendo a migliaia di core di accedere contemporaneamente al set di dati. Inoltre, Bigtable può pre-elaborare i dati, consentendo la modellazione, l'arricchimento e il filtraggio dei dati. Tutto questo lavoro può essere eseguito nelle trasformazioni Apache Beam prima di eseguire il codice esterno.
Per un sistema di produzione, il percorso consigliato è utilizzare un buffer di protocollo per incapsulare i dati di input. Puoi convertire i dati di input in byte e codificarli in base64 prima di passarli alla libreria esterna. I due modi per passare questi dati alla libreria esterna sono i seguenti:
- Dati di input di piccole dimensioni. Per i dati di piccole dimensioni che non superano la lunghezza massima del sistema per un argomento di comando, passa l'argomento nella posizione 2 del processo in fase di creazione con
java.lang.ProcessBuilder. - Dati di input di grandi dimensioni. Per dimensioni dei dati maggiori, crea un file il cui nome include un UUID per contenere i dati richiesti dal processo.
Eseguire il codice C++, rilevare gli errori e registrare
L'acquisizione e la gestione delle informazioni sugli errori sono una parte fondamentale della pipeline. Le risorse utilizzate dal runner Dataflow sono temporanee e spesso è difficile ispezionare i file di log dei worker. Devi assicurarti di acquisire e inserire tutte le informazioni utili nel logging del runner Dataflow e di archiviare i dati di logging in uno o più bucket Cloud Storage.
L'approccio consigliato è reindirizzare stdout e stderr ai file, in modo da evitare qualsiasi considerazione relativa alla memoria insufficiente. Ad esempio, nel runner Dataflow che chiama il codice C++, potresti includere righe come le seguenti:
Java
import java.lang.ProcessBuilder.Redirect;
...
processbuilder.redirectError(Redirect.appendTo(errfile));
processbuilder.redirectOutput(Redirect.appendTo(outFile));
Python
# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
integers
| beam.Map(collatz.total_stopping_time).with_exception_handling(
use_subprocess=True))
# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
os.path.splitext(output_path)[0] + '-bad.txt')
Gestire le informazioni di logging
Molti casi d'uso prevedono l'elaborazione di milioni di elementi. L'elaborazione riuscita genera log con poco o nessun valore, quindi devi prendere una decisione aziendale in merito alla conservazione dei dati di log. Ad esempio, considera queste alternative alla conservazione di tutti i dati di log:
- Se le informazioni contenute nei log dell'elaborazione degli elementi riuscita non sono utili, non conservarle.
- Crea una logica che campiona i dati di log, ad esempio campionando solo ogni 10.000 voci di log. Se l'elaborazione è omogenea, ad esempio quando molte iterazioni del codice generano dati di log essenzialmente identici, questo approccio fornisce un equilibrio efficace tra la conservazione dei dati di log e l'ottimizzazione dell'elaborazione.
Per le condizioni di errore, la quantità di dati di cui è stato eseguito il dump nei log potrebbe essere elevata. Una strategia efficace per la gestione di grandi quantità di dati di log degli errori è leggere le prime righe della voce di log e inserirle solo in Cloud Logging. Puoi caricare il resto del file di log nei bucket Cloud Storage. Questo approccio ti consente di esaminare le prime righe dei log degli errori in un secondo momento e, se necessario, fare riferimento a Cloud Storage per l'intero file.
È utile anche controllare le dimensioni del file di log. Se la dimensione del file è zero, puoi ignorarlo in sicurezza o registrare un semplice messaggio di log che indica che il file non contiene dati.
Acquisire i dati dall'elaborazione completata
Non è consigliabile utilizzare stdout per restituire il risultato del calcolo alla funzione DoFn. Anche altro codice chiamato dal codice C++ e persino il
tuo codice potrebbero inviare messaggi a stdout contaminando lo stdoutput
stream che altrimenti contiene i dati di logging. Invece, è una best practice apportare una modifica al codice del wrapper C++ per consentire al codice di accettare un parametro che indica dove creare il file che memorizza il valore. Idealmente, questo
file deve essere archiviato in modo indipendente dalla lingua utilizzando
i buffer di protocollo,
che consentono al codice C++ di restituire un oggetto al codice Java o Python. L'oggetto DoFn può leggere il risultato direttamente dal file e passare le informazioni sul risultato alla propria chiamata output.
L'esperienza ha dimostrato l'importanza di eseguire unit test che riguardano il processo stesso. È importante implementare un unit test che esegua il processo indipendentemente dalla pipeline Dataflow. Il debug della libreria può essere eseguito in modo molto più efficiente se è autonomo e non deve eseguire l'intera pipeline.
Progettare l'elaborazione per piccoli cicli di CPU
La chiamata a un sottoprocesso comporta un overhead. A seconda del carico di lavoro, potrebbe essere necessario eseguire un lavoro aggiuntivo per ridurre il rapporto tra il lavoro svolto e l'overhead amministrativo di avvio e arresto del processo.
Nel caso d'uso multimediale, la dimensione dell'elemento dei dati di guida potrebbe essere in megabyte elevati o in gigabyte. Di conseguenza, l'elaborazione di ogni elemento di dati può richiedere molti minuti. In questo caso, il costo della chiamata al sottoprocesso è insignificante rispetto al tempo di elaborazione complessivo. L'approccio migliore in questa situazione è che un singolo elemento avvii il proprio processo.
Tuttavia, in altri casi d'uso, come la finanza, l'elaborazione richiede unità di tempo CPU molto piccole (decine di millisecondi). In questo caso, l'overhead della chiamata al sottoprocesso è sproporzionatamente elevato. Una soluzione a questo
problema è utilizzare la trasformazione di Apache Beam
GroupByKey
per creare batch di 50-100 elementi da inserire nel
processo. Ad esempio, puoi procedere nel seguente modo:
- In una
DoFnfunzione, crea una coppia chiave-valore. Se stai elaborando transazioni finanziarie, puoi utilizzare il numero di transazione come chiave. Se non hai un numero univoco da utilizzare come chiave, puoi generare un checksum dai dati e utilizzare una funzione modulo per creare partizioni di 50 elementi. - Invia la chiave a una
GroupByKey.createfunzione, che restituisce unaKV<key,Iterable<data>>raccolta contenente i 50 elementi che puoi quindi inviare al processo.
Limitare il parallelismo dei worker
Quando lavori con un linguaggio supportato in modo nativo nel runner Dataflow, non devi mai pensare a cosa succede al worker. Dataflow ha molti processi che supervisionano il controllo del flusso e i thread in modalità batch o stream.
Tuttavia, se utilizzi un linguaggio esterno come C++, tieni presente che stai facendo qualcosa di un po' insolito avviando sottoprocessi. In modalità batch, il runner Dataflow utilizza un piccolo rapporto tra thread di lavoro e CPU rispetto alla modalità stream. Ti consigliamo, soprattutto in modalità stream, di creare un semaforo all'interno della classe per controllare più direttamente il parallelismo di un singolo worker.
Ad esempio, con l'elaborazione multimediale, potresti non voler elaborare centinaia di elementi di transcodifica in parallelo da un singolo worker. In questi casi, puoi creare una classe di utilità che fornisca autorizzazioni alla funzione DoFn per il lavoro svolto. L'utilizzo di questa classe ti consente di controllare direttamente i thread worker all'interno della pipeline.
Utilizzare sink di dati ad alta capacità in Google Cloud
Dopo l'elaborazione, i dati vengono inviati a un sink di dati. Il sink deve essere in grado di gestire il volume di risultati creati dalla soluzione di elaborazione della griglia.
Il seguente diagramma mostra alcuni dei sink disponibili in Google Cloud quando Dataflow esegue un carico di lavoro della griglia.
Bigtable, BigQuery e Pub/Sub possono gestire tutti stream di dati molto grandi. Ad esempio, ogni nodo Bigtable può gestire 10.000 inserimenti al secondo di dimensioni fino a 1 KB con una facile scalabilità orizzontale. Di conseguenza, un cluster Bigtable di 100 nodi può assorbire 1.000.000 di messaggi al secondo generati dalla griglia Dataflow.
Gestire gli errori di segmentazione
Quando utilizzi codice C++ all'interno di una pipeline, devi decidere come gestire gli errori di segmentazione, perché hanno ramificazioni non locali se non vengono gestiti correttamente. Il runner Dataflow crea processi in base alle esigenze in Java, Python o Go, quindi assegna il lavoro ai processi sotto forma di bundle.
Se la chiamata al codice C++ viene eseguita utilizzando strumenti strettamente accoppiati, come JNI o Cython, e il processo C++ genera un errore di segmentazione, si arrestano anche il processo di chiamata e la Java Virtual Machine (JVM). In questo scenario, i punti dati errati non sono rilevabili. Per rendere rilevabili i punti dati errati, utilizza un accoppiamento più libero, che ramifica i dati errati e consente alla pipeline di continuare. Tuttavia, con codice C++ maturo e completamente testato rispetto a tutte le variazioni dei dati, puoi utilizzare meccanismi come Cython.
Passaggi successivi
Segui il tutorial per creare una pipeline che utilizza container personalizzati con le librerie C++.
Visualizza il codice di esempio per questa pagina nel repository GitHub di Apache Beam .
Scopri di più sulla creazione di pipeline con Apache Beam.