Intégrer à OpenLineage

OpenLineage est une plate-forme ouverte permettant de collecter et d'analyser des informations sur la traçabilité des données. OpenLineage utilise une norme ouverte pour les données de traçabilité. Il capture les événements de traçabilité à partir des composants du pipeline de données qui utilisent une API OpenLineage pour générer des rapports sur les exécutions, les jobs et les ensembles de données.

Grâce à l'API Data Lineage, vous pouvez importer des événements OpenLineage pour les afficher dans l'interface Web Dataplex Universal Catalog, à côté des informations de traçabilité des servicesGoogle Cloud , tels que BigQuery, Cloud Composer, Cloud Data Fusion et Dataproc.

Pour importer des événements OpenLineage qui utilisent la spécification OpenLineage, utilisez la méthode d'API REST ProcessOpenLineageRunEvent et mappez les facettes OpenLineage aux attributs de l'API Data Lineage.

Limites

  • L'API Data Lineage est compatible avec la version 1 d'OpenLineage.

  • Le point de terminaison de l'API Data Lineage ProcessOpenLineageRunEvent ne sert que de consommateur de messages OpenLineage, et non de producteur. L'API vous permet d'envoyer des informations de traçabilité générées par n'importe quel outil ou système compatible avec OpenLineage dans Dataplex Universal Catalog. Certains services Google Cloud , tels que Dataproc et Cloud Composer, incluent des producteurs OpenLineage intégrés qui peuvent envoyer des événements à ce point de terminaison, ce qui automatise la capture de l'héritage à partir de ces services.

  • L'API Data Lineage n'est pas compatible avec les éléments suivants :

    • Toute version ultérieure d'OpenLineage avec des modifications du format de message
    • DatasetEvent
    • JobEvent
  • La taille maximale d'un message est de 5 Mo.

  • La longueur de chaque nom complet dans les entrées et les sorties est limitée à 4 000 caractères.

  • Les liens sont regroupés par événements avec 100 liens. Le nombre total maximal de liens est de 1 000.

  • Dataplex Universal Catalog affiche un graphique de traçabilité pour chaque exécution de job, qui indique les entrées et les sorties des événements de traçabilité. Il n'est pas compatible avec les processus de niveau inférieur, comme les étapes Spark.

Mappage OpenLineage

La méthode d'API REST ProcessOpenLineageRunEvent mappe les attributs OpenLineage aux attributs de l'API Data Lineage comme suit :

Attributs de l'API Data Lineage Attributs OpenLineage
Process.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME
Process.displayName Job.namespace + ":" + Job.name
Process.attributes Job.facets (voir Données stockées)
Run.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID
Run.displayName Run.runId
Run.attributes Run.facets (voir Données stockées)
Run.startTime eventTime
Run.endTime eventTime
Run.state eventType
LineageEvent.name projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT (par exemple, projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333)
LineageEvent.EventLinks.source inputs (fqn est la concaténation de l'espace de noms et du nom)
LineageEvent.EventLinks.target sorties (fqn est la concaténation de l'espace de noms et du nom)
LineageEvent.startTime eventTime
LineageEvent.endTime eventTime
requestId Défini par l'utilisateur de la méthode

Importer un événement OpenLineage

Si vous n'avez pas encore configuré OpenLineage, consultez Premiers pas.

Pour importer un événement OpenLineage dans Dataplex Universal Catalog, appelez la méthode de l'API REST ProcessOpenLineageRunEvent :

POST https://datalineage.googleapis.com/v1/projects/{project}/locations/{location}:processOpenLineageRunEvent \
--data '{"eventTime":"2023-04-04T13:21:16.098Z","eventType":"COMPLETE","inputs":[{"name":"somename","namespace":"somenamespace"}],"job":{"name":"somename","namespace":"somenamespace"},"outputs":[{"name":"somename","namespace":"somenamespace"}],"producer":"someproducer","run":{"runId":"somerunid"},"schemaURL":"https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"}'

Outils pour envoyer des messages OpenLineage

Pour simplifier l'envoi d'événements à l'API Data Lineage, vous pouvez utiliser différents outils et bibliothèques :

  • Bibliothèque de producteurs Java Google Cloud : Google fournit une bibliothèque Java Open Source pour vous aider à créer et à envoyer des événements OpenLineage à l'API Data Lineage. Pour en savoir plus, consultez l'article de blog La bibliothèque Java Producer pour la traçabilité des données est désormais Open Source. La bibliothèque est disponible sur GitHub et Maven.
  • Transport OpenLineage GCP : un transport GcpLineage dédié est disponible pour les producteurs OpenLineage basés sur Java. Elle simplifie l'intégration à l'API Data Lineage en minimisant le code nécessaire pour envoyer des événements à l'API Data Lineage. GcpLineageTransport peut être configuré comme récepteur d'événements pour tout producteur OpenLineage existant, tel qu'Airflow, Spark et Flink. Pour en savoir plus et obtenir des exemples, consultez GcpLineage.

Analyser les informations d'OpenLineage

Pour analyser les événements OpenLineage importés, consultez Afficher les graphiques de traçabilité dans l'UI Dataplex Universal Catalog.

Données stockées

L'API Data Lineage ne stocke pas toutes les données de facettes des messages OpenLineage. L'API Data Lineage stocke les champs de facette suivants :

  • spark_version
    • openlineage-spark-version
    • spark-version
  • toutes les spark.logicalPlan.*
  • environment-properties (facette de lignée personnalisée Google Cloud )
    • origin.sourcetype et origin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid

L'API Data Lineage stocke les informations suivantes :

  • eventTime
  • run.runId
  • job.namespace
  • job.name

Étape suivante