轉換推送總覽

為提升資料管道的效能,您可以將部分轉換作業推送至 BigQuery,而非 Apache Spark。「轉換下推」是指一項設定,可讓 Cloud Data Fusion 資料管道中的作業推送至 BigQuery,做為執行引擎。因此,作業及其資料會移轉至 BigQuery,並在該處執行作業。

轉換下推功能可提升管道效能,這些管道具有多個複雜的 JOIN 作業或其他支援的轉換。在 BigQuery 中執行某些轉換作業,可能比在 Spark 中執行更快。

系統會在 Spark 中執行不支援的轉換和所有預覽轉換。

支援的轉換

轉換下推功能適用於 Cloud Data Fusion 6.5.0 以上版本,但下列部分轉換作業僅適用於後續版本。

JOIN 項作業

  • Cloud Data Fusion 6.5.0 以上版本支援 JOIN 作業的轉換下推功能。

  • 支援基本 (按鍵) 和進階 JOIN 作業。

  • 聯結必須有兩個輸入階段,才能在 BigQuery 中執行。

  • 如果設定聯結將一或多個輸入內容載入記憶體,系統會在 Spark 中執行聯結,而非 BigQuery,但下列情況除外:

    • 如果已將任何輸入內容推送至聯結。
    • 如果您已設定要在 SQL Engine 中執行的聯結 (請參閱「強制執行階段」選項)。

BigQuery 接收器

轉換下推功能適用於 Cloud Data Fusion 6.7.0 以上版本的 BigQuery Sink。

如果 BigQuery Sink 緊接在 BigQuery 中執行的階段之後,系統會直接在 BigQuery 中執行將記錄寫入 BigQuery 的作業。

如要使用這個接收器提升效能,您需要:

  • 服務帳戶必須有權在 BigQuery Sink 使用的資料集中建立及更新資料表。
  • 用於轉換下推的資料集和 BigQuery 接收器必須儲存在相同的位置
  • 作業必須是下列其中一項:
    • Insert (不支援 Truncate Table 選項)
    • Update
    • Upsert

GROUP BY 匯總

在 Cloud Data Fusion 6.7.0 以上版本中,GROUP BY 匯總作業可使用轉換下推功能。

BigQuery 中的 GROUP BY 匯總適用於下列作業:

  • Avg
  • Collect List (系統會從輸出陣列中移除空值)
  • Collect Set (系統會從輸出陣列中移除空值)
  • Concat
  • Concat Distinct
  • Count
  • Count Distinct
  • Count Nulls
  • Logical And
  • Logical Or
  • Max
  • Min
  • Standard Deviation
  • Sum
  • Sum of Squares
  • Corrected Sum of Squares
  • Variance
  • Shortest String
  • Longest String

在下列情況下,GROUP BY 匯總會在 BigQuery 中執行:

  • 該階段會接續在已向下推送的階段之後。
  • 您已將其設定為在 SQL 引擎中執行 (請參閱「強制執行階段」選項)。

簡化匯總

在 Cloud Data Fusion 6.7.0 以上版本中,下列作業可使用轉換下推功能,進行重複資料刪除匯總:

  • 未指定任何篩選作業
  • ANY (所需欄位的非空值)
  • MIN (指定欄位的最小值)
  • MAX (指定欄位的最大值)

系統不支援下列作業:

  • FIRST
  • LAST

在下列情況下,系統會在 SQL 引擎中執行重複資料刪除匯總:

  • 該階段會接續在已向下推送的階段之後。
  • 您已將其設定為在 SQL 引擎中執行 (請參閱「強制執行階段」選項)。

BigQuery 來源下推

BigQuery 來源下推功能適用於 Cloud Data Fusion 6.8.0 以上版本。

如果 BigQuery 來源接在與 BigQuery 下推作業相容的階段之後,管道就能在 BigQuery 中執行所有相容的階段。

Cloud Data Fusion 會複製執行 BigQuery 管道所需的記錄。

使用 BigQuery 來源下推時,系統會保留資料表分割和叢集屬性,讓您使用這些屬性進一步最佳化作業,例如聯結。

額外規定

如要使用 BigQuery 來源下推功能,必須符合下列條件:

  • 為 BigQuery 轉換下推設定的服務帳戶,必須具備讀取 BigQuery 來源資料集內資料表的權限。

  • BigQuery 來源中使用的資料集,以及為轉換下推設定的資料集,必須儲存在相同的位置

時段匯總

在 Cloud Data Fusion 6.9 以上版本中,轉換下推功能適用於視窗匯總。BigQuery 支援下列作業的視窗匯總:

  • Rank
  • Dense Rank
  • Percent Rank
  • N tile
  • Row Number
  • Median
  • Continuous Percentile
  • Lead
  • Lag
  • First
  • Last
  • Cumulative distribution
  • Accumulate

在下列情況下,系統會在 BigQuery 中執行視窗匯總:

  • 該階段會接續在已向下推送的階段之後。
  • 您已將其設定為在 SQL 引擎中執行 (請參閱「強制下推的階段」選項)。

Wrangler 篩選器下推

Wrangler 篩選條件下推功能適用於 Cloud Data Fusion 6.9 以上版本。

使用 Wrangler 外掛程式時,您可以將篩選器 (又稱 Precondition 作業) 推送至 BigQuery 執行,而非 Spark。

篩選器下推僅支援 6.9 版中發布的先決條件 SQL 模式。在此模式下,外掛程式會接受 ANSI 標準 SQL 中的前提條件運算式。

如果使用 SQL 模式設定前提條件,Wrangler 外掛程式會停用「指令」和「使用者定義指令」,因為 SQL 模式下的前提條件不支援這些指令。

