使用 Dataflow 範本建立串流管道

本快速入門導覽課程說明如何使用 Google 提供的 Dataflow 範本建立串流管道。具體來說,本快速入門導覽課程會以 Pub/Sub 到 BigQuery 範本為例。

Pub/Sub 到 BigQuery 範本為串流管道,可從 Pub/Sub 主題讀取 JSON 格式訊息,並將其寫入 BigQuery 資料表。


如要直接在 Google Cloud 控制台中,按照這項工作的逐步指南操作,請按一下「Guide me」(逐步引導)

「Guide me」(逐步引導)


事前準備

請先完成下列步驟,再執行管道。

設定專案

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

必要的角色

如要完成本快速入門導覽課程,您需要下列 Identity and Access Management (IAM) 角色。

如要取得完成本快速入門導覽課程所需的權限,請要求管理員在專案中授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

如要確保 Compute Engine 預設服務帳戶具備執行 Dataflow 工作所需的權限,請要求管理員授予專案的 Compute Engine 預設服務帳戶下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

管理員或許也能透過自訂角色或其他預先定義的角色,將必要權限授予 Compute Engine 預設服務帳戶。

建立 Cloud Storage 值區

您必須先建立 Cloud Storage 值區,才能執行管道。

  1. 建立 Cloud Storage bucket:

    1. 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。

      前往「Buckets」(值區) 頁面

    2. 點選 「Create」(建立)
    3. 在「建立 bucket」頁面中,輸入 bucket 資訊。如要前往下一個步驟,請按「繼續」
      1. 在「Name your bucket」(為 bucket 命名) 欄位中,輸入 bucket 的專屬名稱。請勿在 bucket 名稱中加入任何機密資訊,因為 bucket 命名空間全域通用並會公開顯示。
      2. 在「Choose where to store your data」(選擇資料的儲存位置) 專區中,執行下列操作:
        1. 選取「位置類型」
        2. 從「位置類型」下拉式選單中,選擇要永久儲存 bucket 資料的位置。
          • 如果您選取「雙區域」位置類型,也可以使用相關核取方塊啟用強化型複製
        3. 如要設定跨值區複製,請選取「透過 Storage 移轉服務新增跨值區複製作業」,然後按照下列步驟操作:

          設定跨 bucket 複製作業

          1. 在「Bucket」選單中選取 bucket。
          2. 在「複製設定」部分,按一下「設定」,設定複製作業的設定。

            系統隨即會顯示「設定跨 bucket 複製作業」窗格。

            • 如要依物件名稱前置字串篩選要複製的物件,請輸入要納入或排除物件的前置字串,然後按一下「新增前置字串」
            • 如要為複製的物件設定儲存空間級別,請從「儲存空間級別」選單中選取儲存空間級別。如果略過這個步驟,複製的物件預設會使用目標值區的儲存空間級別。
            • 按一下 [完成]
      3. 在「選擇資料儲存方式」部分,執行下列操作:
        1. 在「設定預設類別」部分,選取「Standard」
        2. 如要啟用階層命名空間,請在「為資料密集型工作負載提供最理想的儲存空間」部分,選取「為這個值區啟用階層命名空間」
      4. 在「選取如何控制物件的存取權」部分,選取 bucket 是否要強制執行禁止公開存取,並為 bucket 的物件選取存取權控管方法
      5. 在「選擇保護物件資料的方式」部分,執行下列操作:
        • 在「資料保護」下方,選取要為 bucket 設定的選項。
          • 如要啟用虛刪除,請按一下「虛刪除政策 (用於資料復原)」核取方塊,並指定要保留物件的天數 (刪除後)。
          • 如要設定「物件版本管理」,請按一下「物件版本管理 (用於版本管控)」核取方塊,並指定每個物件的版本數量上限,以及非現行版本失效的天數。
          • 如要為物件和 bucket 啟用保留政策,請勾選「保留 (符合法規)」核取方塊,然後執行下列操作:
            • 如要啟用 Object Retention Lock,請按一下「啟用物件保留功能」核取方塊。
            • 如要啟用「Bucket Lock」,請勾選「Set bucket retention policy」(設定值區保留政策) 核取方塊,然後選擇保留期限的時間單位和長度。
        • 如要選擇物件資料的加密方式,請展開「資料加密」部分 (),然後選取「資料加密」方法
    4. 點選「建立」
  2. 複製下列內容,因為後續章節會用到:

    • Cloud Storage bucket 名稱。
    • 您的 Google Cloud 專案 ID。

    如要找出這個 ID,請參閱「識別專案」。

虛擬私有雲網路

根據預設,每個新專案一開始都是使用預設網路。如果專案的預設網路已停用或刪除,您必須在專案中建立網路,並將Compute 網路使用者角色 (roles/compute.networkUser) 指派給使用者帳戶。

建立 BigQuery 資料集與資料表

使用 Google Cloud 控制台,為 Pub/Sub 主題建立具備適當結構定義的 BigQuery 資料集和資料表。

