Ler do Cloud Storage para o Dataflow

Para ler dados do Cloud Storage para o Dataflow, use o Apache Beam TextIO ou o AvroIO conetor de I/O.

Inclua a dependência da biblioteca da Google Cloud Platform

Para usar o conetor TextIO ou AvroIO com o Cloud Storage, inclua a seguinte dependência. Esta biblioteca fornece um controlador de esquemas para nomes de ficheiros."gs://"

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Ir

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte o artigo Instale o SDK do Apache Beam.

Ative o gRPC no conetor Apache Beam I/O no Dataflow

Pode estabelecer ligação ao Cloud Storage através do gRPC através do conetor de E/S do Apache Beam no Dataflow. O gRPC é uma framework de chamada de procedimento remoto (RPC) de código aberto de alto desempenho desenvolvida pela Google que pode usar para interagir com o Cloud Storage.

Para acelerar os pedidos de leitura da sua tarefa do Dataflow para o Cloud Storage, pode ativar o conetor Apache Beam I/O no Dataflow para usar o gRPC.

Linha de comandos

  1. Certifique-se de que usa a versão 2.55.0 ou posterior do 2.55.0.
  2. Para executar uma tarefa do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para obter informações sobre as diferentes opções de pipeline, consulte as flags opcionais.

SDK Apache Beam

  1. Certifique-se de que usa a versão 2.55.0 ou posterior do 2.55.0.
  2. Para executar uma tarefa do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para obter informações sobre as diferentes opções de pipeline, consulte as opções básicas.

Pode configurar o conetor de E/S do Apache Beam no Dataflow para gerar métricas relacionadas com gRPC no Cloud Monitoring. As métricas relacionadas com gRPC podem ajudar a fazer o seguinte:

  • Monitorize e otimize o desempenho dos pedidos gRPC para o Cloud Storage.
  • Resolva e depure problemas.
  • Obtenha estatísticas sobre a utilização e o comportamento da sua aplicação.

Para obter informações sobre como configurar o conetor Apache Beam I/O no Dataflow para gerar métricas relacionadas com gRPC, consulte o artigo Use métricas do lado do cliente. Se a recolha de métricas não for necessária para o seu exemplo de utilização, pode optar por desativá-la. Para ver instruções, consulte o artigo Desative as métricas do lado do cliente.

Paralelismo

Os conetores TextIO e AvroIO suportam dois níveis de paralelismo:

  • Os ficheiros individuais são indexados separadamente para que possam ser lidos por vários trabalhadores.
  • Se os ficheiros não estiverem comprimidos, o conetor pode ler subintervalos de cada ficheiro separadamente, o que leva a um nível de paralelismo muito elevado. Esta divisão só é possível se cada linha no ficheiro for um registo significativo. Por exemplo, não está disponível por predefinição para ficheiros JSON.

Desempenho

A tabela seguinte mostra as métricas de desempenho para a leitura a partir do Cloud Storage. As cargas de trabalho foram executadas num e2-standard2trabalhador, com o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.

100 M de registos | 1 kB | 1 coluna Débito (bytes) Débito (elementos)
Lida 320 MBps 320 000 elementos por segundo

Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.

Práticas recomendadas

  • Evite usar watchForNewFiles com o armazenamento na nuvem. Esta abordagem não é adequada para pipelines de produção grandes, porque o conector tem de manter uma lista de ficheiros vistos na memória. Não é possível limpar a lista da memória, o que reduz a memória de trabalho dos trabalhadores ao longo do tempo. Considere usar notificações do Pub/Sub para o Cloud Storage em alternativa. Para mais informações, consulte o artigo Padrões de processamento de ficheiros.

  • Se o nome do ficheiro e o conteúdo do ficheiro forem dados úteis, use a classe FileIO para ler os nomes dos ficheiros. Por exemplo, um nome de ficheiro pode conter metadados úteis quando processa os dados no ficheiro. Para mais informações, consulte Aceder a nomes de ficheiros. A documentação FileIO também mostra um exemplo deste padrão.

Exemplo

O exemplo seguinte mostra como ler a partir do Cloud Storage.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

O que se segue?