Membuat pipeline menggunakan formulir builder di UI builder tugas

Tutorial ini menunjukkan cara menggunakan sintaksis YAML Apache Beam untuk membuat pipeline pemrosesan data Dataflow. Anda akan mempelajari cara membaca data dari file, menerapkan transformasi, dan menulis hasilnya ke file lain, menggunakan UI builder tugas di konsol Google Cloud . Tutorial ini ditujukan bagi developer yang baru menggunakan Apache Beam atau ingin mempelajari cara menggunakan YAML API untuk membangun pipeline.

Tabel berikut menunjukkan grafik pipeline di konsol Google Cloud dan spesifikasi YAML yang sesuai.

Grafik tugas Dataflow.
pipeline:
  transforms:
    - name: ReadFromCsv
      type: ReadFromCsv
      config:
        path: 'gs://[...]/restaurant-data.csv'
    - name: MapToFields
      type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          Lowercase_menu_item: Item.lower()
          Total_price: Price + Tax
        append: true
    - name: WriteToJson
      type: WriteToJson
      input: MapToFields
      config:
        path: 'gs://[...]/restaurant-data_map-fields.json'

Tujuan

Dalam tutorial ini, Anda akan mempelajari cara melakukan hal-hal berikut:

  • Buat pipeline YAML Beam yang membaca, menulis, dan mengubah data.
  • Memfilter data berdasarkan konten.
  • Memetakan kolom menggunakan ekspresi Python.
  • Menggunakan SQL untuk membuat kueri dan menggabungkan data.
  • Bangun dan jalankan pipeline YAML Beam menggunakan formulir builder di UI builder tugas di konsol Google Cloud .

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, baca bagian Pembersihan.

Sebelum memulai

Selesaikan langkah-langkah berikut sebelum menjalankan pipeline Anda.

Menyiapkan project

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Membuat bucket Cloud Storage

Sebelum dapat menjalankan pipeline, Anda harus membuat bucket Cloud Storage.

  1. Membuat bucket Cloud Storage:

    1. Di konsol Google Cloud , buka halaman Buckets Cloud Storage.

      Buka Buckets

    2. Klik Create.
    3. Di halaman Buat bucket, masukkan informasi bucket Anda. Untuk melanjutkan ke langkah berikutnya, klik Lanjutkan.
      1. Untuk Beri nama bucket Anda, masukkan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket, karena namespace bucket bersifat global dan dapat dilihat publik.
      2. Di bagian Pilih tempat untuk menyimpan data Anda, lakukan tindakan berikut:
        1. Pilih Jenis lokasi.
        2. Pilih lokasi tempat data bucket Anda disimpan secara permanen dari menu drop-down Location type.
          • Jika memilih jenis lokasi dual-region, Anda juga dapat memilih untuk mengaktifkan replikasi turbo dengan menggunakan kotak centang yang relevan.
        3. Untuk menyiapkan replikasi lintas bucket, pilih Add cross-bucket replication via Storage Transfer Service dan ikuti langkah-langkah berikut:

          Menyiapkan replikasi lintas bucket

          1. Di menu Bucket, pilih bucket.
          2. Di bagian Setelan replikasi, klik Konfigurasi untuk mengonfigurasi setelan bagi tugas replikasi.

            Panel Konfigurasi replikasi lintas bucket akan muncul.

            • Untuk memfilter objek yang akan direplikasi menurut awalan nama objek, masukkan awalan yang ingin Anda sertakan atau kecualikan objeknya, lalu klik Tambahkan awalan.
            • Untuk menetapkan kelas penyimpanan bagi objek yang direplikasi, pilih kelas penyimpanan dari menu Kelas penyimpanan. Jika Anda melewati langkah ini, objek yang direplikasi akan menggunakan kelas penyimpanan bucket tujuan secara default.
            • Klik Done.
      3. Di bagian Choose how to store your data, lakukan tindakan berikut:
        1. Di bagian Setel kelas default, pilih opsi berikut: Standard.
        2. Untuk mengaktifkan namespace hierarkis, di bagian Optimalkan penyimpanan untuk beban kerja intensif data, pilih Aktifkan namespace hierarkis di bucket ini.
      4. Di bagian Pilih cara mengontrol akses ke objek, pilih apakah bucket Anda menerapkan pencegahan akses publik atau tidak, lalu pilih metode kontrol akses untuk objek bucket Anda.
      5. Di bagian Pilih cara melindungi data objek, lakukan tindakan berikut:
        • Pilih salah satu opsi di bagian Perlindungan data yang ingin Anda tetapkan untuk bucket Anda.
          • Untuk mengaktifkan penghapusan sementara, klik kotak centang Kebijakan penghapusan sementara (Untuk pemulihan data), dan tentukan jumlah hari Anda ingin mempertahankan objek setelah penghapusan.
          • Untuk menyetel Pembuatan Versi Objek, klik kotak centang Pembuatan versi objek (Untuk kontrol versi), dan tentukan jumlah maksimum versi per objek dan jumlah hari setelah versi lama berakhir.
          • Untuk mengaktifkan kebijakan retensi pada objek dan bucket, klik kotak centang Retensi (Untuk kepatuhan), lalu lakukan hal berikut:
            • Untuk mengaktifkan Penguncian Retensi Objek, centang kotak Aktifkan retensi objek.
            • Untuk mengaktifkan Bucket Lock, centang kotak Setel kebijakan retensi bucket, lalu pilih satuan waktu dan durasi untuk periode retensi data Anda.
        • Untuk memilih cara mengenkripsi data objek Anda, luaskan bagian Enkripsi data (), lalu pilih metode Enkripsi data.
    4. Klik Create.
  2. Salin perintah berikut karena Anda akan memerlukannya di bagian selanjutnya:

    • Nama bucket Cloud Storage Anda.
    • ID project Google Cloud Anda.

    Untuk menemukan ID ini, lihat Mengidentifikasi project.