如果啟用轉換下推功能,Wrangler 外掛程式有多個輸入內容時,系統不支援前置條件的 SQL 模式。如果搭配多個輸入內容使用,系統會在 Spark 中執行這個包含 SQL 篩選條件的 Wrangler 階段。

在下列情況下,系統會在 BigQuery 中執行篩選器:

  • 該階段會接續在已向下推送的階段之後。
  • 您已將其設定為在 SQL 引擎中執行 (請參閱「強制下推的階段」選項)。

指標

如要進一步瞭解 Cloud Data Fusion 為在 BigQuery 中執行的 pipeline 部分提供的指標,請參閱「BigQuery 下推 pipeline 指標」。

使用轉換下推的時機

在 BigQuery 中執行轉換作業時,需要進行下列步驟:

  1. 將記錄寫入 BigQuery,適用於管道中的支援階段。
  2. 在 BigQuery 中執行支援的階段。
  3. 執行支援的轉換後,從 BigQuery 讀取記錄,除非後續有 BigQuery Sink

視資料集大小而定,網路負擔可能相當大,如果啟用轉換下推功能,可能會對整體管道執行時間造成負面影響。

由於網路負擔過重,我們建議在下列情況下使用轉換下推:

  • 系統會依序執行多項支援的作業 (階段之間沒有步驟)。
  • 相較於 Spark,BigQuery 執行轉換作業的效能提升幅度,大於資料移入和可能移出 BigQuery 的延遲時間。

運作方式

執行使用「轉換下推」的管道時,Cloud Data Fusion 會在 BigQuery 中執行支援的轉換階段。管道中的所有其他階段都會在 Spark 中執行。

執行轉換時:

  1. Cloud Data Fusion 會將記錄寫入 Cloud Storage,然後執行 BigQuery 載入工作,將輸入資料集載入 BigQuery。

  2. JOIN 作業和支援的轉換作業隨後會以 SQL 陳述式做為 BigQuery 工作執行。

  3. 如果工作執行完畢後需要進一步處理,可以將記錄從 BigQuery 匯出至 Spark。不過,如果啟用「嘗試直接複製到 BigQuery 接收器」選項,且 BigQuery 接收器接在 BigQuery 中執行的階段之後,記錄會直接寫入目的地 BigQuery 接收器資料表。

下圖顯示轉換下推功能如何在 BigQuery 中執行支援的轉換,而非在 Spark 中執行。

在 Cloud Data Fusion 管道中,將轉換下推至 BigQuery。

最佳做法

調整叢集和執行器大小

如要最佳化管道中的資源管理,請執行下列操作:

  • 為工作負載使用適當數量的叢集工作站 (節點)。換句話說,您可充分運用執行個體可用的 CPU 和記憶體,充分發揮佈建的 Managed Service for Apache Spark 叢集效用,同時享有 BigQuery 的執行速度,處理大型作業。

  • 使用自動調度資源叢集,提升管道的平行處理能力。

  • 在管道執行期間,於從 BigQuery 推送或提取記錄的管道階段中,調整資源設定。

建議:嘗試增加執行器資源的 CPU 核心數 (最多可增加至 worker 節點使用的 CPU 核心數)。當資料進出 BigQuery 時,執行器會在序列化和還原序列化步驟中,盡量減少 CPU 使用量。詳情請參閱「叢集大小」。

在 BigQuery 中執行轉換作業的好處是,管道可以在較小的 Managed Service for Apache Spark 叢集上執行。如果聯結是管道中最耗用資源的作業,您可以嘗試使用較小的叢集大小,因為現在的繁重 JOIN 作業是在 BigQuery 中執行,因此您可能會降低整體運算費用。

使用 BigQuery Storage Read API 更快擷取資料

BigQuery 執行轉換作業後,管道可能會有其他階段要在 Spark 中執行。在 Cloud Data Fusion 6.7.0 以上版本中,轉換下推支援 BigQuery Storage Read API,可縮短延遲時間,並加快讀取至 Spark 的作業。這有助於縮短管道的整體執行時間。

API 會平行讀取記錄,因此建議您相應調整執行器大小。如果在 BigQuery 中執行耗用大量資源的作業,請減少執行器的記憶體配置,以提升管道執行時的平行處理能力 (請參閱「調整叢集和執行器大小」)。

BigQuery Storage Read API 預設為停用。您可以在安裝 Scala 2.12 的執行環境中啟用這項功能,包括 Managed Service for Apache Spark 2.0 和 Managed Service for Apache Spark 1.5。

考量資料集大小

請考量 JOIN 作業中的資料集大小。對於會產生大量輸出記錄的作業 (例如類似於交叉 JOIN 作業的作業),產生的資料集大小可能會比輸入資料集大上好幾個數量級。JOIN此外,在整體管道效能的脈絡下,如果這些記錄需要額外的 Spark 處理 (例如轉換或接收器),請考慮將這些記錄拉回 Spark 的負擔。

減少資料偏誤

如果資料嚴重傾斜,JOIN 作業可能會導致 BigQuery 工作超出資源用量限制,進而導致 JOIN 作業失敗。為避免發生這種情況,請前往 Joiner 外掛程式設定,並在「Skewed Input Stage」(輸入階段偏差) 欄位中找出偏差的輸入內容。這樣一來,Cloud Data Fusion 就能安排輸入內容,降低 BigQuery 陳述式超出限制的風險。

在 Joiner 外掛程式設定中,找出「Skewed Input Stage」(輸入階段偏差) 欄位中的資料偏差。

後續步驟