本教學課程使用 Pub/Sub 訂閱項目到 BigQuery 範本,透過 Google Cloud 控制台或 Google Cloud CLI 建立並執行 Dataflow 範本工作。本教學課程將逐步說明串流管道範例,該管道會從 Pub/Sub 讀取以 JSON 編碼的訊息,並將這些訊息寫入 BigQuery 資料表。
串流分析和資料整合管道會使用 Pub/Sub 擷取及發布資料。Pub/Sub 可讓您建立事件生產者和消費者系統,也就是發布者和訂閱者。發布者會以非同步方式將事件傳送至 Pub/Sub 服務,而 Pub/Sub 會將事件傳送至所有需要做出反應的服務。
Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,並利用 Apache Beam SDK 提供簡化的管道開發環境,轉換輸入資料,然後輸出轉換後的資料。
這個工作流程的優點是,您可以使用 UDF 轉換訊息資料,再將資料寫入 BigQuery。
針對這個情境執行 Dataflow 管道前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。
目標
- 建立 Pub/Sub 主題。
- 建立含有資料表和結構定義的 BigQuery 資料集。
- 使用 Google 提供的串流範本,透過 Dataflow 將 Pub/Sub 訂閱項目中的資料串流至 BigQuery。
費用
在本文件中,您會使用下列 Google Cloud的計費元件:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
如要根據預測用量估算費用,請使用 Pricing Calculator。
完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。
事前準備
本節說明如何選取專案、啟用 API,以及將適當的角色授予使用者帳戶和工作者服務帳戶。
控制台
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.如要完成本教學課程中的步驟,使用者帳戶必須具備「服務帳戶使用者」角色。Compute Engine 預設服務帳戶必須具備下列角色:Dataflow 工作者、Dataflow 管理員、Pub/Sub 編輯者、Storage 物件管理員和 BigQuery 資料編輯者。如要在 Google Cloud 控制台中新增必要角色,請按照下列步驟操作:
前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。
前往「IAM」頁面- 選取專案。
- 在包含您使用者帳戶的資料列中,依序點選 「Edit principal」(編輯主體) 和 「Add another role」(新增其他角色)。
- 在下拉式選單中,選取「服務帳戶使用者」角色。
- 在包含 Compute Engine 預設服務帳戶的列中,依序點選 「Edit principal」(編輯主體) 和 「Add another role」(新增其他角色)。
- 在下拉式清單中,選取「Dataflow Worker」角色。
針對「Dataflow 管理員」、「Pub/Sub 編輯者」、「Storage 物件管理員」和「BigQuery 資料編輯者」角色重複上述步驟,然後按一下「儲存」。
如要進一步瞭解如何授予角色,請參閱「使用控制台授予 IAM 角色」。
gcloud
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Compute Engine、Dataflow、Cloud Logging、BigQuery、Pub/Sub、Cloud Storage、Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
如果您使用本機殼層,請為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果您使用 Cloud Shell,則不需要執行這項操作。
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Compute Engine、Dataflow、Cloud Logging、BigQuery、Pub/Sub、Cloud Storage、Resource Manager API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
如果您使用本機殼層,請為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果您使用 Cloud Shell,則不需要執行這項操作。
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.adminroles/pubsub.editorroles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
更改下列內容:
PROJECT_ID:您的專案 ID。PROJECT_NUMBER:您的專案編號。 如要找出專案編號,請使用gcloud projects describe指令。SERVICE_ACCOUNT_ROLE:每個角色。
建立 Cloud Storage 值區
首先,請使用 Google Cloud 控制台或 Google Cloud CLI 建立 Cloud Storage bucket。Dataflow 管道會將這個 bucket 做為暫時儲存位置。
控制台
gcloud
使用 gcloud storage buckets create 指令:
gcloud storage buckets create gs://BUCKET_NAME
將 BUCKET_NAME 替換為符合值區命名規定的 Cloud Storage 值區名稱。Cloud Storage bucket 名稱不得重複。
建立 Pub/Sub 主題和訂閱項目
建立 Pub/Sub 主題,然後建立該主題的訂閱項目。
控制台
如要建立主題,請完成下列步驟。
前往 Google Cloud 控制台的 Pub/Sub「Topics」(主題) 頁面。
按一下「建立主題」。
在「主題 ID」欄位中,輸入主題的 ID。如要瞭解如何命名主題,請參閱主題或訂閱項目命名規範。
保留「新增預設訂閱項目」選項。 請勿選取其他選項。
點選「建立」。
- 在主題詳細資料頁面中,建立的訂閱項目名稱會列在「Subscription ID」下方。請記下這個值,以便在後續步驟中使用。
gcloud
如要建立主題,請執行 gcloud pubsub topics create 指令。如要瞭解如何命名訂閱項目,請參閱「主題或訂閱項目命名規範」。
gcloud pubsub topics create TOPIC_ID
將 TOPIC_ID 替換為 Pub/Sub 主題的名稱。
如要為主題建立訂閱項目,請執行 gcloud pubsub subscriptions create 指令:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
將 SUBSCRIPTION_ID 替換為 Pub/Sub 訂閱名稱。
建立 BigQuery 資料表
在這個步驟中,將建立具有下列結構定義的 BigQuery 資料表:
| 資料欄名稱 | 資料類型 |
|---|---|
name |
STRING |
customer_id |
INTEGER |
如果沒有 BigQuery 資料集,請先建立一個。詳情請參閱「建立資料集」。然後建立新的空白資料表:
控制台
前往「BigQuery」頁面。
在「Explorer」窗格中展開專案,然後選取資料集。
在「Dataset」(資料集) 資訊部分,按一下「Create table」(建立資料表)。
在「Create table from」(建立資料表來源) 清單中,選取「Empty table」(空白資料表)。
在「Table」(資料表) 方塊中,輸入資料表名稱。
在「Schema」(結構定義) 部分中,按一下「Edit as Text」(以文字形式編輯)。
貼上下列結構定義:
name:STRING, customer_id:INTEGER按一下「Create table」(建立資料表)。
gcloud
使用 bq mk 指令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
更改下列內容:
PROJECT_ID:專案 IDDATASET_NAME:資料集名稱TABLE_NAME:要建立的資料表名稱
執行管道
使用 Google 提供的「Pub/Sub 訂閱到 BigQuery」範本執行串流管道。管道會從 Pub/Sub 主題取得輸入資料,並將資料輸出至 BigQuery 資料集。
控制台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
按一下「Create job from template」(依據範本建立工作)。
在「Job name」(工作名稱) 中輸入 Dataflow 工作的名稱。
在「區域端點」中,選取 Dataflow 工作的區域。
在「Dataflow template」(Dataflow 範本) 欄位中,選取「Pub/Sub Subscription to BigQuery」(Pub/Sub 訂閱項目到 BigQuery) 範本。
在「BigQuery output table」(BigQuery 輸出資料表) 中,選取「Browse」(瀏覽),然後選取 BigQuery 資料表。
在「Pub/Sub input subscription」清單中,選取 Pub/Sub 訂閱項目。
在「Temporary location」(臨時位置) 中輸入下列資訊:
gs://BUCKET_NAME/temp/將
BUCKET_NAME換成 Cloud Storage bucket 名稱。temp資料夾會儲存 Dataflow 工作的暫存檔案。按一下「Run Job」(執行工作)。
gcloud
如要在殼層或終端機中執行範本,請使用 gcloud dataflow jobs run 指令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
請替換下列變數:
JOB_NAME:工作名稱DATAFLOW_REGION:作業的區域PROJECT_ID:您 Google Cloud 專案的名稱SUBSCRIPTION_ID:Pub/Sub 訂閱項目名稱DATASET_NAME:BigQuery 資料集名稱TABLE_NAME:BigQuery 資料表的名稱
將訊息發布至 Pub/Sub
Dataflow 工作啟動後,您就可以將訊息發布至 Pub/Sub,管道會將訊息寫入 BigQuery。
控制台
在 Google Cloud 控制台,依序前往「Pub/Sub」>「Topics」(主題) 頁面。
在主題清單中,按一下主題名稱。
按一下「訊息」。
按一下「發布訊息」。
在「Number of messages」(訊息數) 中輸入
10。在「Message body」(訊息內文) 中輸入
{"name": "Alice", "customer_id": 1}。按一下「發布」。
gcloud
如要將訊息發布至主題,請使用 gcloud pubsub topics publish 指令。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
將 TOPIC_ID 替換為主題名稱。
查看結果
查看寫入 BigQuery 資料表的資料。資料最多可能需要一分鐘才會顯示在資料表中。
控制台
前往 Google Cloud 控制台的「BigQuery」頁面。
前往 BigQuery 頁面在查詢編輯器中執行下列查詢:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000請替換下列變數:
PROJECT_ID:您 Google Cloud專案的名稱DATASET_NAME:BigQuery 資料集名稱TABLE_NAME:BigQuery 資料表的名稱
gcloud
執行下列查詢,在 BigQuery 中查看結果:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
請替換下列變數:
PROJECT_ID:您 Google Cloud專案的名稱DATASET_NAME:BigQuery 資料集名稱TABLE_NAME:BigQuery 資料表的名稱
使用 UDF 轉換資料
本教學課程假設 Pub/Sub 訊息的格式為 JSON,且 BigQuery 資料表結構定義與 JSON 資料相符。
您可以選擇提供 JavaScript user-defined function (UDF),在資料寫入 BigQuery 前轉換資料。UDF 可以執行其他處理作業,例如篩選、移除個人識別資訊 (PII),或使用其他欄位擴充資料。
詳情請參閱「為 Dataflow 範本建立 user-defined function」。
使用無效信件資料表
工作執行期間,管道可能無法將個別訊息寫入 BigQuery。可能的錯誤包括:
- 序列化錯誤,包括格式錯誤的 JSON。
- 類型轉換錯誤,是由於資料表結構定義與 JSON 資料不符所致。
- JSON 資料中不在資料表結構定義中的額外欄位。
管道會將這些錯誤寫入 BigQuery 的無效信件資料表。根據預設,管道會自動建立名為 TABLE_NAME_error_records 的死信資料表,其中 TABLE_NAME 是輸出資料表的名稱。如要使用其他名稱,請設定 outputDeadletterTable 範本參數。
清除所用資源
為避免因為本教學課程所用資源,導致系統向 Google Cloud 收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。 Google Cloud
控制台
- 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。
- 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)。
- 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。
gcloud
刪除 Google Cloud 專案:
gcloud projects delete PROJECT_ID
刪除個別資源
如要重複使用專案,可以保留專案,但要刪除在本教學課程中建立的資源。
停止 Dataflow 管道
控制台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
按一下要停止的工作。
如要停止工作,工作狀態必須為「執行中」。
在工作詳細資料頁面中,按一下「停止」。
按一下「取消」。
如要確認所選項目,請按一下「停止工作」。
gcloud
如要取消 Dataflow 工作,請使用 gcloud dataflow jobs 指令。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
清理 Google Cloud 專案資源
控制台
刪除 Pub/Sub 主題和訂閱項目。
前往 Google Cloud 控制台的 Pub/Sub「主題」頁面。
選取您建立的主題。
按一下「刪除」以永久刪除主題。
前往 Google Cloud 控制台的 Pub/Sub「Subscriptions」(訂閱項目) 頁面。
選取以主題建立的訂閱項目。
按一下「刪除」即可永久刪除訂閱項目。
刪除 BigQuery 資料表和資料集。
前往 Google Cloud 控制台的「BigQuery」頁面。
在「Explorer」面板中展開專案。
在要刪除的資料集旁邊,依序按一下「View actions」(查看動作) 和「delete」(刪除)。
刪除 Cloud Storage 值區。
在 Google Cloud 控制台,前往「Cloud Storage bucket」頁面。
選取要刪除的值區,然後依序點選 「刪除」,並按照指示操作。
gcloud
如要刪除 Pub/Sub 訂閱項目和主題,請使用
gcloud pubsub subscriptions delete和gcloud pubsub topics delete指令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID如要刪除 BigQuery 資料表,請使用
bq rm指令。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial刪除 BigQuery 資料集。單獨使用資料集不會產生任何費用。
bq rm -r -f -d PROJECT_ID:tutorial_dataset如要刪除 Cloud Storage bucket 及其物件,請使用
gcloud storage rm指令。單獨使用儲存空間不會產生任何費用。gcloud storage rm gs://BUCKET_NAME --recursive
撤銷憑證
控制台
如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。
- 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。
選取專案、資料夾或機構。
找出包含要撤銷存取權主體的資料列。 在該列中,按一下 「編輯主體」。
針對您要撤銷的每個角色按一下「刪除」圖示 按鈕,然後按一下「儲存」。
gcloud
- 如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/storage.adminroles/pubsub.editorroles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var> -
選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。
gcloud auth application-default revoke
-
選用:從 gcloud CLI 撤銷憑證。
gcloud auth revoke
後續步驟
- 使用 UDF 擴充 Dataflow 範本。
- 進一步瞭解如何使用 Dataflow 範本。
- 查看所有 Google 提供的範本。
- 請參閱建立及使用主題的 Pub/Sub 說明,以及如何建立提取訂閱項目。
- 請參閱這篇文章,瞭解如何使用 BigQuery 建立資料集。
- 瞭解 Pub/Sub 訂閱項目。
- 查看 Google Cloud 的參考架構、圖表和最佳做法。歡迎瀏覽我們的 Cloud Architecture Center。