다음 예시에서는 PostgreSQL 데이터베이스에서 데이터를 읽어 텍스트 파일에 씁니다. 이 예시에서는 PostgreSQL을 사용하지만, 지원되는 다른 데이터베이스도 비슷한 방식으로 구성할 수 있습니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다.
자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
importcom.google.common.collect.ImmutableMap;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.PipelineResult;importorg.apache.beam.sdk.io.TextIO;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.transforms.MapElements;importorg.apache.beam.sdk.values.TypeDescriptors;publicclassPostgresRead{publicinterfaceOptionsextendsPipelineOptions{@Description("The JDBC URL of the PostgreSQL database to read from.")StringgetJdbcUrl();voidsetJdbcUrl(Stringvalue);@Description("The PostgresSQL table to read from.")StringgetTable();voidsetTable(Stringvalue);@Description("The username for the PostgreSQL database.")StringgetUsername();voidsetUsername(Stringvalue);@Description("The password for the PostgreSQL database.")StringgetPassword();voidsetPassword(Stringvalue);@Description("The path to write the output file. Can be a local file path, "+"a GCS path, or a path to any other supported file systems.")StringgetOutputPath();voidsetOutputPath(Stringvalue);}publicstaticPipelineResult.Statemain(String[]args){// Parse the pipeline options passed into the application. 
Example:// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE// --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE// For more information, see// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsvaroptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=createPipeline(options);returnpipeline.run().waitUntilFinish();}publicstaticPipelinecreatePipeline(Optionsoptions){// Create configuration parameters for the Managed I/O transform.ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("jdbc_url",options.getJdbcUrl()).put("location",options.getTable()).put("username",options.getUsername()).put("password",options.getPassword()).build();// Build the pipeline.varpipeline=Pipeline.create(options);pipeline// Read data from a Postgres database using Managed I/O..apply(Managed.read(Managed.POSTGRES).withConfig(config)).getSinglePCollection()// Convert each row to a string..apply(MapElements.into(TypeDescriptors.strings()).via((row->String.format("%d,%s",row.getInt32("id"),row.getString("name")))))// Write strings to a text file..apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1));returnpipeline;}}