Jaringan VPC

Secara default, setiap project baru dimulai dengan jaringan default. Jika jaringan default untuk project Anda dinonaktifkan atau dihapus, Anda harus memiliki jaringan di project Anda yang akun penggunanya memiliki peran Compute Network User (roles/compute.networkUser).

Membaca, menulis, dan mengubah data

Bagian ini menunjukkan cara menggunakan sintaksis YAML Beam dengan Dataflow untuk membaca, menulis, dan memfilter data menggunakan hal berikut:

  • Pengembangan berbasis antarmuka pengguna untuk membuat dan menjalankan tugas di UI pembuat tugas di konsol Google Cloud . Secara khusus, Anda akan menggunakan formulir builder di UI builder tugas, sehingga Anda tidak perlu membuat file YAML secara manual.
  • Data file CSV yang disimpan di bucket Cloud Storage yang dapat dilihat secara publik. Data ini berisi data menu restoran tiruan dan terlihat seperti berikut:

    restaurant-data.csv

    Menu item,Category,Price,Tax
    Classic Cheeseburger,Entree,9.99,0.7
    Margherita Pizza,Entree,14.50,1.02
    Grilled Salmon with Asparagus,Entree,21.99,1.54
    Chicken Caesar Salad,Salad,12.75,0.89
    Spaghetti Carbonara,Entree,16.25,1.14
    Beef Tacos (3),Entree,10.50,0.74
    Vegetable Stir-Fry,Entree,13.00,0.91
    Shrimp Scampi,Entree,19.75,1.38
    Chicken Pot Pie,Entree,15.50,1.09
    Steak Frites,Entree,28.00,1.96
    Lobster Mac and Cheese,Entree,25.50,1.79
    Pork Belly Bao Buns (2),Appetizer/Side,11.25,0.79
    Mushroom Risotto,Entree,17.50,1.23
    Fish and Chips,Entree,14.00,0.98
    Buffalo Wings (6),Appetizer/Side,9.50,0.67
    French Onion Soup,Appetizer/Side,7.00,0.49
    Tomato Soup with Grilled Cheese,Appetizer/Side,10.00,0.7
    Avocado Toast,Appetizer/Side,8.50,0.6
    Quesadilla with Chicken,Appetizer/Side,11.75,0.82
    Pad Thai,Entree,15.00,1.05
    Chicken Tikka Masala,Entree,18.50,1.3
    Burrito Bowl,Entree,13.50,0.95
    Sushi Combo (8 pieces),Entree,22.00,1.54
    Greek Salad,Salad,11.00,0.77
    Clam Chowder,Appetizer/Side,8.00,0.56
    New York Cheesecake,Dessert,6.50,0.46
    Chocolate Lava Cake,Dessert,7.50,0.53
    Apple Pie,Dessert,5.00,0.35
    Tiramisu,Dessert,8.00,0.56
    Crème brûlée,Dessert,7.00,0.49
    Iced Coffee,Beverage,3.50,0.25
    Lemonade,Beverage,3.00,0.21
    Orange Juice,Beverage,4.00,0.28
    Soda,Beverage,2.50,0.18
    Craft Beer,Beverage,6.00,0.42
    Glass of Wine,Beverage,9.00,0.63
    Margarita,Beverage,12.00,0.84
    Moscow Mule,Beverage,11.50,0.81
    Old Fashioned,Beverage,13.00,0.91
    Espresso,Beverage,3.00,0.21
    Cappuccino,Beverage,4.50,0.32
    Latte,Beverage,5.00,0.35
    Mocha,Beverage,5.50,0.39
    Hot Chocolate,Beverage,4.00,0.28
    Breakfast Burrito,Breakfast,10.50,0.74
    Pancakes (3),Breakfast,8.00,0.56
    Waffles,Breakfast,9.00,0.63
    Eggs Benedict,Breakfast,14.00,0.98
    Omelette,Breakfast,11.00,0.77
    Fruit Salad,Salad,7.50,0.53
    Yogurt Parfait,Breakfast,6.00,0.42

