다음 예시에서는 Apache Iceberg 테이블에서 데이터를 읽어 텍스트 파일에 씁니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다.
자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
importcom.google.common.collect.ImmutableMap;importjava.util.Map;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.io.TextIO;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.transforms.MapElements;importorg.apache.beam.sdk.values.PCollectionRowTuple;importorg.apache.beam.sdk.values.TypeDescriptors;publicclassApacheIcebergRead{staticfinalStringCATALOG_TYPE="hadoop";publicinterfaceOptionsextendsPipelineOptions{@Description("The URI of the Apache Iceberg warehouse location")StringgetWarehouseLocation();voidsetWarehouseLocation(Stringvalue);@Description("Path to write the output file")StringgetOutputPath();voidsetOutputPath(Stringvalue);@Description("The name of the Apache Iceberg catalog")StringgetCatalogName();voidsetCatalogName(Stringvalue);@Description("The name of the table to write to")StringgetTableName();voidsetTableName(Stringvalue);}publicstaticvoidmain(String[]args){// Parse the pipeline options passed into the application. 
Example:// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \// --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsOptionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=Pipeline.create(options);// Configure the Iceberg source I/OMapcatalogConfig=ImmutableMap.<String,Object>builder().put("warehouse",options.getWarehouseLocation()).put("type",CATALOG_TYPE).build();ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("table",options.getTableName()).put("catalog_name",options.getCatalogName()).put("catalog_properties",catalogConfig).build();// Build the pipeline.pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection()// Format each record as a string with the format 'id:name'..apply(MapElements.into(TypeDescriptors.strings()).via((row->{returnString.format("%d:%s",row.getInt64("id"),row.getString("name"));})))// Write to a text file..apply(TextIO.write().to(options.getOutputPath()).withNumShards(1).withSuffix(".txt"));pipeline.run().waitUntilFinish();}}