Leggi da Cloud Storage a Dataflow

Per leggere i dati da Cloud Storage a Dataflow, utilizza il connettore I/O TextIO o AvroIO di Apache Beam.

Includi la dipendenza dalla Google Cloud libreria

Per utilizzare il connettore TextIO o AvroIO con Cloud Storage, includi la seguente dipendenza. Questa libreria fornisce un gestore di schemi per "gs://" nomi file.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Vai

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Per saperne di più, consulta Installare l'SDK Apache Beam.

Abilita gRPC sul connettore I/O Apache Beam su Dataflow

Puoi connetterti a Cloud Storage utilizzando gRPC tramite il connettore I/O Apache Beam su Dataflow. gRPC è un framework open source per chiamata di procedura remota (RPC) ad alte prestazioni sviluppato da Google che puoi utilizzare per interagire con Cloud Storage.

Per velocizzare le richieste di lettura del job Dataflow a Cloud Storage, puoi abilitare il connettore I/O Apache Beam su Dataflow per utilizzare gRPC.

Riga di comando

  1. Assicurati di utilizzare l' SDK Apache Beam versione 2.55.0 o successive.
  2. Per eseguire un job Dataflow, utilizza l'opzione della pipeline --additional-experiments=use_grpc_for_gcs. Per informazioni sulle diverse opzioni della pipeline, consulta Flag facoltativi.

SDK Apache Beam

  1. Assicurati di utilizzare l' SDK Apache Beam versione 2.55.0 o successive.
  2. Per eseguire un job Dataflow, utilizza l'opzione della pipeline --experiments=use_grpc_for_gcs. Per informazioni sulle diverse opzioni della pipeline, consulta Opzioni di base.

Puoi configurare il connettore I/O Apache Beam su Dataflow per generare metriche correlate a gRPC in Cloud Monitoring. Le metriche correlate a gRPC possono aiutarti a:

  • Monitorare e ottimizzare le prestazioni delle richieste gRPC a Cloud Storage.
  • Risolvere i problemi ed eseguire il debug.
  • Ottenere informazioni dettagliate sull'utilizzo e sul comportamento della tua applicazione.

Per informazioni su come configurare il connettore I/O Apache Beam su Dataflow per generare metriche correlate a gRPC, consulta Utilizzare le metriche lato client. Se la raccolta delle metriche non è necessaria per il tuo caso d'uso, puoi scegliere di disattivare la raccolta delle metriche. Per le istruzioni, consulta Disattivare le metriche lato client.

Parallelismo

I connettori TextIO e AvroIO supportano due livelli di parallelismo:

  • I singoli file vengono suddivisi in chiavi separatamente, in modo che più worker possano leggerli.
  • Se i file non sono compressi, il connettore può leggere separatamente i sottointervalli di ogni file, il che porta a un livello di parallelismo molto elevato. Questa suddivisione è possibile solo se ogni riga del file è un record significativo. Ad esempio, non è disponibile per impostazione predefinita per i file JSON.

Prestazioni

La tabella seguente mostra le metriche delle prestazioni per la lettura da Cloud Storage. I carichi di lavoro sono stati eseguiti su un worker e2-standard2, utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.

100 M record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Leggi 320 MBps 320.000 elementi al secondo

Queste metriche si basano su semplici pipeline batch. Sono progettate per confrontare le prestazioni tra i connettori I/O e non sono necessariamente rappresentative delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e dipendono dal tipo di VM, dai dati elaborati, dalle prestazioni delle origini e dei sink esterni e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche delle prestazioni di altri SDK di linguaggio. Per saperne di più, consulta Prestazioni I/O di Beam.

Best practice

  • Evita di utilizzare watchForNewFiles con Cloud Storage. Questo approccio non è scalabile per le pipeline di produzione di grandi dimensioni, perché il connettore deve conservare in memoria un elenco dei file visualizzati. L'elenco non può essere scaricato dalla memoria, il che riduce la memoria di lavoro dei worker nel tempo. Valuta la possibilità di utilizzare le notifiche Pub/Sub per Cloud Storage. Per saperne di più, consulta Pattern di elaborazione dei file.

  • Se sia il nome file sia i contenuti del file sono dati utili, utilizza la FileIO classe per leggere i nomi file. Ad esempio, un nome file potrebbe contenere metadati utili per l'elaborazione dei dati nel file. Per saperne di più, consulta Accesso ai nomi file. La FileIO documentazione mostra anche un esempio di questo pattern.

Esempio

L'esempio seguente mostra come leggere da Cloud Storage.

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

Passaggi successivi