Integración con OpenLineage

En este documento, se explica cómo integrar OpenLineage con Knowledge Catalog (antes Dataplex Universal Catalog) para importar y visualizar el linaje de datos de sistemas externos. Al actuar como consumidor de OpenLineage con la API de REST de ProcessOpenLineageRunEvent, Knowledge Catalog te permite unificar el linaje de canalizaciones personalizadas junto con el linaje integrado de los servicios de Google Cloud .

Descripción general

OpenLineage es una plataforma abierta para recopilar y analizar información sobre el linaje de datos. OpenLineage, que usa un estándar abierto para los datos de linaje, captura eventos de linaje de los componentes de la canalización de datos que usan una API de OpenLineage para informar sobre ejecuciones, trabajos y conjuntos de datos.

A través de la API de Data Lineage, puedes importar eventos de OpenLineage para mostrarlos en la interfaz web de Knowledge Catalog junto con la información de linaje de los servicios deGoogle Cloud , como BigQuery, Managed Service para Apache Airflow, Cloud Data Fusion y Managed Service para Apache Spark.

Para importar eventos de OpenLineage que usan la especificación de OpenLineage, usa el método de la ProcessOpenLineageRunEvent API de REST y asigna facetas de OpenLineage a atributos de la API de Data Lineage.

Limitaciones de la integración de OpenLineage

  • Versiones compatibles: La API de Data Lineage admite la versión principal 1 de OpenLineage.

  • Acciones de la API: El extremo de API de Data Lineage ProcessOpenLineageRunEvent solo actúa como consumidor de mensajes de OpenLineage, no como productor. La API te permite enviar información de linaje generada por cualquier herramienta o sistema compatible con OpenLineage a Knowledge Catalog. Algunos Google Cloud servicios, como Managed Service para Apache Spark y Managed Airflow, incluyen productores integrados de OpenLineage que pueden enviar eventos a este extremo, lo que automatiza la captura del linaje de esos servicios.

  • Funciones no compatibles: La API de Data Lineage no admite lo siguiente:

    • Cualquier versión posterior de OpenLineage con cambios en el formato de los mensajes
    • DatasetEvent
    • JobEvent
  • Tamaño del mensaje: El tamaño máximo de un solo mensaje es de 5 MB.

  • Longitud del nombre: La longitud de cada nombre completo en las entradas y salidas se limita a 4,000 caracteres.

  • Límites de vínculos: Los vínculos se agrupan por eventos, con un máximo de 100 vínculos por evento. La cantidad máxima agregada de vínculos a nivel de la tabla es de 1,000. Si un mensaje contiene más de 1,500 vínculos a nivel de la columna, se omite la información a nivel de la columna.

  • Alcance del gráfico: Knowledge Catalog muestra un gráfico de linaje para cada ejecución de trabajo, en el que se muestran las entradas y salidas de los eventos de linaje. No admite procesos de nivel inferior, como las etapas de Spark.

Asignación de atributos de facetas de OpenLineage

Para obtener información sobre la asignación de OpenLineage, consulta Asignación de OpenLineage.

Importa un evento de OpenLineage

Si aún no configuraste OpenLineage, consulta la guía de introducción.

Para importar un evento de OpenLineage a Knowledge Catalog, llama al método de la API ProcessOpenLineageRunEvent.

C#

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración para C# que encontrarás en la guía de inicio rápido de Data Lineage sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Data Lineage C#.

Para autenticarte en Data Lineage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

using Google.Cloud.DataCatalog.Lineage.V1;
using Google.Protobuf.WellKnownTypes;

public sealed partial class GeneratedLineageClientSnippets
{
    /// <summary>Snippet for ProcessOpenLineageRunEvent</summary>
    /// <remarks>
    /// This snippet has been automatically generated and should be regarded as a code template only.
    /// It will require modifications to work:
    /// - It may require correct/in-range values for request initialization.
    /// - It may require specifying regional endpoints when creating the service client as shown in
    ///   https://cloud.google.com/dotnet/docs/reference/help/client-configuration#endpoint.
    /// </remarks>
    public void ProcessOpenLineageRunEventRequestObject()
    {
        // Create client
        LineageClient lineageClient = LineageClient.Create();
        // Initialize request argument(s)
        ProcessOpenLineageRunEventRequest request = new ProcessOpenLineageRunEventRequest
        {
            Parent = "",
            OpenLineage = new Struct(),
        };
        // Make the request
        ProcessOpenLineageRunEventResponse response = lineageClient.ProcessOpenLineageRunEvent(request);
    }
}

Go

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go que encontrarás en la guía de inicio rápido de Data Lineage sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Data Lineage Go.

Para autenticarte en Data Lineage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


//go:build examples

package main

import (
	"context"

	lineage "cloud.google.com/go/datacatalog/lineage/apiv1"
	lineagepb "cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb"
)