Membaca dan memfilter data

Contoh berikut menunjukkan cara membaca data dari file CSV, memfilternya untuk mendapatkan informasi tertentu, dan menulis data yang difilter ke file JSON.

Contoh ini menggunakan Filter transformasi, yang memungkinkan Anda menyimpan data secara selektif yang memenuhi kriteria tertentu. Contoh berikut memfilter set data untuk hanya menyimpan data dengan Price yang lebih besar dari atau sama dengan 20.00.

Untuk membaca data CSV dan menampilkan konten JSON yang difilter, selesaikan langkah-langkah berikut:

  1. Di konsol Google Cloud , buka halaman Tugas Dataflow.

    Buka Tugas

  2. Klik Create Job From Builder.

  3. Di tab Job builder, biarkan Builder form dipilih.

  4. Di kolom Nama tugas, masukkan filter-python-job.

  5. Untuk Job type, biarkan Batch dipilih.

  6. Di bagian Sumber:

    1. Di kolom Nama sumber di panel Sumber baru, ubah nama menjadi ReadCsv.

    2. Dalam daftar Source type, pilih CSV from Cloud Storage.

    3. Di kolom CSV location, masukkan:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Klik Done.

  7. Di bagian Transforms:

    1. Klik Tambahkan transformasi.

    2. Di kolom Nama transformasi, masukkan FilterPrice.

    3. Dalam daftar Jenis transformasi, pilih Filter (Python).

    4. Di kolom Python filter expression, masukkan Price >= 20.00.

    5. Dalam daftar Langkah input untuk transformasi, biarkan ReadCsv dipilih.

    6. Klik Done.

  8. Di bagian Sinks:

    1. Di kolom Sink name, ubah nama menjadi WriteJson.

    2. Dalam daftar Sink type, pilih JSON files on Cloud Storage.

    3. Di kolom JSON location, masukkan:

      BUCKET_NAME/output/restaurant-data_filtered.json
      

      Ganti BUCKET_NAME dengan nama bucket Cloud Storage Anda.

    4. Di daftar Input step for the sink, biarkan FilterPrice dipilih.

    5. Klik Done.

  9. Di bagian Dataflow Options, klik Run job.

Periksa output tugas

Setelah tugas selesai, selesaikan langkah-langkah berikut untuk melihat output dari pipeline:

  1. Di konsol Google Cloud , buka halaman Buckets Cloud Storage.

    Buka Bucket

  2. Di daftar bucket, klik nama bucket yang Anda buat di Membuat bucket Cloud Storage.

  3. Klik file bernama restaurant-data_filtered.json-00000-of-00001.

  4. Di halaman Object details, klik URL yang diautentikasi untuk melihat output pipeline.

Output-nya akan terlihat seperti berikut:

{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54}
{"Item":"Steak Frites","Category":"Entree","Price":28.0,"Tax":1.96}
{"Item":"Lobster Mac and Cheese","Category":"Entree","Price":25.5,"Tax":1.79}
{"Item":"Sushi Combo (8 pieces)","Category":"Entree","Price":22.0,"Tax":1.54}

Memetakan kolom menggunakan Python

