Write from Dataflow to a database

To write from Dataflow to a relational database, use the managed I/O connector.

Dependencies

Add the following dependencies to your project:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>${beam.version}</version>
</dependency>
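The `${beam.version}` placeholder is assumed to be defined as a Maven property in your pom.xml; for example (the version shown is illustrative, so substitute a current Beam release):

```xml
<properties>
  <beam.version>2.60.0</beam.version>
</properties>
```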

Example

The following example writes sample data to a PostgreSQL database. Although this example uses PostgreSQL, the configuration is similar for other supported databases.
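The pipeline assumes the destination table already exists. As a sketch, a table matching the example's schema could be created with DDL like the following (the table name is illustrative; the actual name is passed through the `--table` pipeline option):

```sql
-- Illustrative DDL; the real table name is supplied via the --table option.
CREATE TABLE example_table (
  id INTEGER,
  name VARCHAR(255)
);
```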

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
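For local development, Application Default Credentials can typically be set up with the gcloud CLI:

```shell
gcloud auth application-default login
```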

import static org.apache.beam.sdk.schemas.Schema.toSchema;

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class PostgresWrite {

  private static final Schema INPUT_SCHEMA =
      Stream.of(
              Schema.Field.of("id", Schema.FieldType.INT32),
              Schema.Field.of("name", Schema.FieldType.STRING))
          .collect(toSchema());

  private static final List<Row> ROWS =
      Arrays.asList(
          Row.withSchema(INPUT_SCHEMA)
              .withFieldValue("id", 1)
              .withFieldValue("name", "John Doe")
              .build(),
          Row.withSchema(INPUT_SCHEMA)
              .withFieldValue("id", 2)
              .withFieldValue("name", "Jane Smith")
              .build());

  public interface Options extends PipelineOptions {
    @Description("The JDBC URL of the PostgreSQL database to write to.")
    String getJdbcUrl();

    void setJdbcUrl(String value);

    @Description("The PostgreSQL table to write to.")
    String getTable();

    void setTable(String value);

    @Description("The username for the PostgreSQL database.")
    String getUsername();

    void setUsername(String value);

    @Description("The password for the PostgreSQL database.")
    String getPassword();

    void setPassword(String value);
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
    //   --username=$USERNAME --password=$PASSWORD
    // For more information, see
    // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = createPipeline(options);
    // Run the pipeline and block until it finishes.
    pipeline.run().waitUntilFinish();
  }

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("jdbc_url", options.getJdbcUrl())
            .put("location", options.getTable())
            .put("username", options.getUsername())
            .put("password", options.getPassword())
            .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Create data to write to Postgres.
        .apply(Create.of(ROWS))
        .setRowSchema(INPUT_SCHEMA)
        // Write data to a Postgres database using Managed I/O.
        .apply(Managed.write(Managed.POSTGRES).withConfig(config))
        .getSinglePCollection();
    return pipeline;
  }
}
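As a sketch, assuming the class lives in a standard Maven project, the pipeline could be launched locally with the Direct Runner like this (the main class name and all placeholder values are illustrative):

```shell
mvn compile exec:java \
  -Dexec.mainClass=PostgresWrite \
  -Dexec.args="--runner=DirectRunner \
    --jdbcUrl=jdbc:postgresql://localhost:5432/mydb \
    --username=myuser \
    --password=mypassword \
    --table=example_table"
```

To run on Dataflow instead, pass `--runner=DataflowRunner` along with your project, region, and staging options.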

What's next