OpenLineage는 데이터 계보 정보를 수집하고 분석할 수 있는 개방형 플랫폼입니다. 계보 데이터의 개방형 표준을 사용하는 OpenLineage는 OpenLineage API를 사용하여 실행, 작업, 데이터 세트를 보고하는 데이터 파이프라인 구성요소에서 계보 이벤트를 캡처합니다.
Data Lineage API를 통해 OpenLineage 이벤트를 가져와 BigQuery, Cloud Composer, Cloud Data Fusion, Dataproc과 같은Google Cloud 서비스의 계보 정보와 함께 Dataplex Universal Catalog 웹 인터페이스에 표시할 수 있습니다.
OpenLineage 사양을 사용하는 OpenLineage 이벤트를 가져오려면 ProcessOpenLineageRunEvent REST API 메서드를 사용하고 OpenLineage 패싯을 Data Lineage API 속성에 매핑합니다.
제한사항
Data Lineage API는 OpenLineage 주 버전 1을 지원합니다.
Data Lineage API 엔드포인트
ProcessOpenLineageRunEvent는 OpenLineage 메시지의 프로듀서가 아닌 소비자 역할만 합니다. 이 API를 사용하면 OpenLineage를 준수하는 도구나 시스템에서 생성된 계보 정보를 Dataplex Universal Catalog로 전송할 수 있습니다. Dataproc, Cloud Composer와 같은 일부 Google Cloud 서비스에는 이벤트를 이 엔드포인트로 보내 해당 서비스의 계보를 자동으로 캡처할 수 있는 기본 제공 OpenLineage 생성자가 포함되어 있습니다.Data Lineage API는 다음을 지원하지 않습니다.
- 메시지 형식이 변경된 모든 이후 OpenLineage 출시 버전
DatasetEventJobEvent
단일 메시지 최대 크기는 5MB입니다.
입력 및 출력의 각 정규화된 이름 길이는 4,000자(영문 기준)로 제한됩니다.
링크는 링크 100개가 있는 이벤트별로 그룹화됩니다. 링크의 최대 집계 수는 1,000개입니다.
Dataplex Universal Catalog는 계보 이벤트의 입력과 출력을 보여주는 각 작업 실행의 계보 그래프를 표시합니다. Spark 스테이지와 같은 하위 프로세스는 지원되지 않습니다.
OpenLineage 매핑
REST API 메서드 ProcessOpenLineageRunEvent는 다음과 같이 OpenLineage 속성을 Data Lineage API 속성에 매핑합니다.
| Data Lineage API 속성 | OpenLineage 속성 |
|---|---|
| Process.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME |
| Process.displayName | Job.namespace + ":" + Job.name |
| Process.attributes | Job.facets(저장된 데이터 참조) |
| Run.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID |
| Run.displayName | Run.runId |
| Run.attributes | Run.facets(저장된 데이터 참조) |
| Run.startTime | eventTime |
| Run.endTime | eventTime |
| Run.state | eventType |
| LineageEvent.name | projects/PROJECT_NUMBER/locations/LOCATION/processes/HASH_OF_NAMESPACE_AND_NAME/runs/HASH_OF_RUNID/lineageEvents/HASH_OF_JOB_RUN_INPUT_OUTPUTS_OF_EVENT(예: projects/11111111/locations/us/processes/1234/runs/4321/lineageEvents/111-222-333) |
| LineageEvent.EventLinks.source | inputs(fqn은 네임스페이스 및 이름 연결임) |
| LineageEvent.EventLinks.target | outputs(fqn은 네임스페이스 및 이름 연결임) |
| LineageEvent.startTime | eventTime |
| LineageEvent.endTime | eventTime |
| requestId | 메서드 사용자가 정의함 |
OpenLineage 이벤트 가져오기
아직 OpenLineage를 설정하지 않았으면 시작하기를 참조하세요.
OpenLineage 이벤트를 Dataplex Universal Catalog로 가져오려면 REST API 메서드 ProcessOpenLineageRunEvent를 호출합니다.
POST https://datalineage.googleapis.com/v1/projects/{project}/locations/{location}:processOpenLineageRunEvent \
--data '{"eventTime":"2023-04-04T13:21:16.098Z","eventType":"COMPLETE","inputs":[{"name":"somename","namespace":"somenamespace"}],"job":{"name":"somename","namespace":"somenamespace"},"outputs":[{"name":"somename","namespace":"somenamespace"}],"producer":"someproducer","run":{"runId":"somerunid"},"schemaURL":"https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"}'
OpenLineage 메시지 전송 도구
Data Lineage API로 이벤트를 간단하게 전송하려면 다양한 도구와 라이브러리를 사용하면 됩니다.
- Google Cloud Java 프로듀서 라이브러리: Google은 OpenLineage 이벤트를 구성하고 Data Lineage API로 전송하는 데 도움이 되는 오픈소스 Java 라이브러리를 제공합니다. 자세한 내용은 데이터 계보용 프로듀서 Java 라이브러리가 이제 오픈소스입니다 블로그 게시물을 참조하세요. 이 라이브러리는 GitHub 및 Maven에서 제공됩니다.
- OpenLineage GCP 전송: Java 기반 OpenLineage 프로듀서의 경우 전용 GcpLineage 전송을 사용할 수 있습니다. Data Lineage API로 이벤트를 전송하는 데 필요한 코드를 최소화하여 Data Lineage API와의 통합을 간소화합니다.
GcpLineageTransport는 Airflow, Spark, Flink와 같은 기존 OpenLineage 프로듀서의 이벤트 싱크로 구성될 수 있습니다. 자세한 내용과 예시는 GcpLineage를 참조하세요.
OpenLineage의 정보 분석
가져온 OpenLineage 이벤트를 분석하려면 Dataplex Universal Catalog UI에서 계보 그래프 보기를 참조하세요.
저장된 데이터
Data Lineage API는 OpenLineage 메시지의 모든 패싯 데이터를 저장하지 않습니다. Data Lineage API는 다음 패싯 필드를 저장합니다.
spark_versionopenlineage-spark-versionspark-version
- 모든
spark.logicalPlan.* environment-properties(커스텀 Google Cloud 계보 패싯)origin.sourcetype및origin.namespark.app.idspark.app.namespark.batch.idspark.batch.uuidspark.cluster.namespark.cluster.regionspark.job.idspark.job.uuidspark.project.idspark.query.node.namespark.session.idspark.session.uuid
Data Lineage API는 다음 정보를 저장합니다.
eventTimerun.runIdjob.namespacejob.name
다음 단계
- Dataproc Serverless 및 Dataproc Hive 통합 확인하기