Dengan MapToFields transformasi, Anda dapat membuat kolom baru berdasarkan kolom yang ada. Contoh berikut membuat versi huruf kecil item menu, menghitung total harga, dan menambahkan nilai setelah nilai yang ada.

  1. Buka halaman Jobs Dataflow di konsolGoogle Cloud .

    Buka Tugas

  2. Klik Create job from builder.

  3. Di tab Job builder, biarkan Builder form dipilih.

  4. Di kolom Nama tugas, masukkan map-python-job.

  5. Untuk Job type, biarkan Batch dipilih.

  6. Di bagian Sumber:

    1. Di kolom Nama sumber di panel Sumber baru, ubah nama menjadi ReadFromCsvPy.

    2. Dalam daftar Source type, pilih CSV from Cloud Storage.

    3. Di kolom CSV location, masukkan:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Klik Done.

  7. Di bagian Transforms:

    1. Klik Tambahkan transformasi.

    2. Di kolom Nama transformasi, masukkan MapToFieldsPy.

    3. Di daftar Transform type, pilih Map fields (Python).

    4. Biarkan Pertahankan kolom yang ada dipilih.

    5. Di bagian Kolom yang dipetakan, klik Tambahkan kolom.

    6. Di panel Kolom baru yang terbuka, masukkan Lowercase_menu_item sebagai Nama kolom.

    7. Di kolom Python expression, masukkan Item.lower().

    8. Klik Done.

    9. Di bagian Kolom yang dipetakan yang sama, klik Tambahkan kolom lagi.

    10. Di panel Kolom baru yang terbuka, masukkan Total_price sebagai Nama kolom.

    11. Di kolom Python expression, masukkan Price + Tax.

    12. Di panel New field ini, klik Done.

    13. Di panel New transform ini, klik Done.

  8. Di bagian Sinks:

    1. Di kolom Sink name, ubah nama menjadi WriteToJsonPy.

    2. Dalam daftar Sink type, pilih JSON files on Cloud Storage.

    3. Di kolom JSON location, masukkan:

      BUCKET_NAME/output/restaurant-data_map-fields.json
      

      Ganti BUCKET_NAME dengan nama bucket Cloud Storage Anda.

    4. Di daftar Input step for the sink, biarkan MapToFieldsPy dipilih.

    5. Klik Done.

  9. Di bagian Dataflow Options, klik Run job.

Periksa output tugas

Setelah tugas selesai, selesaikan langkah-langkah berikut untuk melihat output dari pipeline:

  1. Di konsol Google Cloud , buka halaman Buckets Cloud Storage.

    Buka Bucket

  2. Di daftar bucket, klik nama bucket yang Anda buat di Membuat bucket Cloud Storage.

  3. Klik file bernama restaurant-data_map-fields.json-00000-of-00001.

  4. Di halaman Object details, klik URL yang diautentikasi untuk melihat output pipeline.

Output-nya akan terlihat seperti berikut:

{"Item":"Classic Cheeseburger","Category":"Entree","Price":9.99,"Tax":0.7,"Lowercase_menu_item":"classic cheeseburger","Total_price":10.69}
{"Item":"Margherita Pizza","Category":"Entree","Price":14.5,"Tax":1.02,"Lowercase_menu_item":"margherita pizza","Total_price":15.52}
{"Item":"Grilled Salmon with Asparagus","Category":"Entree","Price":21.99,"Tax":1.54,"Lowercase_menu_item":"grilled salmon with asparagus","Total_price":23.53}
{"Item":"Chicken Caesar Salad","Category":"Salad","Price":12.75,"Tax":0.89,"Lowercase_menu_item":"chicken caesar salad","Total_price":13.64}
{"Item":"Spaghetti Carbonara","Category":"Entree","Price":16.25,"Tax":1.14,"Lowercase_menu_item":"spaghetti carbonara","Total_price":17.39}
{"Item":"Beef Tacos (3)","Category":"Entree","Price":10.5,"Tax":0.74,"Lowercase_menu_item":"beef tacos (3)","Total_price":11.24}
[...]

Mentransformasi data menggunakan SQL

Transformasi Sql memungkinkan Anda menjalankan kueri SQL pada data. Contoh berikut mengelompokkan item menu menurut kategori (seperti Entree, Beverage, atau Dessert) dan menambahkan kolom dengan jumlah item di setiap kategori.

