工作建構工具可讓您建立自訂批次和串流 Dataflow 工作。您也可以將工作建構工具工作儲存為 Apache Beam YAML 檔案,以便分享及重複使用。
建立新的管道
如要在工作建構工具中建立新管道,請按照下列步驟操作:
前往 Google Cloud 控制台的「Jobs」(工作) 頁面。
按一下 「Create job from builder」(從建構工具建立工作)。
在「工作名稱」部分,輸入工作名稱。
選取「批次」或「串流」。
如果選取「串流」,請選取視窗模式。然後輸入視窗規格,如下所示:
- 固定時間範圍:輸入時間範圍大小 (以秒為單位)。
- 滑動視窗:輸入視窗大小和視窗週期 (以秒為單位)。
- 間隔式時間區間:輸入工作階段間隔 (以秒為單位)。
如要進一步瞭解視窗,請參閱「視窗和視窗函式」。
接著,請按照下列各節的說明,在管道中加入來源、轉換和接收器。
將來源新增至管道
管道至少須有一個來源。工作建構工具一開始會填入空白來源。如要設定來源,請按照下列步驟操作:
在「來源名稱」方塊中,輸入來源名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。
在「來源類型」清單中,選取資料來源類型。
視來源類型而定,提供額外的設定資訊。
- 舉例來說,如果您選取 BigQuery,請指定要從哪個資料表讀取資料。
- 如果選取 Pub/Sub,請指定訊息結構定義。輸入要從 Pub/Sub 訊息讀取的每個欄位名稱和資料類型。管道會捨棄結構定義中未指定的任何欄位。
- 如果選取 Apache Iceberg,請指定 Iceberg REST 目錄 (IRC) 的連線詳細資料,例如 Iceberg 資料表 ID、目錄名稱、目錄類型、目錄 URI 和倉庫名稱。
選用:針對部分來源類型,您可以按一下「預覽來源資料」來預覽來源資料。
如要在管道中新增其他來源,請按一下「新增來源」。如要合併多個來源的資料,請在管道中新增 SQL 或 Join 轉換。
在管道中新增轉換
視需要將一或多個轉換新增至管道。您可以使用下列轉換指令,操控、匯總或彙整來源和其他轉換作業中的資料:
| 轉換類型 | 說明 | Beam YAML 轉換作業資訊 |
|---|---|---|
| 篩選 (Python) | 使用 Python 運算式篩選記錄。 | |
| SQL 轉換 | 透過 SQL 陳述式處理記錄或彙整多項輸入內容。 | |
| 對應欄位 (Python) | 運用 Python 運算式和函式,新增欄位或重新對應整份記錄。 | |
| 對應欄位 (SQL) | 透過 SQL 運算式新增或對應記錄欄位。 | |
YAML 轉換作業:
|
使用 Beam YAML SDK 中的任何轉換作業。 YAML 轉換作業設定:提供 YAML 轉換作業設定參數,做為 YAML 對應。建立 Beam YAML 轉換之後,系統會將鍵/值組合填入該轉換作業的設定專區。如要查看各轉換作業類型支援的設定參數,請參閱 Beam YAML 轉換作業說明文件。設定參數範例: 合併group_by: combine: 加入type: equalities: fields: |
|
| 記錄 | 將記錄記錄至工作的工作站記錄檔。 | |
| 分組依據 |
使用 count() 和 sum() 等函式合併記錄。 |
|
| 加入 | 彙整相等欄位中的多項輸入內容。 | |
| Explode | 攤平陣列欄位來分割記錄。 |
如要新增轉換作業,請按照下列步驟操作:
按一下「新增轉換作業」。
在「轉換」名稱方塊中,輸入轉換名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。
在「轉換類型」清單中,選取轉換類型。
視轉換類型而定,提供其他設定資訊。舉例來說,如果您選取「篩選條件 (Python)」,請輸入要當做篩選條件的 Python 運算式。
選取轉換作業的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供此轉換的輸入內容。
將接收器新增至管道
管道至少須有一個接收器。工作建構工具一開始會填入空白的接收器。如要設定接收器,請執行下列步驟:
在「Sink name」(接收器名稱) 方塊中,輸入接收器名稱或使用預設名稱。 執行工作時,名稱會顯示在工作圖表中。
在「Sink type」(接收器類型) 清單中,選取接收器類型。
視接收器類型而定,提供額外的設定資訊。 舉例來說,如果您選取 BigQuery 接收器,請選取要寫入的 BigQuery 資料表。
選取接收器的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供這個轉換的輸入內容。
如要在管道中新增其他接收器,請按一下「新增接收器」。
執行管道
如要從作業建構工具執行管道,請完成下列步驟:
選用:設定 Dataflow 工作選項。如要展開「Dataflow options」部分,請按一下 展開箭頭。
按一下「Run Job」(執行工作)。工作建構工具會前往已提交工作的工作圖表。您可以使用工作圖表監控工作狀態。
啟動前請先驗證管道
對於設定複雜的管道 (例如 Python 篩選器和 SQL 運算式),建議您先檢查管道設定是否有語法錯誤,再啟動管道。如要驗證管道語法,請完成下列步驟:
- 按一下「驗證」開啟 Cloud Shell,並啟動驗證服務。
- 按一下「開始驗證」。
- 如果驗證時發現錯誤,系統會顯示紅色驚嘆號。
- 修正偵測到的錯誤,然後按一下「驗證」,確認修正內容。如果沒有發現錯誤,系統會顯示綠色勾號。
透過 gcloud CLI 執行
您也可以使用 gcloud CLI 執行 Beam YAML pipeline。如要使用 gcloud CLI 執行工作建構工具管道,請按照下列步驟操作:
按一下「儲存 YAML」開啟「儲存 YAML」視窗。
執行下列其中一項動作:
- 如要儲存至 Cloud Storage,請輸入 Cloud Storage 路徑,然後按一下「儲存」。
- 如要下載本機檔案,請按一下「下載」。
在殼層或終端機中執行下列指令:
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH請將
YAML_FILE_PATH改成 YAML 檔案的路徑 (本機或 Cloud Storage)。
後續步驟
- 使用 Dataflow 工作監控介面。
- 在工作建構工具中儲存及載入 YAML 工作定義。
- 進一步瞭解 Beam YAML。