使用工作建構工具建立自訂工作

工作建構工具可讓您建立自訂批次和串流 Dataflow 工作。您也可以將工作建構工具工作儲存為 Apache Beam YAML 檔案,以便分享及重複使用。

建立新的管道

如要在工作建構工具中建立新管道,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Jobs」(工作) 頁面。

    前往工作

  2. 按一下 「Create job from builder」(從建構工具建立工作)

  3. 在「工作名稱」部分,輸入工作名稱。

  4. 選取「批次」或「串流」

  5. 如果選取「串流」,請選取視窗模式。然後輸入視窗規格,如下所示:

    • 固定時間範圍:輸入時間範圍大小 (以秒為單位)。
    • 滑動視窗:輸入視窗大小和視窗週期 (以秒為單位)。
    • 間隔式時間區間:輸入工作階段間隔 (以秒為單位)。

    如要進一步瞭解視窗,請參閱「視窗和視窗函式」。

接著,請按照下列各節的說明,在管道中加入來源、轉換和接收器。

將來源新增至管道

管道至少須有一個來源。工作建構工具一開始會填入空白來源。如要設定來源,請按照下列步驟操作:

  1. 在「來源名稱」方塊中,輸入來源名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。

  2. 在「來源類型」清單中,選取資料來源類型。

  3. 視來源類型而定,提供額外的設定資訊。

    • 舉例來說,如果您選取 BigQuery,請指定要從哪個資料表讀取資料。
    • 如果選取 Pub/Sub,請指定訊息結構定義。輸入要從 Pub/Sub 訊息讀取的每個欄位名稱和資料類型。管道會捨棄結構定義中未指定的任何欄位。
    • 如果選取 Apache Iceberg,請指定 Iceberg REST 目錄 (IRC) 的連線詳細資料,例如 Iceberg 資料表 ID、目錄名稱、目錄類型、目錄 URI 和倉庫名稱。
  4. 選用:針對部分來源類型,您可以按一下「預覽來源資料」來預覽來源資料。

如要在管道中新增其他來源,請按一下「新增來源」。如要合併多個來源的資料,請在管道中新增 SQLJoin 轉換。

在管道中新增轉換

視需要將一或多個轉換新增至管道。您可以使用下列轉換指令,操控、匯總或彙整來源和其他轉換作業中的資料:

轉換類型 說明 Beam YAML 轉換作業資訊
篩選 (Python) 使用 Python 運算式篩選記錄。
SQL 轉換 透過 SQL 陳述式處理記錄或彙整多項輸入內容。
對應欄位 (Python) 運用 Python 運算式和函式,新增欄位或重新對應整份記錄。
對應欄位 (SQL) 透過 SQL 運算式新增或對應記錄欄位。
YAML 轉換作業:
  1. AssertEqual
  2. AssignTimestamps
  3. 合併
  4. 分割
  5. 篩選器
  6. Flatten
  7. 加入
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

使用 Beam YAML SDK 中的任何轉換作業。

YAML 轉換作業設定:提供 YAML 轉換作業設定參數,做為 YAML 對應。建立 Beam YAML 轉換之後,系統會將鍵/值組合填入該轉換作業的設定專區。如要查看各轉換作業類型支援的設定參數,請參閱 Beam YAML 轉換作業說明文件。設定參數範例:

合併
group_by:
combine:
加入
type:
equalities:
fields:
記錄 將記錄記錄至工作的工作站記錄檔。
分組依據 使用 count()sum() 等函式合併記錄。
加入 彙整相等欄位中的多項輸入內容。
Explode 攤平陣列欄位來分割記錄。

如要新增轉換作業,請按照下列步驟操作:

  1. 按一下「新增轉換作業」

  2. 在「轉換」名稱方塊中,輸入轉換名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。

  3. 在「轉換類型」清單中,選取轉換類型。

  4. 視轉換類型而定,提供其他設定資訊。舉例來說,如果您選取「篩選條件 (Python)」,請輸入要當做篩選條件的 Python 運算式。

  5. 選取轉換作業的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供此轉換的輸入內容。

將接收器新增至管道

管道至少須有一個接收器。工作建構工具一開始會填入空白的接收器。如要設定接收器,請執行下列步驟:

  1. 在「Sink name」(接收器名稱) 方塊中,輸入接收器名稱或使用預設名稱。 執行工作時,名稱會顯示在工作圖表中。

  2. 在「Sink type」(接收器類型) 清單中,選取接收器類型。

  3. 視接收器類型而定,提供額外的設定資訊。 舉例來說,如果您選取 BigQuery 接收器,請選取要寫入的 BigQuery 資料表。

  4. 選取接收器的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供這個轉換的輸入內容。

  5. 如要在管道中新增其他接收器,請按一下「新增接收器」

執行管道

如要從作業建構工具執行管道,請完成下列步驟:

  1. 選用:設定 Dataflow 工作選項。如要展開「Dataflow options」部分,請按一下 展開箭頭。

  2. 按一下「Run Job」(執行工作)。工作建構工具會前往已提交工作的工作圖表。您可以使用工作圖表監控工作狀態。

啟動前請先驗證管道

對於設定複雜的管道 (例如 Python 篩選器和 SQL 運算式),建議您先檢查管道設定是否有語法錯誤,再啟動管道。如要驗證管道語法,請完成下列步驟:

  1. 按一下「驗證」開啟 Cloud Shell,並啟動驗證服務。
  2. 按一下「開始驗證」
  3. 如果驗證時發現錯誤,系統會顯示紅色驚嘆號。
  4. 修正偵測到的錯誤,然後按一下「驗證」,確認修正內容。如果沒有發現錯誤,系統會顯示綠色勾號。

透過 gcloud CLI 執行

您也可以使用 gcloud CLI 執行 Beam YAML pipeline。如要使用 gcloud CLI 執行工作建構工具管道,請按照下列步驟操作:

  1. 按一下「儲存 YAML」開啟「儲存 YAML」視窗。

  2. 執行下列其中一項動作:

    • 如要儲存至 Cloud Storage,請輸入 Cloud Storage 路徑,然後按一下「儲存」
    • 如要下載本機檔案,請按一下「下載」
  3. 在殼層或終端機中執行下列指令:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    請將 YAML_FILE_PATH 改成 YAML 檔案的路徑 (本機或 Cloud Storage)。

後續步驟