使用 Dataflow 和 Cloud Storage 串流 Pub/Sub 訊息
Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,讓資料維持同等的穩定與明確性,並利用 Apache Beam SDK 提供簡化的管道開發環境。這個 SDK 具備多項時間區間設定與工作階段分析基元,以及來源與接收連接器生態系統。本快速入門導覽課程說明如何使用 Dataflow 執行下列作業:
- 讀取發布至 Pub/Sub 主題的訊息
- 依時間戳記建立訊息視窗或分組
- 將訊息寫入 Cloud Storage
本快速入門導覽課程將介紹如何使用 Java 和 Python 中的 Dataflow。也支援 SQL。這個快速入門導覽課程也提供 Google Cloud Skills Boost 教學課程,其中提供臨時憑證,協助您開始使用。
如果您不打算進行自訂資料處理,也可以先使用以 UI 為基礎的 Dataflow 範本。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Storage JSON API、Pub/Sub、Resource Manager 和 Cloud Scheduler API: Google Cloud
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
設定驗證方法:
-
確認您具備「建立服務帳戶」身分與存取權管理角色 (
roles/iam.serviceAccountCreator) 和「專案 IAM 管理員」角色 (roles/resourcemanager.projectIamAdmin)。瞭解如何授予角色。 -
建立服務帳戶:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
將
SERVICE_ACCOUNT_NAME換成服務帳戶的名稱。 -
將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
請替換下列項目:
SERVICE_ACCOUNT_NAME:服務帳戶名稱PROJECT_ID:您建立服務帳戶的專案 IDROLE:要授予的角色
-
將必要角色指派給要將服務帳戶附加至其他資源的主體。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
更改下列內容:
SERVICE_ACCOUNT_NAME:服務帳戶名稱PROJECT_ID:您建立服務帳戶的專案 IDUSER_EMAIL:Google 帳戶的電子郵件地址
-
確認您具備「建立服務帳戶」身分與存取權管理角色 (
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Storage JSON API、Pub/Sub、Resource Manager 和 Cloud Scheduler API: Google Cloud
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
設定驗證方法:
-
確認您具備「建立服務帳戶」身分與存取權管理角色 (
roles/iam.serviceAccountCreator) 和「專案 IAM 管理員」角色 (roles/resourcemanager.projectIamAdmin)。瞭解如何授予角色。 -
建立服務帳戶:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
將
SERVICE_ACCOUNT_NAME換成服務帳戶的名稱。 -
將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
請替換下列項目:
SERVICE_ACCOUNT_NAME:服務帳戶名稱PROJECT_ID:您建立服務帳戶的專案 IDROLE:要授予的角色
-
將必要角色指派給要將服務帳戶附加至其他資源的主體。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
更改下列內容:
SERVICE_ACCOUNT_NAME:服務帳戶名稱PROJECT_ID:您建立服務帳戶的專案 IDUSER_EMAIL:Google 帳戶的電子郵件地址
-
確認您具備「建立服務帳戶」身分與存取權管理角色 (
-
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
設定 Pub/Sub 專案
-
為 bucket、專案和區域建立變數。 Cloud Storage bucket 名稱在全域範圍內都不可重複。選取靠近您執行本快速入門指令的 Dataflow區域。
REGION變數的值必須是有效的區域名稱。如要進一步瞭解地區和位置,請參閱「Dataflow 位置」。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
建立這個專案擁有的 Cloud Storage bucket:
gcloud storage buckets create gs://$BUCKET_NAME
-
在本專案中建立 Pub/Sub 主題:
gcloud pubsub topics create $TOPIC_ID
-
在本專案建立 Cloud Scheduler 工作,這項工作會每隔一分鐘將訊息發布至 Pub/Sub 主題。
如果專案沒有 App Engine 應用程式,這個步驟會建立一個。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
啟動工作。
gcloud scheduler jobs run publisher-job --location=$REGION
-
使用下列指令複製快速入門存放區,並前往程式碼範例目錄:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
將訊息從 Pub/Sub 串流至 Cloud Storage
程式碼範例
這個程式碼範例會使用 Dataflow 執行下列作業:
- 讀取 Pub/Sub 訊息。
- 根據發布時間戳記,建立訊息視窗或將訊息分組至固定長度的時間間隔。
將每個視窗中的訊息寫入 Cloud Storage 的檔案。
Java
Python
啟動管道
如要啟動管道,請執行下列指令:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上述指令會在本地端執行,並啟動在雲端執行的 Dataflow 工作。當指令傳回 JOB_MESSAGE_DETAILED: Workers
have started successfully 時,請使用 Ctrl+C 結束本機程式。
觀察工作和管道進度
您可以在 Dataflow 控制台中查看工作進度。
開啟工作詳細資料檢視畫面,即可查看:
- 工作結構
- 工作記錄
- 階段指標
您可能需要稍候幾分鐘,才能在 Cloud Storage 看到輸出檔案。
或者,使用下列指令列檢查哪些檔案已寫入。
gcloud storage ls gs://${BUCKET_NAME}/samples/
輸出內容應如下所示:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。
刪除 Cloud Scheduler 工作。
gcloud scheduler jobs delete publisher-job --location=$REGION
在 Dataflow 控制台中停止工作。取消管道,但不要排空管道。
刪除主題。
gcloud pubsub topics delete $TOPIC_ID
刪除管道建立的檔案。
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
移除 Cloud Storage bucket。
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
刪除服務帳戶:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。
gcloud auth application-default revoke
-
選用:從 gcloud CLI 撤銷憑證。
gcloud auth revoke
後續步驟
如要依自訂時間戳記建立 Pub/Sub 訊息視窗,您可以在 Pub/Sub 訊息中將時間戳記指定為屬性,然後搭配 PubsubIO's
withTimestampAttribute使用自訂時間戳記。歡迎查看 Google 專為串流設計的開放原始碼 Dataflow 範本。
進一步瞭解 Dataflow 如何與 Pub/Sub 整合。
請參閱這篇教學課程,瞭解如何使用 Dataflow Flex 範本從 Pub/Sub 讀取資料,並寫入 BigQuery。
如要進一步瞭解視窗化,請參閱 Apache Beam 行動遊戲管道範例。