使用 Python 建立 Dataflow 管道
本文說明如何使用 Apache Beam SDK for Python 建構定義管道的程式。接著,您可以使用直接本機執行器或雲端執行器 (例如 Dataflow) 執行管道。如要瞭解 WordCount 管道,請觀看「如何在 Apache Beam 中使用 WordCount」影片。
如要直接在 Google Cloud 控制台中,按照這項工作的逐步指南操作,請按一下「Guide me」(逐步引導):
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 將
PROJECT_ID替換為專案 ID。 - 將
PROJECT_NUMBER替換為專案編號。 如要找出專案編號,請參閱「識別專案」一文,或使用gcloud projects describe指令。 - 將
SERVICE_ACCOUNT_ROLE替換為各個角色。
-
建立 Cloud Storage bucket,然後依照下列指示來設定 bucket:
-
將儲存空間級別設為
S(Standard)。 -
將儲存空間位置設定為下列區域:
US(美國)。 -
將
BUCKET_NAME替換成不重複的值區名稱。請勿在 bucket 名稱中加入任何機密資訊,因為 bucket 命名空間全域通用並會公開顯示。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
將儲存空間級別設為
- 複製 Google Cloud 專案 ID 和 Cloud Storage bucket 名稱。您會在本文後續步驟中用到這些值。
設定環境
在本節中,請使用命令提示字元設定獨立的 Python 虛擬環境,以便使用 venv 執行管道專案。這樣一來,您就能區隔不同專案的依附元件。
如果沒有命令提示字元可用,您可以使用 Cloud Shell。 Cloud Shell 已安裝 Python 3 的套件管理員,因此您可以略過這個步驟,直接建立虛擬環境。
如要安裝 Python 並建立虛擬環境,請按照下列步驟操作:
- 請確認系統已執行 Python 3 和
pip:python --version python -m pip --version
- 視需要安裝 Python 3,然後設定 Python 虛擬環境:按照「 設定 Python 開發環境」頁面的「安裝 Python」和「設定 venv」一節提供的操作說明進行。
完成快速入門後,您可以執行 deactivate 來停用虛擬環境。
取得 Apache Beam SDK
Apache Beam SDK 是用於資料管道的開放原始碼程式設計模型。您可以使用 Apache Beam 程式定義管道,然後選擇 Dataflow 等執行器來執行管道。
如要下載及安裝 Apache Beam SDK,請按照下列步驟操作:
- 確認您位於先前建立的 Python 虛擬環境中。
確認提示以
<env_name>開頭,其中env_name是虛擬環境的名稱。 - 安裝最新版本的 Python 適用的 Apache Beam SDK:
pip install apache-beam[gcp]
在本機執行管道
如要查看管道在本機的執行情況,請使用 apache_beam 套件隨附的 wordcount 範例專用 Python 模組。
wordcount 管道範例會執行下列操作:
擷取文字檔案做為輸入。
這個文字檔案位於資源名稱為
gs://dataflow-samples/shakespeare/kinglear.txt的 Cloud Storage 值區。- 將每一行剖析為字詞。
- 對代碼化的字詞執行頻率計數。
如要在本機暫存 wordcount 管道,請按照下列步驟操作:
- 從本機終端機執行
wordcount範例:python -m apache_beam.examples.wordcount \ --output outputs
- 查看管道的輸出內容:
more outputs* - 如要退出,請按 q。
wordcount.py 原始碼。在 Dataflow 服務上執行管道
在本節中,請在 Dataflow 服務上執行apache_beam 套件中的 wordcount 範例管道。這個範例會將 DataflowRunner 指定為 --runner 的參數。- 執行管道:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
更改下列內容:
DATAFLOW_REGION:您要部署 Dataflow 工作的區域,例如europe-west1--region標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設區域。BUCKET_NAME:您先前複製的 Cloud Storage bucket 名稱PROJECT_ID:您先前複製的 Google Cloud 專案 ID
查看結果
透過 Dataflow 執行管道時,結果會儲存於 Cloud Storage bucket。在本節中,請使用 Google Cloud 控制台或本機終端機,確認管道正在執行。
Google Cloud 控制台
如要在 Google Cloud 控制台中查看結果,請按照下列步驟操作:
- 前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
「Jobs」(工作) 頁面會顯示
wordcount工作的詳細資料,包括一開始的「Running」(執行中) 狀態,以及後來的「Succeeded」(成功) 狀態。 - 前往 Cloud Storage 的「Buckets」(值區) 頁面。
在專案的值區清單中,按一下您先前建立的儲存空間值區。
在
wordcount目錄中,會顯示工作建立的輸出檔案。
本機終端機
從終端機或使用 Cloud Shell 查看結果。
- 如要列出輸出檔案,請使用
gcloud storage ls指令:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- 如要查看輸出檔案中的結果,請使用
gcloud storage cat指令:gcloud storage cat gs://BUCKET_NAME/results/outputs*
請將 BUCKET_NAME 改成管道程式中使用的 Cloud Storage 值區名稱。
修改管道程式碼
上述範例中的wordcount 管道區分大寫和小寫字詞。
以下步驟說明如何修改管道,讓 wordcount 管道不區分大小寫。- 在本機電腦上,從 Apache Beam GitHub 存放區下載
wordcount程式碼的最新副本。 - 從本機終端機執行管道:
python wordcount.py --output outputs
- 查看結果:
more outputs* - 如要退出,請按 q。
- 在您選擇的編輯器中開啟
wordcount.py檔案。 - 在
run函式中,檢查管道步驟:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
在
split之後,這幾行將會以字串形式分割成字詞。 - 如要將字串改為小寫,請修改
split後的這幾行: 這項修改會將counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower函式對應到每個字詞。這行相當於beam.Map(lambda word: str.lower(word))。 - 儲存檔案並執行修改後的
wordcount工作:python wordcount.py --output outputs
- 查看已修改管道的結果:
more outputs* - 如要退出,請按 q。
- 在 Dataflow 服務上執行修改後的管道:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
更改下列內容:
DATAFLOW_REGION:您要部署 Dataflow 工作的區域BUCKET_NAME:Cloud Storage bucket 名稱PROJECT_ID:您的 Google Cloud 專案 ID
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。
- 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。
- 按一下要刪除的值區旁的核取方塊。
- 如要刪除值區,請依序點選 「Delete」(刪除),然後按照指示操作。
如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。
gcloud auth application-default revoke
-
選用:從 gcloud CLI 撤銷憑證。
gcloud auth revoke