使用 Dataflow 範本建立串流管道
本快速入門導覽課程說明如何使用 Google 提供的 Dataflow 範本建立串流管道。具體來說,本快速入門導覽課程會以 Pub/Sub 到 BigQuery 範本為例。
Pub/Sub 到 BigQuery 範本為串流管道,可從 Pub/Sub 主題讀取 JSON 格式訊息,並將其寫入 BigQuery 資料表。
如要直接在 Google Cloud 控制台中,按照這項工作的逐步指南操作,請按一下「Guide me」(逐步引導):
事前準備
請先完成下列步驟,再執行管道。
設定專案
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
必要的角色
如要完成本快速入門導覽課程,您需要下列 Identity and Access Management (IAM) 角色。
如要取得完成本快速入門導覽課程所需的權限,請要求管理員在專案中授予您下列 IAM 角色:
-
BigQuery 使用者 (
roles/bigquery.user) -
Dataflow 管理員 (
roles/dataflow.admin) -
服務帳戶使用者 (
roles/iam.serviceAccountUser) - 儲存空間管理員 (
roles/storage.admin)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
如要確保 Compute Engine 預設服務帳戶具備執行 Dataflow 工作所需的權限,請要求管理員授予專案的 Compute Engine 預設服務帳戶下列 IAM 角色:
-
BigQuery 資料編輯者 (
roles/bigquery.dataEditor) -
Dataflow 工作者 (
roles/dataflow.worker) -
Pub/Sub 編輯者 (
roles/pubsub.editor) -
Storage 物件管理員 (
roles/storage.objectAdmin) -
檢視者 (
roles/viewer)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
管理員或許也能透過自訂角色或其他預先定義的角色,將必要權限授予 Compute Engine 預設服務帳戶。
建立 Cloud Storage 值區
您必須先建立 Cloud Storage 值區,才能執行管道。
建立 Cloud Storage bucket:
- 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。
- 點選 「Create」(建立)。
- 在「建立 bucket」頁面中,輸入 bucket 資訊。如要前往下一個步驟,請按「繼續」。
- 在「Name your bucket」(為 bucket 命名) 欄位中,輸入 bucket 的專屬名稱。請勿在 bucket 名稱中加入任何機密資訊,因為 bucket 命名空間全域通用並會公開顯示。
-
在「Choose where to store your data」(選擇資料的儲存位置) 專區中,執行下列操作:
- 選取「位置類型」。
- 從「位置類型」下拉式選單中,選擇要永久儲存 bucket 資料的位置。
- 如果您選取「雙區域」位置類型,也可以使用相關核取方塊啟用強化型複製。
- 如要設定跨值區複製,請選取「透過 Storage 移轉服務新增跨值區複製作業」,然後按照下列步驟操作:
設定跨 bucket 複製作業
- 在「Bucket」選單中選取 bucket。
在「複製設定」部分,按一下「設定」,設定複製作業的設定。
系統隨即會顯示「設定跨 bucket 複製作業」窗格。
- 如要依物件名稱前置字串篩選要複製的物件,請輸入要納入或排除物件的前置字串,然後按一下「新增前置字串」。
- 如要為複製的物件設定儲存空間級別,請從「儲存空間級別」選單中選取儲存空間級別。如果略過這個步驟,複製的物件預設會使用目標值區的儲存空間級別。
- 按一下 [完成]。
-
在「選擇資料儲存方式」部分,執行下列操作:
- 在「設定預設類別」部分,選取「Standard」。
- 如要啟用階層命名空間,請在「為資料密集型工作負載提供最理想的儲存空間」部分,選取「為這個值區啟用階層命名空間」。
- 在「選取如何控制物件的存取權」部分,選取 bucket 是否要強制執行禁止公開存取,並為 bucket 的物件選取存取權控管方法。
-
在「選擇保護物件資料的方式」部分,執行下列操作:
- 在「資料保護」下方,選取要為 bucket 設定的選項。
- 如要啟用虛刪除,請按一下「虛刪除政策 (用於資料復原)」核取方塊,並指定要保留物件的天數 (刪除後)。
- 如要設定「物件版本管理」,請按一下「物件版本管理 (用於版本管控)」核取方塊,並指定每個物件的版本數量上限,以及非現行版本失效的天數。
- 如要為物件和 bucket 啟用保留政策,請勾選「保留 (符合法規)」核取方塊,然後執行下列操作:
- 如要啟用 Object Retention Lock,請按一下「啟用物件保留功能」核取方塊。
- 如要啟用「Bucket Lock」,請勾選「Set bucket retention policy」(設定值區保留政策) 核取方塊,然後選擇保留期限的時間單位和長度。
- 如要選擇物件資料的加密方式,請展開「資料加密」部分 (),然後選取「資料加密」方法。
- 在「資料保護」下方,選取要為 bucket 設定的選項。
- 點選「建立」。
複製下列內容,因為後續章節會用到:
- Cloud Storage bucket 名稱。
- 您的 Google Cloud 專案 ID。
如要找出這個 ID,請參閱「識別專案」。
虛擬私有雲網路
根據預設,每個新專案一開始都是使用預設網路。如果專案的預設網路已停用或刪除,您必須在專案中建立網路,並將Compute 網路使用者角色 (roles/compute.networkUser) 指派給使用者帳戶。
建立 BigQuery 資料集與資料表
使用 Google Cloud 控制台,為 Pub/Sub 主題建立具備適當結構定義的 BigQuery 資料集和資料表。
在這個範例中,資料集的名稱為 taxirides,資料表的名稱為 realtime。如要建立這個資料集和資料表,請按照下列步驟操作:
- 前往「BigQuery」BigQuery頁面。
前往 BigQuery - 在「Explorer」面板中,找到要建立資料集的專案,然後依序點選 「查看動作」和「建立資料集」。
- 在「建立資料集」面板中,按照下列步驟操作:
- 在「Dataset ID」(資料集 ID) 中輸入
taxirides。 每個 Google Cloud 專案的資料集 ID 皆不得重複。 - 針對「Location type」(位置類型) 選取「Multi-region」(多區域),然後選取「US (multiple regions in United States)」(us (多個美國區域))。公開資料集儲存在
US多地區位置。為簡單起見,建議您將資料集放在同一個區域。 - 保留其他預設設定,然後按一下「建立資料集」。
- 在
「Explorer」 面板中展開專案。 - 在
taxirides資料集旁邊,依序按一下「View actions」(查看動作) 和「Create table」(建立資料表)。 - 在「Create table」(建立資料表) 面板中按照下列步驟操作:
- 在「Source」(來源) 區段中,針對「Create table from」(使用下列資料建立資料表),選取「Empty table」(空白資料表)。
- 在「Destination」(目的地) 區段中,在「Table」(資料表) 輸入
realtime。 - 在「Schema」(結構定義) 區段中,按一下「Edit as text」(以文字形式編輯) 切換鈕,並在輸入框中貼上以下的結構定義:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 在「Partition and cluster settings」(分區與叢集設定) 區段的「Partitioning」(分區) 中,選取「Timestamp」(時間戳記) 欄位。
- 保留其他預設設定,然後按一下「建立資料表」。
執行管道
使用 Google 提供的「Pub/Sub 到 BigQuery」範本執行串流管道。管道會從輸入主題取得輸入資料。
- 前往 Dataflow 的「Jobs」(工作) 頁面:
前往「Jobs」(工作) - 按一下
「Create job from template」(利用範本建立工作) 。 - 在 Dataflow 工作的「Job name」(工作名稱) 中輸入
taxi-data。 - 在「Dataflow template」(Dataflow 範本) 欄位中,選取 「Pub/Sub to BigQuery」(Pub/Sub 到 BigQuery) 範本。
- 在「BigQuery output table」(BigQuery 輸出資料表) 中輸入下列資訊:
PROJECT_ID:taxirides.realtime
將
PROJECT_ID替換為您建立 BigQuery 資料集的專案 ID。 - 在「Optional source parameters」(選用來源參數) 專區的「Input Pub/Sub topic」(輸入 Pub/Sub 主題) 欄位中,按一下「Enter topic manually」(手動輸入主題)。
- 在「Topic name」(主題名稱) 對話方塊中輸入以下內容,然後按一下「Save」(儲存):
projects/pubsub-public-data/topics/taxirides-realtime
這個公開的 Pub/Sub 主題是以紐約市計程車暨禮車管理局的公開資料集為基礎,以下是這個主題的訊息範例,格式為 JSON:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- 在「Temp location」(臨時位置) 中輸入下列內容:
gs://BUCKET_NAME/temp/
請將
BUCKET_NAME改成您的 Cloud Storage 值區名稱。temp資料夾會儲存暫存檔案,例如暫存管道工作。 - 如果專案沒有預設網路,請輸入「網路」和「子網路」。詳情請參閱指定網路和子網路。
- 按一下「Run Job」(執行工作)。
查看結果
如要查看寫入realtime 資料表的資料,請按照下列步驟操作:
前往「BigQuery」頁面
按一下「撰寫新查詢」。系統會隨即開啟新的「Editor」(編輯器) 分頁。
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
將
PROJECT_ID替換為您建立 BigQuery 資料集的專案 ID。最多可能需要五分鐘,資料才會顯示於資料表。按一下「執行」。
查詢會傳回過去 24 小時內新增至資料表的資料列。您也可以使用標準 SQL 執行查詢。
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。
刪除專案
如要避免付費,最簡單的方法就是刪除您為快速入門導覽課程建立的 Google Cloud 專案。- 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。
- 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)。
- 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。
刪除個別資源
如要保留您在本快速入門中使用的 Google Cloud 專案,請刪除個別資源:
- 前往 Dataflow 的「Jobs」(工作) 頁面:
前往「工作」 - 從工作清單中選取串流工作。
- 按一下導覽區中的「停止」。
- 在「Stop job」(停止工作) 對話方塊中,取消或排除管道,然後按一下「Stop job」(停止工作)。
- 前往「BigQuery」頁面
前往 BigQuery - 在「Explorer」面板中展開專案。
- 在要刪除的資料集旁邊,依序按一下「View actions」(查看動作) 和「Open」(開啟)。
- 在詳細資料面板中,按一下「刪除資料集」,然後按照指示操作。
- 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。
- 按一下要刪除的值區旁的核取方塊。
- 如要刪除值區,請依序點選 「Delete」(刪除),然後按照指示操作。