func main() {
	ctx := context.Background()
	// This snippet has been automatically generated and should be regarded as a code template only.
	// It will require modifications to work:
	// - It may require correct/in-range values for request initialization.
	// - It may require specifying regional endpoints when creating the service client as shown in:
	//   https://pkg.go.dev/cloud.google.com/go#hdr-Client_Options
	c, err := lineage.NewClient(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	defer c.Close()

	req := &lineagepb.ProcessOpenLineageRunEventRequest{
		// TODO: Fill request struct fields.
		// See https://pkg.go.dev/cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb#ProcessOpenLineageRunEventRequest.
	}
	resp, err := c.ProcessOpenLineageRunEvent(ctx, req)
	if err != nil {
		// TODO: Handle error.
	}
	// TODO: Use resp.
	_ = resp
}

Java

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java que encontrarás en la guía de inicio rápido de Data Lineage sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Data Lineage Java.

Para autenticarte en Data Lineage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.cloud.datacatalog.lineage.v1.LineageClient;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventResponse;
import com.google.protobuf.Struct;

public class SyncProcessOpenLineageRunEvent {

  public static void main(String[] args) throws Exception {
    syncProcessOpenLineageRunEvent();
  }

  public static void syncProcessOpenLineageRunEvent() throws Exception {
    // This snippet has been automatically generated and should be regarded as a code template only.
    // It will require modifications to work:
    // - It may require correct/in-range values for request initialization.
    // - It may require specifying regional endpoints when creating the service client as shown in
    // https://cloud.google.com/java/docs/setup#configure_endpoints_for_the_client_library
    try (LineageClient lineageClient = LineageClient.create()) {
      ProcessOpenLineageRunEventRequest request =
          ProcessOpenLineageRunEventRequest.newBuilder()
              .setParent("parent-995424086")
              .setOpenLineage(Struct.newBuilder().build())
              .setRequestId("requestId693933066")
              .build();
      ProcessOpenLineageRunEventResponse response =
          lineageClient.processOpenLineageRunEvent(request);
    }
  }
}

Python

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python que encontrarás en la guía de inicio rápido de Data Lineage sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Data Lineage Python.

Para autenticarte en Data Lineage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
#   client as shown in:
#   https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import datacatalog_lineage_v1


def sample_process_open_lineage_run_event():
    # Create a client
    client = datacatalog_lineage_v1.LineageClient()

    # Initialize request argument(s)
    request = datacatalog_lineage_v1.ProcessOpenLineageRunEventRequest(
        parent="parent_value",
    )

    # Make the request
    response = client.process_open_lineage_run_event(request=request)

    # Handle the response
    print(response)

Ruby

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Ruby que encontrarás en la guía de inicio rápido de Data Lineage sobre cómo usar bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Data Lineage Ruby.

Para autenticarte en Data Lineage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

require "google/cloud/data_catalog/lineage/v1"

##
# Snippet for the process_open_lineage_run_event call in the Lineage service
#
# This snippet has been automatically generated and should be regarded as a code
# template only. It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in https://cloud.google.com/ruby/docs/reference.
#
# This is an auto-generated example demonstrating basic usage of
# Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client#process_open_lineage_run_event.
#
def process_open_lineage_run_event
  # Create a client object. The client can be reused for multiple calls.
  client = Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client.new

  # Create a request. To set request fields, pass in keyword arguments.
  request = Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventRequest.new

  # Call the process_open_lineage_run_event method.
  result = client.process_open_lineage_run_event request

  # The returned object is of type Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventResponse.
  p result
end

REST

Para importar un evento de OpenLineage, usa el método processOpenLineageRunEvent.

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: Es el ID del proyecto de Google Cloud .
  • LOCATION_ID: La ubicación Google Cloud , como us-central1.

Método HTTP y URL:

POST https://datalineage.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID:processOpenLineageRunEvent

Cuerpo JSON de la solicitud:

{
  "eventTime": "2023-04-04T13:21:16.098Z",
  "eventType": "COMPLETE",
  "inputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "job": {
    "name": "somename",
    "namespace": "customnamespace"
  },
  "outputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "producer": "someproducer",
  "run": {
    "runId": "somerunid"
  },
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:

{
  "process": "projects/my-project/locations/us-central1/processes/my-process",
  "run": "projects/my-project/locations/us-central1/processes/my-process/runs/my-run",
  "lineageEvents": [
    "projects/my-project/locations/us-central1/processes/my-process/runs/my-run/lineageEvents/my-lineage-event"
  ]
}

Herramientas para enviar mensajes de OpenLineage

Para simplificar el envío de eventos a la API de Data Lineage, puedes usar varias herramientas y bibliotecas:

  • Bibliotecas cliente de Google Cloud para Data Lineage: Google proporciona bibliotecas cliente para interactuar con la API de Data Lineage de forma programática. Si deseas obtener instrucciones de instalación, consulta Bibliotecas cliente.
  • Biblioteca de Java Producer de Google Cloud: Google proporciona una biblioteca de Java de código abierto para ayudar a construir y enviar eventos de OpenLineage a la API de Data Lineage. Para obtener más información, consulta la entrada de blog La biblioteca de Java del productor para Data Lineage ahora es de código abierto. La biblioteca está disponible en GitHub y Maven.
  • Transporte de OpenLineage para GCP: Para los productores de OpenLineage basados en Java, hay disponible un transporte de GcpLineage dedicado. Simplifica la integración con la API de Data Lineage, ya que minimiza el código necesario para enviar eventos a la API de Data Lineage. El GcpLineageTransport se puede configurar como el receptor de eventos para cualquier productor de OpenLineage existente, como Airflow, Spark y Flink. Para obtener más información y ejemplos, consulta GcpLineage.

Analiza la información de OpenLineage

Para analizar los eventos de OpenLineage importados, consulta Cómo ver gráficos de linaje en la IU de Knowledge Catalog.

Datos de facetas de OpenLineage almacenados

La API de Data Lineage no almacena todos los datos de las facetas de los mensajes de OpenLineage. La API de Data Lineage almacena los siguientes campos de facetas:

  • spark_version
    • openlineage-spark-version
    • spark-version
  • todos los spark.logicalPlan.*
  • environment-properties (faceta de linaje Google Cloud personalizada)
    • origin.sourcetype y origin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid

La API de Data Lineage almacena la siguiente información:

  • eventTime
  • run.runId
  • job.namespace
  • job.name

¿Qué sigue?