OpenLineage 映射

Data Lineage API 可以从与 OpenLineage(一种用于沿袭信息收集的开放标准)集成的系统中注入沿袭信息。当您使用 ProcessOpenLineageRunEvent 方法将 OpenLineage 格式的事件发送到 Data Lineage API 时,Data Lineage API 会将 OpenLineage 消息中的属性映射到 Data Lineage API 中的相应属性。

本文档提供了这些映射的参考表格。

属性映射

ProcessOpenLineageRunEvent REST API 方法会将 OpenLineage 属性映射到 Data Lineage API 属性,如下所示:

Data Lineage API 属性 OpenLineage 属性
Process.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME
Process.displayName Job.namespace + ":" + Job.name
Process.attributes Job.facets(请参阅存储的数据
Run.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID
Run.displayName Run.runId
Run.attributes Run.facets(请参阅存储的数据
Run.startTime eventTime
Run.endTime eventTime
Run.state eventType
LineageEvent.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT(例如,projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333)
LineageEvent.EventLinks.source 输入(fqn 是命名空间和名称的串联)
LineageEvent.EventLinks.target 输出(fqn 是命名空间和名称的串联)
LineageEvent.startTime eventTime
LineageEvent.endTime eventTime
requestId 由方法用户定义

FQN 映射

下表提供了各种系统的 OpenLineage 命名空间和名称对示例,以及它们对应的 Dataplex Universal Catalog 完全限定名称 (FQN):

系统 OpenLineage 命名空间 OpenLineage 名称 Dataplex Universal Catalog FQN
Athena awsathena://athena.{region_name}.amazonaws.com
  • {catalog}
  • {catalog}.{database}
  • {catalog}.{database}.{table}
  • athena:{catalogId}.{region}
  • athena:{catalogId}.{region}.{databaseId}
  • athena:{catalogId}.{region}.{databaseId}.{tableId}
AWS Glue arn:aws:glue:{region}:{account id} table/{database name}/{table name} aws_glue:table:{region}.{account id}.{database name}.{table name}
Azure Cosmos DB azurecosmos://{host}/dbs/{database} colls/{table}
  • cosmos-db:{host}.{database}
  • cosmos-db:{host}.{database}.{table}
Azure Data Explorer azurekusto://{host}.kusto.windows.net {database}/{table}
  • kusto:{host}.{region}.{database}
  • kusto:{host}.{region}.{database}.{table}
Azure Synapse sqlserver://{host}:{port}
  • {database}
  • {database}.{schema}
  • {database}.{schema}.{table}
  • sqlserver:{hostWithPort}.{databaseId}
  • sqlserver:{hostWithPort}.{databaseId}.{schemaId}
  • sqlserver:{hostWithPort}.{databaseId}.{schemaId}.{tableId}
BigQuery bigquery
  • {project id}.{dataset name}
  • {project id}.{dataset name}.{table name}
  • bigquery:{projectId}.{datasetId}
  • bigquery:{projectId}.{datasetId}.{assetId}
Cassandra cassandra://{host}:{port}
  • {keyspace}
  • {keyspace}.{table}
  • cassandra:{hostWithPort}.{keyspaceId}
  • cassandra:{hostWithPort}.{keyspaceId}.{tableId}
MySQL mysql://{host}:{port}
  • {database}
  • {database}.{table}
  • mysql:{hostWithPort}.{databaseId}
  • mysql:{hostWithPort}.{databaseId}.{tableId}
CrateDB crate://{host}:{port} {database}.{schema}.{table} 不支持
DB2 db2://{host}:{port}
  • {database}
  • {database}.{schema}
  • {database}.{schema}.{table}
  • db2:{dns}.{databaseId}
  • db2:{dns}.{databaseId}.{schemaId}
  • db2:{dns}.{databaseId}.{schemaId}.{tableId}
Hive hive://{host}:{port} {database}.{table} 不支持
MSSQL mssql://{host}:{port} {database}.{schema}.{table} 不支持
OceanBase oceanbase://{host}:{port} {database}.{table} 不支持
Oracle oracle://{host}:{port} {serviceName}.{schema}.{table} or {sid}.{schema}.{table}
  • oracle:{hostWithPort}.{databaseId}
  • oracle:{hostWithPort}.{databaseId}.{schemaId}
  • oracle:{hostWithPort}.{databaseId}.{schemaId}.{tableId}
Postgres postgres://{host}:{port}
  • {database}
  • {database}.{schema}
  • {database}.{schema}.{table}
  • postgresql:{hostWithPort}.{databaseId}
  • postgresql:{hostWithPort}.{databaseId}.{schemaId}
  • postgresql:{hostWithPort}.{databaseId}.{schemaId}.{tableId}
TeraData teradata://{host}:{port} {database}.{table} 不支持
Redshift redshift://{cluster_identifier}.{region_name}:{port}
  • {database}
  • {database}.{schema}
  • {database}.{schema}.{table}
  • redshift:{clusterId}.{region}.{port}.{databaseId}
  • redshift:{clusterId}.{region}.{port}.{databaseId}.{schemaId}
  • redshift:{clusterId}.{region}.{port}.{databaseId}.{schemaId}.{tableId}
Snowflake snowflake://{organization name}-{account name} or snowflake://{account-locator}(.{compliance})(.{cloud_region_id})(.{cloud})
  • {database}
  • {database}.{schema}
  • {database}.{schema}.{table}
  • snowflake:{accountName}.{databaseId}
  • snowflake:{accountName}.{databaseId}.{schemaId}
  • snowflake:{accountName}.{databaseId}.{schemaId}.{tableId}
Spanner spanner://{projectId}:{instanceId} {database}.{schema}.{table} Dataplex Universal Catalog 支持,但数据沿袭不支持
Trino trino://{host}:{port}
  • {catalog}
  • {catalog}.{schema}
  • {catalog}.{schema}.{table}
  • trino:{hostWithPort}.{catalogId}
  • trino:{hostWithPort}.{catalogId}.{schemaId}
  • trino:{hostWithPort}.{catalogId}.{schemaId}.{tableId}
ABFSS (Azure Data Lake Gen2) abfss://{container name}@{service name}.dfs.core.windows.net {path}
  • abs:{serviceName}.{containerName}
  • abs:{serviceName}.{containerName}.{path}
DBFS(Databricks 文件系统) dbfs://{workspace name} {path}
  • dbfs:{workspace}
  • dbfs:{workspace}.{path}
Cloud Storage gs://{bucket name} {object key}
  • gcs:{bucketName}
  • gcs:{bucketName}.{virtualPath}
HDFS hdfs://{namenode host}:{namenode port} {path}
  • hdfs:{namenodeHostWithPort}
  • hdfs:{namenodeHostWithPort}.{path}
Kafka kafka://{bootstrap server host}:{port} {topic} kafka:{serverHostWithPort}.{topicId}
本地文件系统 file {path} filesystem:localhost.{path}
远程文件系统 file://{host} {path} filesystem:{hostWithPort}.{path}
S3 s3://{bucket name} {object key}
  • s3:{bucketName}
  • s3:{bucketName}.{objectKey}

命名空间前缀 s3as3n 也可接受,并会转换为 s3
WASBS (Azure Blob Storage) wasbs://{container name}@{service name}.dfs.core.windows.net {object key}
  • abs:{serviceName}.{containerName}
  • abs:{serviceName}.{containerName}.{objectKey}
Pub/Sub 主题 pubsub topic:{projectId}:{topicId} pubsub:topic:{projectId}.{topicId}
Pub/Sub 订阅 pubsub subscription:{projectId}:{subscriptionId} pubsub:subscription:{projectId}.{subscriptionId}

其他可接受的格式

虽然 OpenLineage 未针对以下系统定义标准 namespace/name 对,但 Data Lineage API 接受以如下表中所述格式设置的这些系统的沿袭事件。在 OpenLineage 消息中引用且命名空间为 custom 的资源会被解读为自定义的完全限定名称。

系统 OpenLineage 命名空间 OpenLineage 名称 Dataplex Universal Catalog FQN
自定义 FQN custom {some reference} custom:{someReference}
Dataproc Metastore dataproc_metastore
  • dataproc_metastore:{projectId}.{location}.{instanceId}
  • dataproc_metastore:{projectId}.{location}.{instanceId}.{databaseId}
  • dataproc_metastore:{projectId}.{location}.{instanceId}.{databaseId}.{tableId}
  • dataproc_metastore:{projectId}.{location}.{instanceId}
  • dataproc_metastore:{projectId}.{location}.{instanceId}.{databaseId}
  • dataproc_metastore:{projectId}.{location}.{instanceId}.{databaseId}.{tableId}

后续步骤