Von Dataflow in Datenbanken schreiben
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Wenn Sie Daten aus Dataflow in relationale Datenbanken schreiben möchten, verwenden Sie den verwalteten E/A-Connector.
Abhängigkeiten
Fügen Sie Ihrem Projekt die folgenden Abhängigkeiten hinzu:
Im folgenden Beispiel werden einige Beispieldatensätze in eine PostgreSQL-Datenbank geschrieben.
In diesem Beispiel wird PostgreSQL verwendet. Die Konfiguration anderer unterstützter Datenbanken ist jedoch ähnlich.
import staticorg.apache.beam.sdk.schemas.Schema.toSchema;importcom.google.common.collect.ImmutableMap;importjava.util.Arrays;importjava.util.List;importjava.util.stream.Stream;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.PipelineResult;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.schemas.Schema;importorg.apache.beam.sdk.transforms.Create;importorg.apache.beam.sdk.values.Row;publicclassPostgresWrite{privatestaticSchemaINPUT_SCHEMA=Stream.of(Schema.Field.of("id",Schema.FieldType.INT32),Schema.Field.of("name",Schema.FieldType.STRING)).collect(toSchema());privatestaticList<Row>ROWS=Arrays.asList(Row.withSchema(INPUT_SCHEMA).withFieldValue("id",1).withFieldValue("name","John Doe").build(),Row.withSchema(INPUT_SCHEMA).withFieldValue("id",2).withFieldValue("name","Jane Smith").build());publicinterfaceOptionsextendsPipelineOptions{@Description("The JDBC URL of the PostgreSQL database to write to.")StringgetJdbcUrl();voidsetJdbcUrl(Stringvalue);@Description("The PostgresSQL table to write to.")StringgetTable();voidsetTable(Stringvalue);@Description("The username for the PostgreSQL database.")StringgetUsername();voidsetUsername(Stringvalue);@Description("The password for the PostgreSQL database.")StringgetPassword();voidsetPassword(Stringvalue);}publicstaticPipelineResult.Statemain(String[]args){// Parse the pipeline options passed into the application. Example:// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE// --username=$USERNAME --password=$PASSWORD// For more information, see// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsvaroptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=createPipeline(options);returnpipeline.run().waitUntilFinish();}publicstaticPipelinecreatePipeline(Optionsoptions){// Create configuration parameters for the Managed I/O transform.ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("jdbc_url",options.getJdbcUrl()).put("location",options.getTable()).put("username",options.getUsername()).put("password",options.getPassword()).build();// Build the pipeline.varpipeline=Pipeline.create(options);pipeline// Create data to write to Postgres..apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA)// Write data to a Postgres database using Managed I/O..apply(Managed.write(Managed.POSTGRES).withConfig(config)).getSinglePCollection();returnpipeline;}}
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2026-05-26 (UTC)."],[],[]]