Memproses aliran data perubahan Bigtable

Tutorial ini menunjukkan cara men-deploy pipeline data ke Dataflow untuk aliran perubahan database real-time yang bersumber dari aliran perubahan tabel Bigtable. Output pipeline ditulis ke serangkaian file di Cloud Storage.

Contoh set data untuk aplikasi mendengarkan musik disediakan. Dalam tutorial ini, Anda akan melacak lagu yang didengarkan, lalu memberi peringkat lima lagu teratas selama jangka waktu tertentu.

Tutorial ini ditujukan bagi pengguna teknis yang sudah terbiasa menulis kode dan men-deploy pipeline data ke Google Cloud.

Tujuan

Tutorial ini menunjukkan kepada Anda cara melakukan hal berikut:

  • Buat tabel Bigtable dengan aliran perubahan yang diaktifkan.
  • Men-deploy pipeline di Dataflow yang mentransformasi dan menghasilkan aliran perubahan.
  • Lihat hasil pipeline data Anda.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, baca bagian Pembersihan.

Sebelum memulai

    Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.

    Instal Google Cloud CLI. Setelah penginstalan, inisialisasi Google Cloud CLI dengan menjalankan perintah berikut:

    gcloud init

    Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

    Buat atau pilih Google Cloud project.

    Peran yang diperlukan untuk memilih atau membuat project

    • Pilih project: Memilih project tidak memerlukan peran IAM tertentu—Anda dapat memilih project mana pun yang telah diberi peran.
    • Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project (roles/resourcemanager.projectCreator), yang berisi izin resourcemanager.projects.create. Pelajari cara memberikan peran.
    • Buat Google Cloud project:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk Google Cloud project yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

    Verifikasi bahwa penagihan diaktifkan untuk project Google Cloud Anda.

    Aktifkan Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, dan Cloud Storage API:

    Peran yang diperlukan untuk mengaktifkan API

    Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), yang berisi izin serviceusage.services.enable. Pelajari cara memberikan peran.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Instal Google Cloud CLI. Setelah penginstalan, inisialisasi Google Cloud CLI dengan menjalankan perintah berikut:

    gcloud init

    Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

    Buat atau pilih Google Cloud project.

    Peran yang diperlukan untuk memilih atau membuat project

    • Pilih project: Memilih project tidak memerlukan peran IAM tertentu—Anda dapat memilih project mana pun yang telah diberi peran.
    • Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project (roles/resourcemanager.projectCreator), yang berisi izin resourcemanager.projects.create. Pelajari cara memberikan peran.
    • Buat Google Cloud project:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk Google Cloud project yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

    Verifikasi bahwa penagihan diaktifkan untuk project Google Cloud Anda.

    Aktifkan Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, dan Cloud Storage API:

    Peran yang diperlukan untuk mengaktifkan API

    Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), yang berisi izin serviceusage.services.enable. Pelajari cara memberikan peran.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Update dan instal CLI cbt .
    gcloud components update
    gcloud components install cbt

Menyiapkan lingkungan

Mendapatkan kode

Buat clone repositori yang berisi kode contoh. Jika sebelumnya Anda telah mendownload repositori ini, tarik untuk mendapatkan versi terbaru.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Membuat bucket

  • Membuat bucket Cloud Storage:
    gcloud storage buckets create gs://BUCKET_NAME
    Ganti BUCKET_NAME dengan nama bucket yang memenuhi persyaratan penamaan bucket.
  • Membuat instance Bigtable

    Anda dapat menggunakan instance yang ada untuk tutorial ini atau membuat instance dengan konfigurasi default di region terdekat.

    Membuat tabel

    Aplikasi contoh melacak lagu yang didengarkan pengguna dan menyimpan peristiwa dengarkan di Bigtable. Buat tabel dengan aliran perubahan yang diaktifkan yang memiliki satu grup kolom (cf) dan satu kolom (lagu) serta menggunakan ID pengguna untuk kunci baris.

    Buat tabel.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Ganti kode berikut:

    • PROJECT_ID: ID project yang Anda gunakan
    • BIGTABLE_INSTANCE_ID: ID instance yang akan berisi tabel baru

    Memulai pipeline

    Pipeline ini mengubah aliran perubahan dengan melakukan hal berikut:

    1. Membaca aliran perubahan
    2. Mendapatkan nama lagu
    3. Mengelompokkan peristiwa mendengarkan lagu ke dalam jendela N detik
    4. Menghitung lima lagu teratas
    5. Menampilkan hasil

    Menjalankan pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Ganti BIGTABLE_REGION dengan ID region tempat instance Bigtable Anda berada, seperti us-east5.

    Memahami pipeline

    Cuplikan kode berikut dari pipeline dapat membantu Anda memahami kode yang Anda jalankan.

    Membaca aliran perubahan

    Kode dalam contoh ini mengonfigurasi aliran sumber dengan parameter untuk instance dan tabel Bigtable tertentu.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Mendapatkan nama lagu

    Saat lagu didengarkan, nama lagu ditulis ke grup kolom cf dan penentu kolom song, sehingga kode mengekstrak nilai dari mutasi aliran perubahan dan menampilkannya ke langkah berikutnya dalam pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Menghitung lima lagu teratas

    Anda dapat menggunakan fungsi Beam bawaan Count dan Top.of untuk mendapatkan lima lagu teratas di jendela saat ini.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Menampilkan hasil

    Pipeline ini menulis hasil ke output standar serta file. Untuk file, file ini mengelompokkan penulisan ke dalam grup 10 elemen atau segmen satu menit.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Melihat pipeline

    1. Di konsol Google Cloud , buka halaman Dataflow.

      Buka Dataflow

    2. Klik tugas dengan nama yang diawali dengan song-rank.

    3. Di bagian bawah layar, klik Tampilkan untuk membuka panel log.

    4. Klik Worker logs untuk memantau log output aliran perubahan.

    Penulisan streaming

    Gunakan CLI cbt untuk menulis sejumlah pemutaran lagu untuk berbagai pengguna ke tabel song-rank. Tindakan ini dirancang untuk menulis selama beberapa menit guna menyimulasikan streaming lagu yang didengarkan dari waktu ke waktu.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Melihat output

    Baca output di Cloud Storage untuk melihat lagu yang paling populer.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Contoh output:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Pembersihan

    Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

    Menghapus project

      Menghapus Google Cloud project:

      gcloud projects delete PROJECT_ID

    Menghapus resource satu per satu

    1. Hapus bucket dan file.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Nonaktifkan aliran perubahan pada tabel.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Hapus tabel song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Hentikan pipeline aliran perubahan.

      1. Buat daftar tugas untuk mendapatkan ID tugas.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Membatalkan tugas.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Ganti JOB_ID dengan ID tugas yang ditampilkan setelah perintah sebelumnya.

    Langkah berikutnya