Menulis dari Dataflow ke Cloud Storage

Dokumen ini menjelaskan cara menulis data teks dari Dataflow ke Cloud Storage menggunakan konektor I/O TextIO Apache Beam.

Menyertakan dependensi Google Cloud library

Untuk menggunakan konektor TextIO dengan Cloud Storage, sertakan dependensi berikut. Library ini menyediakan pengendali skema untuk nama file "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Untuk mengetahui informasi selengkapnya, lihat Menginstal Apache Beam SDK.

Mengaktifkan gRPC di konektor I/O Apache Beam di Dataflow

Anda dapat terhubung ke Cloud Storage menggunakan gRPC melalui konektor I/O Apache Beam di Dataflow. gRPC adalah framework panggilan prosedur jarak jauh (RPC) open source berperforma tinggi yang dikembangkan oleh Google yang dapat Anda gunakan untuk berinteraksi dengan Cloud Storage.

Untuk mempercepat permintaan tulis tugas Dataflow ke Cloud Storage, Anda dapat mengaktifkan konektor I/O Apache Beam di Dataflow untuk menggunakan gRPC.

Command line

  1. Pastikan Anda menggunakan Apache Beam SDK versi 2.55.0 atau yang lebih baru.
  2. Untuk menjalankan tugas Dataflow, gunakan opsi pipeline --additional-experiments=use_grpc_for_gcs. Untuk mengetahui informasi tentang berbagai opsi pipeline, lihat Flag opsional.

Apache Beam SDK

  1. Pastikan Anda menggunakan Apache Beam SDK versi 2.55.0 atau yang lebih baru.
  2. Untuk menjalankan tugas Dataflow, gunakan --experiments=use_grpc_for_gcs opsi pipeline. Untuk mengetahui informasi tentang berbagai opsi pipeline, lihat Opsi dasar.

Anda dapat mengonfigurasi konektor I/O Apache Beam di Dataflow untuk membuat metrik terkait gRPC di Cloud Monitoring. Metrik terkait gRPC dapat membantu Anda melakukan hal berikut:

  • Memantau dan mengoptimalkan performa permintaan gRPC ke Cloud Storage.
  • Memecahkan masalah dan melakukan proses debug.
  • Mendapatkan insight tentang penggunaan dan perilaku aplikasi Anda.

Untuk mengetahui informasi tentang cara mengonfigurasi konektor I/O Apache Beam di Dataflow untuk membuat metrik terkait gRPC, lihat Menggunakan metrik sisi klien. Jika pengumpulan metrik tidak diperlukan untuk kasus penggunaan Anda, Anda dapat memilih untuk tidak ikut pengumpulan metrik. Untuk mengetahui petunjuknya, lihat Memilih tidak ikut metrik sisi klien.

Keparalelan

Keparalelan ditentukan terutama oleh jumlah pecahan. Secara default, runner akan otomatis menetapkan nilai ini. Untuk sebagian besar pipeline, sebaiknya gunakan perilaku default. Dalam dokumen ini, lihat Praktik terbaik.

Performa

Tabel berikut menunjukkan metrik performa untuk menulis ke Cloud Storage. Workload dijalankan pada satu pekerja e2-standard2, menggunakan Apache Beam SDK 2.49.0 untuk Java. Workload tersebut tidak menggunakan Runner v2.

100 M rekaman | 1 kB | 1 kolom Throughput (byte) Throughput (elemen)
Menulis 130 MBps 130.000 elemen per detik

Metrik ini didasarkan pada pipeline batch sederhana. Metrik ini dimaksudkan untuk membandingkan performa antara konektor I/O, dan tidak selalu mewakili pipeline dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK, dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa Beam IO.

Praktik terbaik

  • Secara umum, hindari menetapkan jumlah pecahan tertentu. Hal ini memungkinkan runner memilih nilai yang sesuai untuk skala Anda. Untuk mengaktifkan pecahan otomatis, panggil .withAutoSharding(), bukan .withNumShards(0). Jika Anda menyesuaikan jumlah pecahan, sebaiknya tulis antara 100 MB dan 1 GB per pecahan. Namun, nilai optimal mungkin bergantung pada workload.

  • Cloud Storage dapat menskalakan ke jumlah permintaan yang sangat besar per detik. Namun, jika pipeline Anda memiliki lonjakan besar dalam volume tulis, pertimbangkan untuk menulis ke beberapa bucket, guna menghindari kelebihan beban sementara pada satu bucket Cloud Storage.

  • Secara umum, menulis ke Cloud Storage lebih efisien jika setiap penulisan lebih besar (1 kb atau lebih besar). Menulis rekaman kecil ke sejumlah besar file dapat menghasilkan performa yang lebih buruk per byte.

  • Saat membuat nama file, pertimbangkan untuk menggunakan nama file non-berurutan, guna mendistribusikan beban. Untuk mengetahui informasi selengkapnya, lihat Menggunakan konvensi penamaan yang mendistribusikan beban secara merata di seluruh rentang kunci.

  • Saat memberi nama file, jangan gunakan simbol at ('@') yang diikuti dengan angka atau tanda bintang ('*'). Untuk mengetahui informasi selengkapnya, lihat "@*" dan "@N" adalah spesifikasi pecahan yang dicadangkan.

Contoh: Menulis file teks ke Cloud Storage

Contoh berikut membuat pipeline batch yang menulis file teks menggunakan kompresi GZIP:

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Jika PCollection input tidak terbatas, Anda harus menentukan jendela atau pemicu pada koleksi, lalu menentukan penulisan berjendela dengan memanggil TextIO.Write.withWindowedWrites.

Python

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Untuk jalur output, tentukan jalur Cloud Storage yang menyertakan nama bucket dan awalan nama file. Misalnya, jika Anda menentukan gs://my_bucket/output/file, konektor TextIO akan menulis ke bucket Cloud Storage bernama my_bucket, dan file output memiliki awalan output/file*.

Secara default, konektor TextIO memecah file output, menggunakan konvensi penamaan seperti ini: <file-prefix>-00000-of-00001. Secara opsional, Anda dapat menentukan akhiran nama file dan skema kompresi, seperti yang ditunjukkan dalam contoh.

Untuk memastikan penulisan idempoten, Dataflow menulis ke file sementara, lalu menyalin file sementara yang telah selesai ke file akhir. Untuk mengontrol tempat penyimpanan file sementara ini, gunakan withTempDirectory metode.

Langkah berikutnya