Membuat pipeline Dataflow menggunakan Python
Dokumen ini menunjukkan cara menggunakan Apache Beam SDK untuk Python guna membuat program yang menentukan pipeline. Kemudian, Anda menjalankan pipeline menggunakan runner lokal langsung atau runner berbasis cloud seperti Dataflow. Untuk pengantar tentang pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.
Untuk mengikuti panduan langkah demi langkah untuk tugas ini langsung di Google Cloud konsol, klik Pandu saya:
Sebelum memulai
- Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
-
Instal Google Cloud CLI.
-
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init -
Buat atau pilih Google Cloud project.
Peran yang diperlukan untuk memilih atau membuat project
- Pilih project: Memilih project tidak memerlukan peran IAM tertentu—Anda dapat memilih project mana pun yang telah diberi peran.
-
Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project
(
roles/resourcemanager.projectCreator), yang berisi izinresourcemanager.projects.create. Pelajari cara memberikan peran.
-
Buat Google Cloud project:
gcloud projects create PROJECT_ID
Ganti
PROJECT_IDdengan nama untuk Google Cloud project yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_IDdengan nama project Google Cloud Anda.
-
Verifikasi bahwa penagihan diaktifkan untuk project Google Cloud Anda.
Aktifkan Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, dan Cloud Resource Manager API:
Peran yang diperlukan untuk mengaktifkan API
Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (
roles/serviceusage.serviceUsageAdmin), yang berisi izinserviceusage.services.enable. Pelajari cara memberikan peran.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Buat kredensial autentikasi lokal untuk akun pengguna Anda:
gcloud auth application-default login
Jika error autentikasi ditampilkan, dan Anda menggunakan penyedia identitas (IdP) eksternal, konfirmasi bahwa Anda telah login ke gcloud CLI dengan identitas gabungan Anda.
-
Memberikan peran ke akun pengguna Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Ganti kode berikut:
PROJECT_ID: Project ID Anda.USER_IDENTIFIER: ID untuk akun pengguna Anda. Misalnya,myemail@example.com.ROLE: Peran IAM yang Anda berikan ke akun pengguna Anda.
-
Instal Google Cloud CLI.
-
Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init -
Buat atau pilih Google Cloud project.
Peran yang diperlukan untuk memilih atau membuat project
- Pilih project: Memilih project tidak memerlukan peran IAM tertentu—Anda dapat memilih project mana pun yang telah diberi peran.
-
Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project
(
roles/resourcemanager.projectCreator), yang berisi izinresourcemanager.projects.create. Pelajari cara memberikan peran.
-
Buat Google Cloud project:
gcloud projects create PROJECT_ID
Ganti
PROJECT_IDdengan nama untuk Google Cloud project yang Anda buat. -
Pilih project Google Cloud yang Anda buat:
gcloud config set project PROJECT_ID
Ganti
PROJECT_IDdengan nama project Google Cloud Anda.
-
Verifikasi bahwa penagihan diaktifkan untuk project Google Cloud Anda.
Aktifkan Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, dan Cloud Resource Manager API:
Peran yang diperlukan untuk mengaktifkan API
Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (
roles/serviceusage.serviceUsageAdmin), yang berisi izinserviceusage.services.enable. Pelajari cara memberikan peran.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Buat kredensial autentikasi lokal untuk akun pengguna Anda:
gcloud auth application-default login
Jika error autentikasi ditampilkan, dan Anda menggunakan penyedia identitas (IdP) eksternal, konfirmasi bahwa Anda telah login ke gcloud CLI dengan identitas gabungan Anda.
-
Memberikan peran ke akun pengguna Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Ganti kode berikut:
PROJECT_ID: Project ID Anda.USER_IDENTIFIER: ID untuk akun pengguna Anda. Misalnya,myemail@example.com.ROLE: Peran IAM yang Anda berikan ke akun pengguna Anda.
Berikan peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ganti
PROJECT_IDdengan project ID Anda. - Ganti
PROJECT_NUMBERdengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Mengidentifikasi project atau gunakan perintahgcloud projects describe. - Ganti
SERVICE_ACCOUNT_ROLEdengan setiap peran individual.
-
Buat bucket Cloud Storage dan konfigurasikan sebagai berikut:
-
Tetapkan kelas penyimpanan ke
S(Standard). -
Tetapkan lokasi penyimpanan sebagai berikut:
US(Amerika Serikat). -
Ganti
BUCKET_NAMEdengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Tetapkan kelas penyimpanan ke
- Salin Google Cloud project ID dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam dokumen ini.
Menyiapkan lingkungan Anda
Di bagian ini, gunakan command prompt untuk menyiapkan lingkungan virtual Python yang terisolasi untuk menjalankan project pipeline Anda dengan menggunakan venv. Proses ini memungkinkan Anda mengisolasi dependensi satu project dari dependensi project lainnya.
Jika Anda belum memiliki command prompt yang tersedia, Anda dapat menggunakan Cloud Shell. Cloud Shell sudah menginstal pengelola paket untuk Python 3, sehingga Anda dapat langsung membuat lingkungan virtual.
Untuk menginstal Python, lalu membuat lingkungan virtual, ikuti langkah-langkah berikut:
- Pastikan Anda telah menjalankan Python 3 dan
pipdi sistem Anda:python --version python -m pip --version
- Jika diperlukan, instal Python 3, lalu siapkan lingkungan virtual Python: ikuti petunjuk yang diberikan di bagian Menginstal Python dan Menyiapkan venv di halaman Menyiapkan lingkungan pengembangan Python.
Setelah menyelesaikan panduan memulai cepat, Anda dapat menonaktifkan lingkungan virtual dengan menjalankan deactivate.
Mendapatkan Apache Beam SDK
Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.
Untuk mendownload dan menginstal Apache Beam SDK, ikuti langkah-langkah berikut:
- Pastikan Anda berada di lingkungan virtual Python yang Anda buat di bagian sebelumnya.
Pastikan perintah dimulai dengan
<env_name>, denganenv_nameadalah nama lingkungan virtual. - Instal versi terbaru Apache Beam SDK untuk Python:
pip install apache-beam[gcp]
Menjalankan pipeline secara lokal
Untuk melihat cara menjalankan pipeline secara lokal, gunakan modul Python siap pakai untuk contoh wordcount
yang disertakan dengan paket apache_beam.
Contoh pipeline wordcount melakukan hal berikut:
Menggunakan file teks sebagai input.
File teks ini berada di bucket Cloud Storage dengan nama resource
gs://dataflow-samples/shakespeare/kinglear.txt.- Mengurai setiap baris menjadi kata-kata.
- Melakukan penghitungan frekuensi pada kata yang telah di-tokenisasi.
Untuk menyiapkan pipeline wordcount secara lokal, ikuti langkah-langkah berikut:
- Dari terminal lokal, jalankan contoh
wordcount:python -m apache_beam.examples.wordcount \ --output outputs
- Lihat output pipeline:
more outputs* - Untuk keluar, tekan q.
wordcount.py di GitHub Apache Beam.
Menjalankan pipeline di layanan Dataflow
Di bagian ini, jalankan contoh pipelinewordcount dari paket
apache_beam di layanan Dataflow. Contoh
ini menentukan DataflowRunner sebagai parameter untuk
--runner.
- Menjalankan pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,europe-west1Flag
--regionmenggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.BUCKET_NAME: nama bucket Cloud Storage yang Anda salin sebelumnyaPROJECT_ID: Google Cloud project ID yang Anda salin sebelumnya
Melihat hasil penelusuran Anda
Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya akan disimpan di bucket Cloud Storage. Di bagian ini, verifikasi bahwa pipeline sedang berjalan menggunakan konsol Google Cloud atau terminal lokal.
KonsolGoogle Cloud
Untuk melihat hasil di konsol Google Cloud , ikuti langkah-langkah berikut:
- Di konsol Google Cloud , buka halaman Tugas Dataflow.
Halaman Tugas menampilkan detail tugas
wordcountAnda, termasuk status Berjalan pada awalnya, lalu Berhasil. - Buka halaman Bucket Cloud Storage.
Dari daftar bucket di project Anda, klik bucket penyimpanan yang Anda buat sebelumnya.
Di direktori
wordcount, file output yang dibuat oleh tugas Anda akan ditampilkan.
Terminal lokal
Lihat hasil dari terminal Anda atau menggunakan Cloud Shell.
- Untuk mencantumkan file output, gunakan perintah
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Untuk melihat hasil dalam file output, gunakan perintah
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Ganti BUCKET_NAME dengan nama bucket Cloud Storage yang digunakan
dalam program pipeline.
Mengubah kode pipeline
Pipelinewordcount dalam contoh sebelumnya membedakan antara kata-kata berhuruf besar dan kecil.
Langkah-langkah berikut menunjukkan cara mengubah pipeline sehingga pipeline wordcount tidak peka huruf besar/kecil.
- Di komputer lokal Anda, download salinan terbaru kode
wordcountdari repositori GitHub Apache Beam. - Dari terminal lokal, jalankan pipeline:
python wordcount.py --output outputs
- Lihat hasilnya:
more outputs* - Untuk keluar, tekan q.
- Di editor pilihan Anda, buka file
wordcount.py. - Di dalam fungsi
run, periksa langkah-langkah pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Setelah
split, baris dibagi menjadi kata-kata sebagai string. - Untuk mengubah string menjadi huruf kecil, ubah baris setelah
split: Modifikasi ini memetakan fungsicounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lowerke setiap kata. Baris ini setara denganbeam.Map(lambda word: str.lower(word)). - Simpan file dan jalankan tugas
wordcountyang diubah:python wordcount.py --output outputs
- Lihat hasil pipeline yang diubah:
more outputs* - Untuk keluar, tekan q.
- Jalankan pipeline yang telah diubah di layanan Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ganti kode berikut:
DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas DataflowBUCKET_NAME: nama bucket Cloud Storage AndaPROJECT_ID: Project ID Google Cloud Anda
Pembersihan
Agar akun Google Cloud Anda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.
- Di konsol Google Cloud , buka halaman Buckets Cloud Storage.
- Klik kotak centang untuk bucket yang ingin Anda dihapus.
- Untuk menghapus bucket, klik Hapus, lalu ikuti petunjuk.
Jika Anda mempertahankan project, cabut peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.
gcloud auth application-default revoke
-
Opsional: Cabut kredensial dari gcloud CLI.
gcloud auth revoke