Read from a database to Dataflow

To read from a relational database to Dataflow, use the Managed I/O connector.

Dependencies

Add the following dependencies to your project:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>${beam.version}</version>
</dependency>
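The `${beam.version}` property referenced above must be defined in your POM. A minimal sketch (the version shown is illustrative; use a current Beam release that includes the Managed I/O transforms):

```xml
<properties>
  <!-- Illustrative version; substitute a current Apache Beam release. -->
  <beam.version>2.61.0</beam.version>
</properties>
```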

Example

The following example reads from a PostgreSQL database and writes the data to a text file. Although this example uses PostgreSQL, the configuration for other supported databases is similar.
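The pipeline below expects the source table to have an integer `id` column and a text `name` column, because the mapping step reads `row.getInt32("id")` and `row.getString("name")`. A minimal table matching that shape might look like this (a sketch; `my_table` is a hypothetical name):

```sql
-- Hypothetical table layout matching the example's row mapping.
CREATE TABLE my_table (
  id   integer PRIMARY KEY,
  name text
);
```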

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
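For local development, one way to create Application Default Credentials is with the gcloud CLI (assuming the Google Cloud CLI is installed):

```shell
# Opens a browser sign-in flow and stores credentials that Google Cloud
# client libraries pick up automatically as Application Default Credentials.
gcloud auth application-default login
```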

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PostgresRead {

  public interface Options extends PipelineOptions {
    @Description("The JDBC URL of the PostgreSQL database to read from.")
    String getJdbcUrl();

    void setJdbcUrl(String value);

    @Description("The PostgreSQL table to read from.")
    String getTable();

    void setTable(String value);

    @Description("The username for the PostgreSQL database.")
    String getUsername();

    void setUsername(String value);

    @Description("The password for the PostgreSQL database.")
    String getPassword();

    void setPassword(String value);

    @Description(
        "The path to write the output file. Can be a local file path, "
            + "a GCS path, or a path to any other supported file systems.")
    String getOutputPath();

    void setOutputPath(String value);
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
    //   --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE
    // For more information, see
    // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = createPipeline(options);
    // Run the pipeline and block until it finishes.
    PipelineResult.State state = pipeline.run().waitUntilFinish();
    System.out.println("Pipeline finished with state: " + state);
  }

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("jdbc_url", options.getJdbcUrl())
            .put("location", options.getTable())
            .put("username", options.getUsername())
            .put("password", options.getPassword())
            .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read data from a Postgres database using Managed I/O.
        .apply(Managed.read(Managed.POSTGRES).withConfig(config))
        .getSinglePCollection()
        // Convert each row to a string.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(row -> String.format("%d,%s", row.getInt32("id"), row.getString("name"))))
        // Write strings to a text file.
        .apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1));
    return pipeline;
  }
}
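One way to run the example locally is with the Maven exec plugin, sketched below. This assumes the class above is in your project's default package and that the environment variables named in the code comment are set; adjust `exec.mainClass` if you place the class in a package.

```shell
mvn compile exec:java \
  -Dexec.mainClass=PostgresRead \
  -Dexec.args="--runner=DirectRunner \
    --jdbcUrl=$JDBC_URL --table=$TABLE \
    --username=$USERNAME --password=$PASSWORD \
    --outputPath=$OUTPUT_FILE"
```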

Next steps