Bigtable 변경 내역 처리

이 튜토리얼에서는 Bigtable 테이블의 변경 내역에서 가져온 데이터베이스 변경의 실시간 스트림을 위해 데이터 파이프라인을 Dataflow에 배포하는 방법을 보여줍니다. 이 파이프라인의 출력은 Cloud Storage의 일련의 파일에 기록됩니다.

음악 듣기 애플리케이션의 예시 데이터 세트가 제공됩니다. 이 튜토리얼에서는 청취한 노래를 추적하여 특정 기간 동안 상위 5개의 항목에 대해 순위를 지정합니다.

이 튜토리얼은 Google Cloud에서 코드 작성 및 데이터 파이프라인 배포에 익숙한 기술 사용자를 대상으로 합니다.

환경 준비

코드 가져오기

샘플 코드가 있는 저장소를 클론합니다. 이전에 이 저장소를 이미 다운로드했으면 최신 버전을 가져옵니다.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

버킷 만들기

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Bigtable 인스턴스를 만듭니다.

    이 튜토리얼의 기존 인스턴스를 사용하거나 가까운 리전의 기본 구성으로 인스턴스를 만들 수 있습니다.

    테이블 만들기

    샘플 애플리케이션은 사용자가 청취하는 노래를 추적하고 청취 이벤트를 Bigtable에 저장합니다. column family(cf) 1개와 column 1개(song)가 있고 row key에 사용자 ID를 사용하는 변경 내역이 사용 설정된 테이블을 만듭니다.

    테이블을 만듭니다.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    다음을 바꿉니다.

    • PROJECT_ID: 사용 중인 프로젝트의 ID입니다.
    • BIGTABLE_INSTANCE_ID: 새 테이블을 포함할 인스턴스의 ID입니다.

    파이프라인 시작

    이 파이프라인은 다음을 수행하여 변경 내역을 변환합니다.

    1. 변경 내역 읽기
    2. 노래 이름 가져오기
    3. 노래 청취 이벤트를 N초 단위로 그룹화
    4. 상위 5개 노래 집계
    5. 결과 출력

    파이프라인을 실행합니다.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION을 Bigtable 인스턴스가 있는 리전의 ID(예: us-east5)로 바꿉니다.

    파이프라인 이해

    다음 파이프라인의 코드 스니펫은 실행 중인 코드를 이해하는 데 도움이 됩니다.

    변경 내역 읽기

    이 샘플의 코드는 특정 Bigtable 인스턴스 및 테이블의 매개변수로 소스 스트림을 구성합니다.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    노래 이름 가져오기

    노래를 들으면 노래 이름이 column family cf 및 column qualifier song에 기록되므로 코드는 변경 내역 변형에서 값을 추출하고 파이프라인의 다음 단계로 출력합니다.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    상위 5개 노래 집계

    기본 제공 Beam 함수 CountTop.of를 사용하여 현재 창에서 상위 5개 노래를 가져올 수 있습니다.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    결과 출력

    이 파이프라인은 결과를 표준 출력뿐만 아니라 파일에 기록합니다. 파일의 경우 쓰기를 10개 요소의 그룹 또는 1분 세그먼트로 분할합니다.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    파이프라인 보기

    1. Google Cloud 콘솔에서 Dataflow 페이지로 이동합니다.

      Dataflow로 이동

    2. 이름이 song-rank로 시작하는 작업을 클릭합니다.

    3. 화면 하단에서 표시를 클릭하여 로그 패널을 엽니다.

    4. 작업자 로그를 클릭하여 변경 내역의 출력 로그를 모니터링합니다.

    스트림 쓰기

    cbt CLI를 사용하여 다양한 사용자가 청취하는 곡 수를 song-rank 테이블에 씁니다. 이 기능은 시간에 따른 노래 스트리밍 듣기를 시뮬레이션하기 위해 몇 분 이상 작성되도록 설계되었습니다.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    출력 보기

    Cloud Storage의 출력을 읽고 가장 인기 있는 곡을 확인합니다.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    출력 예시:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]