Escreva do Dataflow para o Cloud Storage

Este documento descreve como escrever dados de texto do Dataflow para o Cloud Storage através do TextIO conetor I/O do Apache Beam.

Inclua a dependência da biblioteca da Google Cloud Platform

Para usar o conetor TextIO com o Cloud Storage, inclua a seguinte dependência. Esta biblioteca fornece um controlador de esquemas para nomes de ficheiros "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Ir

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte o artigo Instale o SDK do Apache Beam.

Ative o gRPC no conetor Apache Beam I/O no Dataflow

Pode estabelecer ligação ao Cloud Storage através do gRPC através do conetor de E/S do Apache Beam no Dataflow. O gRPC é uma framework de chamada de procedimento remoto (RPC) de código aberto de alto desempenho desenvolvida pela Google que pode usar para interagir com o Cloud Storage.

Para acelerar os pedidos de escrita da sua tarefa do Dataflow para o Cloud Storage, pode ativar o conetor Apache Beam I/O no Dataflow para usar o gRPC.

Linha de comandos

  1. Certifique-se de que usa a versão 2.55.0 ou posterior do 2.55.0.
  2. Para executar uma tarefa do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para obter informações sobre as diferentes opções de pipeline, consulte as flags opcionais.

SDK Apache Beam

  1. Certifique-se de que usa a versão 2.55.0 ou posterior do 2.55.0.
  2. Para executar uma tarefa do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para obter informações sobre as diferentes opções de pipeline, consulte as opções básicas.

Pode configurar o conetor de E/S do Apache Beam no Dataflow para gerar métricas relacionadas com gRPC no Cloud Monitoring. As métricas relacionadas com gRPC podem ajudar a fazer o seguinte:

  • Monitorize e otimize o desempenho dos pedidos gRPC para o Cloud Storage.
  • Resolva e depure problemas.
  • Obtenha estatísticas sobre a utilização e o comportamento da sua aplicação.

Para obter informações sobre como configurar o conetor Apache Beam I/O no Dataflow para gerar métricas relacionadas com gRPC, consulte o artigo Use métricas do lado do cliente. Se a recolha de métricas não for necessária para o seu exemplo de utilização, pode optar por desativá-la. Para ver instruções, consulte o artigo Desative as métricas do lado do cliente.

Paralelismo

O paralelismo é determinado principalmente pelo número de fragmentos. Por predefinição, o executor define automaticamente este valor. Para a maioria dos pipelines, recomendamos a utilização do comportamento predefinido. Neste documento, consulte as Práticas recomendadas.

Desempenho

A tabela seguinte mostra as métricas de desempenho para a gravação no Cloud Storage. As cargas de trabalho foram executadas num e2-standard2trabalhador, com o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.

100 M de registos | 1 kB | 1 coluna Débito (bytes) Débito (elementos)
Escrever 130 MBps 130 000 elementos por segundo

Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.

Práticas recomendadas

  • Em geral, evite definir um número específico de fragmentos. Isto permite ao corredor selecionar um valor adequado para a sua escala. Para ativar a divisão automática, chame .withAutoSharding() e não .withNumShards(0). Se ajustar o número de fragmentos, recomendamos que escreva entre 100 MB e 1 GB por fragmento. No entanto, o valor ideal pode depender da carga de trabalho.

  • O Cloud Storage pode ser dimensionado para um número muito elevado de pedidos por segundo. No entanto, se a sua pipeline tiver picos grandes no volume de escrita, considere escrever em vários contentores para evitar sobrecarregar temporariamente qualquer contentor do Cloud Storage.

  • Em geral, a gravação no Cloud Storage é mais eficiente quando cada gravação é maior (1 KB ou mais). A gravação de pequenos registos num grande número de ficheiros pode resultar num pior desempenho por byte.

  • Ao gerar nomes de ficheiros, considere usar nomes de ficheiros não sequenciais para distribuir a carga. Para mais informações, consulte o artigo Use uma convenção de nomenclatura que distribua a carga uniformemente pelos intervalos de chaves.

  • Ao atribuir nomes a ficheiros, não use o símbolo arroba ("@") seguido de um número ou um asterisco ("*"). Para mais informações, consulte o artigo "@*" e "@N" são especificações de divisão reservadas.

Exemplo: escrever ficheiros de texto no Cloud Storage

O exemplo seguinte cria um pipeline em lote que escreve ficheiros de texto usando a compressão GZIP:

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se a entrada PCollection não estiver limitada, tem de definir uma janela ou um acionador na recolha e, em seguida, especificar escritas em janelas chamando TextIO.Write.withWindowedWrites.

Python

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Para o caminho de saída, especifique um caminho do Cloud Storage que inclua o nome do contentor e um prefixo do nome de ficheiro. Por exemplo, se especificar gs://my_bucket/output/file, o conector TextIO escreve no contentor do Cloud Storage denominado my_bucket, e os ficheiros de saída têm o prefixo output/file*.

Por predefinição, o conetor TextIO fragmenta os ficheiros de saída, usando uma convenção de nomenclatura como esta: <file-prefix>-00000-of-00001. Opcionalmente, pode especificar um sufixo do nome do ficheiro e um esquema de compressão, conforme mostrado no exemplo.

Para garantir gravações idempotentes, o Dataflow grava num ficheiro temporário e, em seguida, copia o ficheiro temporário concluído para o ficheiro final. Para controlar onde estes ficheiros temporários são armazenados, use o método withTempDirectory.

O que se segue?