與 OpenLineage 整合

OpenLineage 是開放式平台,用於收集及分析資料歷程資訊。OpenLineage 會使用開放式標準處理歷程資料,並從資料管道元件擷取歷程事件,這些元件會使用 OpenLineage API 報告執行作業、工作和資料集。

透過 Data Lineage API,您可以匯入 OpenLineage 事件,在 Dataplex Universal Catalog 網頁介面中,與來自Google Cloud 服務 (例如 BigQuery、Cloud Composer、Cloud Data Fusion 和 Dataproc) 的歷程資訊一併顯示。

如要匯入使用 OpenLineage 規格的 OpenLineage 事件,請使用 ProcessOpenLineageRunEvent REST API 方法,並將 OpenLineage 層面對應至 Data Lineage API 屬性。

限制

  • Data Lineage API 支援 OpenLineage 主要版本 1。

  • Data Lineage API 端點 ProcessOpenLineageRunEvent 僅做為 OpenLineage 訊息的消費者,而非生產者。您可以透過這個 API,將任何符合 OpenLineage 標準的工具或系統所產生的歷程資訊,傳送至 Dataplex Universal Catalog。部分 Google Cloud 服務 (例如 DataprocCloud Composer) 內建 OpenLineage producer,可將事件傳送至這個端點,自動擷取這些服務的歷程。

  • Data Lineage API 不支援下列項目:

    • 後續任何 OpenLineage 版本,只要訊息格式有變更
    • DatasetEvent
    • JobEvent
  • 單一訊息大小上限為 5 MB。

  • 輸入和輸出內容中每個完整合格名稱的長度上限為 4000 個字元。

  • 連結 會按事件分組,每個事件最多 100 個連結。連結總數上限為 1000 個。

  • Dataplex Universal Catalog 會顯示每個工作執行的沿革圖,其中包含沿革事件的輸入和輸出內容。不支援較低層級的程序,例如 Spark 階段。

OpenLineage 對應

REST API 方法 ProcessOpenLineageRunEvent 會將 OpenLineage 屬性對應至 Data Lineage API 屬性,如下所示:

Data Lineage API 屬性 OpenLineage 屬性
Process.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME
Process.displayName Job.namespace + ":" + Job.name
Process.attributes Job.facets (請參閱儲存的資料)
Run.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID
Run.displayName Run.runId
Run.attributes Run.facets (請參閱「儲存的資料」)
Run.startTime eventTime
Run.endTime eventTime
Run.state eventType
LineageEvent.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT (例如 projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333)
LineageEvent.EventLinks.source 輸入 (fqn 是命名空間和名稱的串連)
LineageEvent.EventLinks.target 輸出 (fqn 是命名空間和名稱的串連)
LineageEvent.startTime eventTime
LineageEvent.endTime eventTime
requestId 由方法使用者定義

匯入 OpenLineage 事件

如果尚未設定 OpenLineage,請參閱「開始使用」。

如要將 OpenLineage 事件匯入 Dataplex Universal Catalog,請呼叫 REST API 方法 ProcessOpenLineageRunEvent

POST https://datalineage.googleapis.com/v1/projects/{project}/locations/{location}:processOpenLineageRunEvent \
--data '{"eventTime":"2023-04-04T13:21:16.098Z","eventType":"COMPLETE","inputs":[{"name":"somename","namespace":"somenamespace"}],"job":{"name":"somename","namespace":"somenamespace"},"outputs":[{"name":"somename","namespace":"somenamespace"}],"producer":"someproducer","run":{"runId":"somerunid"},"schemaURL":"https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"}'

傳送 OpenLineage 訊息的工具

如要簡化將事件傳送至 Data Lineage API 的程序,可以使用各種工具和程式庫:

  • Google Cloud Java Producer 程式庫:Google 提供開放原始碼 Java 程式庫,協助建構 OpenLineage 事件並傳送至 Data Lineage API。詳情請參閱「資料歷程的 Producer Java 程式庫現已開放原始碼」網誌文章。您可以在 GitHubMaven 取得這個程式庫。
  • OpenLineage GCP 傳輸:對於以 Java 為基礎的 OpenLineage 產生器,系統提供專用的 GcpLineage 傳輸。這個套件可減少將事件傳送至 Data Lineage API 時所需的程式碼,簡化與 Data Lineage API 的整合。GcpLineageTransport 可設定為任何現有 OpenLineage 生產者 (例如 Airflow、Spark 和 Flink) 的事件接收器。如需更多資訊和範例,請參閱 GcpLineage

分析 OpenLineage 的資訊

如要分析匯入的 OpenLineage 事件,請參閱「在 Dataplex Universal Catalog UI 中查看歷程圖」。

儲存的資料量

Data Lineage API 不會儲存 OpenLineage 訊息中的所有構面資料。Data Lineage API 會儲存下列層面欄位:

  • spark_version
    • openlineage-spark-version
    • spark-version
  • 所有 spark.logicalPlan.*
  • environment-properties (自訂 Google Cloud 沿襲層面)
    • origin.sourcetypeorigin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid

Data Lineage API 會儲存下列資訊:

  • eventTime
  • run.runId
  • job.namespace
  • job.name

後續步驟