使用 Go 建立 Dataflow 管道
本頁說明如何使用 Go 適用的 Apache Beam SDK 建構定義管道的程式。接著,您會在本地和 Dataflow 服務上執行管道。如要瞭解 WordCount 管道,請觀看「如何在 Apache Beam 中使用 WordCount」影片。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON 和 Cloud Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON 和 Cloud Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 將
PROJECT_ID替換為專案 ID。 - 將
PROJECT_NUMBER替換為專案編號。 如要找出專案編號,請參閱「識別專案」一文,或使用gcloud projects describe指令。 - 將
SERVICE_ACCOUNT_ROLE替換為各個角色。
-
建立 Cloud Storage bucket,然後依照下列指示來設定 bucket:
-
將儲存空間級別設為
S(Standard)。 -
將儲存空間位置設定為下列區域:
US(美國)。 -
將
BUCKET_NAME替換成不重複的值區名稱。請勿在 bucket 名稱中加入任何機密資訊,因為 bucket 命名空間全域通用並會公開顯示。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
將儲存空間級別設為
- 複製 Google Cloud 專案 ID 和 Cloud Storage bucket 名稱。 您會在後續步驟中用到這些值。
設定開發環境
Apache Beam SDK 是用於資料管道的開放原始碼程式設計模型。您可以使用 Apache Beam 程式定義管道,然後選擇執行器 (例如 Dataflow) 來執行管道。
使用 Go 適用的 Apache Beam SDK 時,建議您使用最新版 Go。如果尚未安裝最新版 Go,請參閱 Go 的下載及安裝指南,下載並安裝適用於您作業系統的 Go 版本。
如要確認已安裝的 Go 版本,請在本機終端機執行下列指令:
go version執行 Beam 字數範例
Go 適用的 Apache Beam SDK 包含wordcount管道範例。wordcount 範例會執行下列動作:
- 讀取文字檔案做為輸入。根據預設,這個範例會讀取位於 Cloud Storage 值區的文字檔,資源名稱為
gs://dataflow-samples/shakespeare/kinglear.txt。 - 將每一行剖析為字詞。
- 對代碼化的字詞執行頻率計數。
如要在本機電腦上執行最新版本的 Beam wordcount 範例,請完成下列步驟:
使用
git clone指令複製apache/beamGitHub 存放區:git clone https://github.com/apache/beam.git切換至
beam/sdks/go目錄:cd beam/sdks/go使用下列指令執行管道:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsinput標記會指定要讀取的檔案,output標記則會指定頻率計數輸出的檔案名稱。
管道完成後,請查看輸出結果:
more outputs*如要退出,請按 q。
修改管道程式碼
Beam wordcount 管道會區分大小寫字詞。下列步驟說明如何建立自己的 Go 模組、修改 wordcount 管道,讓管道不區分大小寫,並在 Dataflow 上執行。
建立 Go 模組
如要變更管道程式碼,請按照下列步驟操作。
在所選位置建立 Go 模組的目錄:
mkdir wordcountcd wordcount建立 Go 模組。在本範例中,請使用
example/dataflow做為模組路徑。go mod init example/dataflow從 Apache Beam GitHub 存放區下載
wordcount程式碼的最新副本。將這個檔案放入您建立的wordcount目錄。如果您使用非 Linux 作業系統,必須取得 Go
unix套件。必須有這個套件,才能在 Dataflow 服務上執行管道。go get -u golang.org/x/sys/unix確認
go.mod檔案與模組的原始碼相符:go mod tidy
執行未修改的管道
確認未經修改的 wordcount 管道可在本機執行。
在終端機中,建構並在本機執行管道:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs查看輸出結果:
more outputs*如要退出,請按 q。
變更管道程式碼
如要變更管道,使其不區分大小寫,請修改程式碼,將 strings.ToLower 函式套用至所有字詞。
在您選擇的編輯器中開啟
wordcount.go檔案。檢查
init區塊 (為求清楚起見,已移除註解):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }新增一行來註冊
strings.ToLower函式:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }檢查
CountWords函式:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }如要將字詞轉換為小寫,請新增 ParDo,並對每個字詞套用
strings.ToLower:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }儲存檔案。
在本機執行更新後的管道
在本機執行更新後的 wordcount 管道,並確認輸出內容已變更。
建構並執行修改後的
wordcount管道:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs查看修改後管道的輸出結果。所有字詞都應為小寫。
more outputs*如要退出,請按 q。
在 Dataflow 服務上執行管道
如要在 Dataflow 服務上執行更新後的 wordcount 範例,請使用下列指令:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/更改下列內容:
BUCKET_NAME:Cloud Storage bucket 名稱。PROJECT_ID: Google Cloud 專案 ID。DATAFLOW_REGION:您要部署 Dataflow 工作的區域。例如:europe-west1。 如需可用位置清單,請參閱「Dataflow 位置」。--region標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設區域。
查看結果
您可以在Google Cloud 控制台中查看 Dataflow 工作清單。前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
「Jobs」(工作) 頁面會顯示 wordcount 工作的詳細資料,包括一開始的「Running」(執行中) 狀態,以及後來的「Succeeded」(成功) 狀態。
透過 Dataflow 執行管道時,結果會儲存於 Cloud Storage bucket。使用Google Cloud 控制台或本機終端機查看輸出結果。
控制台
如要在 Google Cloud 控制台查看結果,請前往 Cloud Storage 的「Bucket」頁面。
在專案的值區清單中,按一下您先前建立的儲存空間值區。工作建立的輸出檔案會顯示在 results 目錄中。
終端機
從終端機或使用 Cloud Shell 查看結果。
如要列出輸出檔案,請使用
gcloud storage ls指令:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long將
BUCKET_NAME替換為指定輸出 Cloud Storage bucket 的名稱。如要查看輸出檔案中的結果,請使用
gcloud storage cat指令:gcloud storage cat gs://BUCKET_NAME/results/outputs*
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。
- 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。
- 按一下要刪除的值區旁的核取方塊。
- 如要刪除值區,請依序點選 「Delete」(刪除),然後按照指示操作。
如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。
gcloud auth application-default revoke
-
選用:從 gcloud CLI 撤銷憑證。
gcloud auth revoke