Lire des données depuis des bases de données vers Dataflow

Pour lire des données de bases de données relationnelles dans Dataflow, utilisez le connecteur d'E/S géré.

Dépendances

Ajoutez les dépendances suivantes au projet :

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-jdbc</artifactId>
  <version>${beam.version}</version>
</dependency>

Exemple

L'exemple suivant lit une base de données PostgreSQL et écrit les données dans des fichiers texte. Bien que cet exemple utilise PostgreSQL, la configuration des autres bases de données compatibles est similaire.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PostgresRead {

  public interface Options extends PipelineOptions {
    @Description("The JDBC URL of the PostgreSQL database to read from.")
    String getJdbcUrl();

    void setJdbcUrl(String value);

    @Description("The PostgresSQL table to read from.")
    String getTable();

    void setTable(String value);

    @Description("The username for the PostgreSQL database.")
    String getUsername();

    void setUsername(String value);

    @Description("The password for the PostgreSQL database.")
    String getPassword();

    void setPassword(String value);

    @Description(
        "The path to write the output file. Can be a local file path, "
            + "a GCS path, or a path to any other supported file systems.")
    String getOutputPath();

    void setOutputPath(String value);
  }

  public static PipelineResult.State main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
    //   --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE
    // For more information, see
    // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = createPipeline(options);
    return pipeline.run().waitUntilFinish();
  }

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("jdbc_url", options.getJdbcUrl())
            .put("location", options.getTable())
            .put("username", options.getUsername())
            .put("password", options.getPassword())
            .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read data from a Postgres database using Managed I/O.
        .apply(Managed.read(Managed.POSTGRES).withConfig(config))
        .getSinglePCollection()
        // Convert each row to a string.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((row -> String.format("%d,%s", row.getInt32("id"), row.getString("name")))))
        // Write strings to a text file.
        .apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1));
    return pipeline;
  }
}

Étapes suivantes