在這個範例中,資料集的名稱為 taxirides,資料表的名稱為 realtime。如要建立這個資料集和資料表,請按照下列步驟操作:

  1. 前往「BigQuery」BigQuery頁面。
    前往 BigQuery
  2. 在「Explorer」面板中,找到要建立資料集的專案,然後依序點選 「查看動作」和「建立資料集」
  3. 在「建立資料集」面板中,按照下列步驟操作:
    1. 在「Dataset ID」(資料集 ID) 中輸入 taxirides。 每個 Google Cloud 專案的資料集 ID 皆不得重複。
    2. 針對「Location type」(位置類型) 選取「Multi-region」(多區域),然後選取「US (multiple regions in United States)」(us (多個美國區域))。公開資料集儲存在 US 多地區位置。為簡單起見,建議您將資料集放在同一個區域。
    3. 保留其他預設設定,然後按一下「建立資料集」
  4. 「Explorer」面板中展開專案。
  5. taxirides 資料集旁邊,依序按一下「View actions」(查看動作) 和「Create table」(建立資料表)
  6. 在「Create table」(建立資料表) 面板中按照下列步驟操作:
    1. 在「Source」(來源) 區段中,針對「Create table from」(使用下列資料建立資料表),選取「Empty table」(空白資料表)
    2. 在「Destination」(目的地) 區段中,在「Table」(資料表) 輸入 realtime
    3. 在「Schema」(結構定義) 區段中,按一下「Edit as text」(以文字形式編輯) 切換鈕,並在輸入框中貼上以下的結構定義:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. 在「Partition and cluster settings」(分區與叢集設定) 區段的「Partitioning」(分區) 中,選取「Timestamp」(時間戳記) 欄位。
  7. 保留其他預設設定,然後按一下「建立資料表」

執行管道

使用 Google 提供的「Pub/Sub 到 BigQuery」範本執行串流管道。管道會從輸入主題取得輸入資料。

  1. 前往 Dataflow 的「Jobs」(工作) 頁面:
    前往「Jobs」(工作)
  2. 按一下「Create job from template」(利用範本建立工作)
  3. 在 Dataflow 工作的「Job name」(工作名稱) 中輸入 taxi-data
  4. 在「Dataflow template」(Dataflow 範本) 欄位中,選取 「Pub/Sub to BigQuery」(Pub/Sub 到 BigQuery) 範本。
  5. 在「BigQuery output table」(BigQuery 輸出資料表) 中輸入下列資訊:
    PROJECT_ID:taxirides.realtime

    PROJECT_ID 替換為您建立 BigQuery 資料集的專案 ID。

  6. 在「Optional source parameters」(選用來源參數) 專區的「Input Pub/Sub topic」(輸入 Pub/Sub 主題) 欄位中,按一下「Enter topic manually」(手動輸入主題)
  7. 在「Topic name」(主題名稱) 對話方塊中輸入以下內容,然後按一下「Save」(儲存)
    projects/pubsub-public-data/topics/taxirides-realtime

    這個公開的 Pub/Sub 主題是以紐約市計程車暨禮車管理局的公開資料集為基礎,以下是這個主題的訊息範例,格式為 JSON:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. 在「Temp location」(臨時位置) 中輸入下列內容:
    gs://BUCKET_NAME/temp/

    請將 BUCKET_NAME 改成您的 Cloud Storage 值區名稱。temp 資料夾會儲存暫存檔案,例如暫存管道工作。

  9. 如果專案沒有預設網路,請輸入「網路」和「子網路」。詳情請參閱指定網路和子網路
  10. 按一下「Run Job」(執行工作)

查看結果

如要查看寫入 realtime 資料表的資料,請按照下列步驟操作:

  1. 前往「BigQuery」頁面

    前往 BigQuery

  2. 按一下「撰寫新查詢」。系統會隨即開啟新的「Editor」(編輯器) 分頁。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID 替換為您建立 BigQuery 資料集的專案 ID。最多可能需要五分鐘,資料才會顯示於資料表。

  3. 按一下「執行」

    查詢會傳回過去 24 小時內新增至資料表的資料列。您也可以使用標準 SQL 執行查詢。

清除所用資源

為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。

刪除專案

如要避免付費,最簡單的方法就是刪除您為快速入門導覽課程建立的 Google Cloud 專案。

  1. 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。

    前往「Manage resources」(管理資源)

  2. 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。

刪除個別資源

如要保留您在本快速入門中使用的 Google Cloud 專案,請刪除個別資源:

  1. 前往 Dataflow 的「Jobs」(工作) 頁面:
    前往「工作」
  2. 從工作清單中選取串流工作。
  3. 按一下導覽區中的「停止」
  4. 在「Stop job」(停止工作) 對話方塊中,取消排除管道,然後按一下「Stop job」(停止工作)
  5. 前往「BigQuery」頁面
    前往 BigQuery
  6. 在「Explorer」面板中展開專案。
  7. 在要刪除的資料集旁邊,依序按一下「View actions」(查看動作) 和「Open」(開啟)
  8. 在詳細資料面板中,按一下「刪除資料集」,然後按照指示操作。
  9. 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。

    前往「Buckets」(值區) 頁面

  10. 按一下要刪除的值區旁的核取方塊。
  11. 如要刪除值區,請依序點選 「Delete」(刪除),然後按照指示操作。

後續步驟