OpenLineage 是一个用于收集和分析数据沿袭信息的开放平台。OpenLineage 使用沿袭数据的开放标准,从使用 OpenLineage API 报告运行、作业和数据集的数据流水线组件捕获沿袭事件。
通过 Data Lineage API,您可以导入 OpenLineage 事件,以便在 Dataplex Universal Catalog 网页界面中与来自Google Cloud 服务(例如 BigQuery、Cloud Composer、Cloud Data Fusion 和 Dataproc)的沿袭信息一起显示。
如需导入使用 OpenLineage 规范的 OpenLineage 事件,请使用 ProcessOpenLineageRunEvent REST API 方法,并将 OpenLineage 分面映射到 Data Lineage API 属性。
限制
Data Lineage API 支持 OpenLineage 主要版本 1。
Data Lineage API 端点
ProcessOpenLineageRunEvent仅充当 OpenLineage 消息的使用者,而不是生产者。借助该 API,您可以将任何符合 OpenLineage 标准的工具或系统生成的沿袭信息发送到 Dataplex Universal Catalog。某些 Google Cloud 服务(例如 Dataproc 和 Cloud Composer)包含内置的 OpenLineage 生产者,这些生产者可以将事件发送到此端点,从而自动捕获这些服务的沿袭信息。Data Lineage API 不支持以下情况:
- 任何后续的 OpenLineage 版本(如果消息格式发生更改)
DatasetEventJobEvent
单个消息的大小上限为 5 MB。
输入和输出中每个完全限定名称的长度上限为 4,000 个字符。
链接按包含 100 个链接的事件进行分组。链接总数上限为 1,000。
Dataplex Universal Catalog 会为每次作业运行显示一个沿袭图,其中显示沿袭事件的输入和输出。它不支持较低级层的进程,例如 Spark 阶段。
OpenLineage 映射
REST API 方法 ProcessOpenLineageRunEvent 会将 OpenLineage 属性映射到 Data Lineage API 属性,如下所示:
| Data Lineage API 属性 | OpenLineage 属性 |
|---|---|
| Process.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME |
| Process.displayName | Job.namespace + ":" + Job.name |
| Process.attributes | Job.facets(请参阅存储的数据) |
| Run.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID |
| Run.displayName | Run.runId |
| Run.attributes | Run.facets(请参阅存储的数据) |
| Run.startTime | eventTime |
| Run.endTime | eventTime |
| Run.state | eventType |
| LineageEvent.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT(例如,projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333) |
| LineageEvent.EventLinks.source | 输入(fqn 是命名空间和名称的串联) |
| LineageEvent.EventLinks.target | 输出(fqn 是命名空间和名称的串联) |
| LineageEvent.startTime | eventTime |
| LineageEvent.endTime | eventTime |
| requestId | 由方法用户定义 |
导入 OpenLineage 事件
如果您尚未设置 OpenLineage,请参阅使用入门。
如需将 OpenLineage 事件导入 Dataplex Universal Catalog,请调用 REST API 方法 ProcessOpenLineageRunEvent:
POST https://datalineage.googleapis.com/v1/projects/{project}/locations/{location}:processOpenLineageRunEvent \
--data '{"eventTime":"2023-04-04T13:21:16.098Z","eventType":"COMPLETE","inputs":[{"name":"somename","namespace":"somenamespace"}],"job":{"name":"somename","namespace":"somenamespace"},"outputs":[{"name":"somename","namespace":"somenamespace"}],"producer":"someproducer","run":{"runId":"somerunid"},"schemaURL":"https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"}'
用于发送 OpenLineage 消息的工具
为了简化向 Data Lineage API 发送事件的过程,您可以使用各种工具和库:
- Google Cloud Java Producer 库:Google 提供了一个开源 Java 库,可帮助构建 OpenLineage 事件并将其发送到 Data Lineage API。如需了解详情,请参阅博文 Producer java library for Data Lineage is now open source(数据沿袭的 Producer Java 库现已开源)。 该库可在 GitHub 和 Maven 上获取。
- OpenLineage GCP 传输:对于基于 Java 的 OpenLineage 生产者,可以使用专用 GcpLineage 传输。它通过最大限度地减少向 Data Lineage API 发送事件所需的代码,简化了与 Data Lineage API 的集成。
GcpLineageTransport可配置为任何现有 OpenLineage 生产者的事件接收器,例如 Airflow、Spark 和 Flink。如需了解详情和示例,请参阅 GcpLineage。
分析来自 OpenLineage 的信息
如需分析导入的 OpenLineage 事件,请参阅在 Dataplex Universal Catalog 界面中查看沿袭图。
数据存储
Data Lineage API 不会存储 OpenLineage 消息中的所有分面数据。Data Lineage API 会存储以下分面字段:
spark_versionopenlineage-spark-versionspark-version
- 所有
spark.logicalPlan.* environment-properties(自定义 Google Cloud 沿袭分面)- “
origin.sourcetype”和“origin.name” spark.app.idspark.app.namespark.batch.idspark.batch.uuidspark.cluster.namespark.cluster.regionspark.job.idspark.job.uuidspark.project.idspark.query.node.namespark.session.idspark.session.uuid
- “
Data Lineage API 会存储以下信息:
eventTimerun.runIdjob.namespacejob.name
后续步骤
- 不妨了解 Dataproc Serverless 和 Dataproc Hive 集成。
- 您可以在互动式实验中尝试此功能:使用数据沿袭和 OpenLineage 捕获和探索数据更新