בדוגמה הבאה מוצגות כמה רשומות לדוגמה שנכתבות למסד נתונים של PostgreSQL.
בדוגמה הזו נעשה שימוש ב-PostgreSQL, אבל ההגדרה של מסדי נתונים נתמכים אחרים דומה.
Java
כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials.
מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.
import staticorg.apache.beam.sdk.schemas.Schema.toSchema;importcom.google.common.collect.ImmutableMap;importjava.util.Arrays;importjava.util.List;importjava.util.stream.Stream;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.PipelineResult;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.schemas.Schema;importorg.apache.beam.sdk.transforms.Create;importorg.apache.beam.sdk.values.Row;publicclassPostgresWrite{privatestaticSchemaINPUT_SCHEMA=Stream.of(Schema.Field.of("id",Schema.FieldType.INT32),Schema.Field.of("name",Schema.FieldType.STRING)).collect(toSchema());privatestaticList<Row>ROWS=Arrays.asList(Row.withSchema(INPUT_SCHEMA).withFieldValue("id",1).withFieldValue("name","John Doe").build(),Row.withSchema(INPUT_SCHEMA).withFieldValue("id",2).withFieldValue("name","Jane Smith").build());publicinterfaceOptionsextendsPipelineOptions{@Description("The JDBC URL of the PostgreSQL database to write to.")StringgetJdbcUrl();voidsetJdbcUrl(Stringvalue);@Description("The PostgresSQL table to write to.")StringgetTable();voidsetTable(Stringvalue);@Description("The username for the PostgreSQL database.")StringgetUsername();voidsetUsername(Stringvalue);@Description("The password for the PostgreSQL database.")StringgetPassword();voidsetPassword(Stringvalue);}publicstaticPipelineResult.Statemain(String[]args){// Parse the pipeline options passed into the application. Example:// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE// --username=$USERNAME --password=$PASSWORD// For more information, see// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsvaroptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=createPipeline(options);returnpipeline.run().waitUntilFinish();}publicstaticPipelinecreatePipeline(Optionsoptions){// Create configuration parameters for the Managed I/O transform.ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("jdbc_url",options.getJdbcUrl()).put("location",options.getTable()).put("username",options.getUsername()).put("password",options.getPassword()).build();// Build the pipeline.varpipeline=Pipeline.create(options);pipeline// Create data to write to Postgres..apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA)// Write data to a Postgres database using Managed I/O..apply(Managed.write(Managed.POSTGRES).withConfig(config)).getSinglePCollection();returnpipeline;}}
[[["התוכן קל להבנה","easyToUnderstand","thumb-up"],["התוכן עזר לי לפתור בעיה","solvedMyProblem","thumb-up"],["סיבה אחרת","otherUp","thumb-up"]],[["התוכן קשה להבנה","hardToUnderstand","thumb-down"],["שגיאות בקוד לדוגמה או במידע","incorrectInformationOrSampleCode","thumb-down"],["חסרים לי פרטים או דוגמאות","missingTheInformationSamplesINeed","thumb-down"],["בעיה בתרגום","translationIssue","thumb-down"],["סיבה אחרת","otherDown","thumb-down"]],["עדכון אחרון: 2026-06-18 (שעון UTC)."],[],[]]