Untuk menggunakan UI pembuat tugas guna membuat pipeline, ikuti langkah-langkah berikut:

  1. Buka halaman Jobs Dataflow di konsol Google Cloud .

    Buka Tugas

  2. Klik Create job from builder.

  3. Di tab Job builder, di kolom Job name, masukkan sql-transform-job.

  4. Untuk Job type, biarkan Batch dipilih.

  5. Di bagian Sumber:

    1. Di kolom Source name, ubah nama menjadi SqlTransformSource.

    2. Di tab New source, untuk Source type, pilih CSV from Cloud Storage. Kolom CSV location akan terbuka.

    3. Untuk CSV location, masukkan:

      cloud-samples-data/dataflow/tutorials/restaurant-data.csv
      
    4. Klik Done.

  6. Di bagian Transforms:

    1. Klik Tambahkan transformasi.

    2. Di kolom Transform name, perbarui nama menjadi SqlTransform.

    3. Untuk Transform type, pilih SQL transform. Opsi SQL transform akan terbuka.

    4. Di kolom SQL expression, masukkan:

      select Category, count(*) as category_count from PCOLLECTION group by Category
      
    5. Klik Done.

  7. Di bagian Sinks:

    1. Untuk Sink name, masukkan SqlTransformSink.

    2. Untuk Jenis sink, pilih File JSON di Cloud Storage. Opsi Tulis ke file JSON di Cloud Storage akan terbuka.

    3. Untuk JSON location, masukkan:

      BUCKET_NAME/output/restaurant-data_transform-sql.json
      

      Ganti BUCKET_NAME dengan nama bucket Cloud Storage Anda.

    4. Klik Done.

  8. Opsional: Lihat definisi YAML yang dihasilkan untuk pipeline ini.

    1. Buka bagian atas tab Job builder.

    2. Pilih Editor YAML. Anda akan melihat definisi YAML. Hasilnya akan terlihat seperti berikut:

      Spesifikasi YAML yang dihasilkan

      pipeline:
        transforms:
          - name: SqlTransformSource
            type: ReadFromCsv
            config:
              path: 'gs://cloud-samples-data/dataflow/tutorials/restaurant-data.csv'
          - name: SqlTransform
            type: Sql
            config:
              query: >-
                select Category, count(*) as category_count from PCOLLECTION group by
                Category
            input:
              input0: SqlTransformSource
          - name: SqlTransformSink
            type: WriteToJson
            input: SqlTransform
            config:
              path: 'gs://BUCKET_NAME/output/restaurant-data_transform-sql.json'
  9. Di bagian Dataflow Options, klik Run job.

Periksa output tugas

Setelah tugas selesai, selesaikan langkah-langkah berikut untuk melihat output dari pipeline:

  1. Di konsol Google Cloud , buka halaman Buckets Cloud Storage.

    Buka Bucket

  2. Di daftar bucket, klik nama bucket yang Anda buat di Membuat bucket Cloud Storage.

  3. Klik file bernama restaurant-data_transform-sql.json-00000-of-00001.

  4. Di halaman Object details, klik URL yang diautentikasi untuk melihat output pipeline.

Output-nya akan terlihat seperti berikut:

{"Category":"Entree","category_count":16}
{"Category":"Beverage","category_count":14}
{"Category":"Appetizer\/Side","category_count":7}
{"Category":"Dessert","category_count":5}
{"Category":"Breakfast","category_count":6}
{"Category":"Salad","category_count":3}

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

  1. Di Konsol Google Cloud , buka halaman Manage resources.

    Buka Kelola resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus resource satu per satu

Jika ingin menggunakan kembali project tersebut nanti, Anda dapat mempertahankan project tersebut, tetapi menghapus resource yang Anda buat selama tutorial.

Menghentikan pipeline Dataflow

  1. Di konsol Google Cloud , buka halaman Tugas Dataflow.

    Buka Tugas

  2. Klik tugas yang ingin Anda hentikan.

    Untuk menghentikan tugas, status tugas harus berjalan.

  3. Di halaman detail tugas, klik Stop.

  4. Klik Cancel.

  5. Untuk mengonfirmasi pilihan Anda, klik Hentikan Tugas.

Menghapus bucket Cloud Storage

  1. Di konsol Google Cloud , buka halaman Buckets Cloud Storage.

    Buka Buckets

  2. Klik kotak centang untuk bucket yang ingin Anda dihapus.
  3. Untuk menghapus bucket, klik Hapus, lalu ikuti petunjuk.

Langkah berikutnya