이 문서에서는 프로젝트 또는 클러스터 수준에서 Dataproc Spark 작업에 대해 데이터 계보를 사용 설정하는 방법을 설명합니다.
데이터 계보는 시스템을 통해 데이터가 이동하는 방식, 즉 데이터의 출처, 데이터가 전달되는 위치, 데이터에 적용되는 변환을 추적할 수 있는 Dataplex Universal Catalog 기능입니다.
데이터 계보는 SparkR 및 Spark 스트리밍 작업을 제외한 모든 Dataproc Spark 작업에 사용할 수 있으며, BigQuery 및 Cloud Storage 데이터 소스를 지원합니다. 이 기능은 Compute Engine 2.0.74+, 2.1.22+, 2.2.50, 이상 이미지 버전의 Dataproc에 포함되어 있습니다.
Dataproc 클러스터에서 이 기능을 사용 설정하면 Dataproc Spark 작업이 데이터 계보 이벤트를 캡처하고 이를 Dataplex Universal Catalog Data Lineage API에 게시합니다. Dataproc은 OpenLineage Spark 플러그인을 사용하여 OpenLineage를 통해 Data Lineage API와 통합됩니다.
다음을 사용하여 Dataplex Universal Catalog를 통해 데이터 계보 정보에 액세스할 수 있습니다.
시작하기 전에
Google Cloud 콘솔의 프로젝트 선택기 페이지에서 계보를 추적할 Dataproc 클러스터가 포함된 프로젝트를 선택합니다.
Data Lineage API를 사용 설정합니다.
필요한 역할
기본 VM 서비스 계정을 사용하여 Dataproc 클러스터를 만드는 경우 데이터 계보를 사용 설정하는 Dataproc Worker
역할이 있습니다. 추가 작업은 필요하지 않습니다.
하지만 커스텀 서비스 계정을 사용하는 Dataproc 클러스터를 만드는 경우 클러스터에서 데이터 계보를 사용 설정하려면 다음 단락에 설명된 대로 커스텀 서비스 계정에 필요한 역할을 부여해야 합니다.
Dataproc에서 데이터 계보를 사용하는 데 필요한 권한을 얻으려면 관리자에게 클러스터의 커스텀 서비스 계정에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.
- 다음 역할 중 하나를 부여합니다.
-
Dataproc 작업자(
roles/dataproc.worker
) -
데이터 계보 편집자(
roles/datalineage.editor
) -
데이터 계보 생성자(
roles/datalineage.producer
) -
데이터 계보 관리자(
roles/datalineage.admin
)
-
Dataproc 작업자(
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
프로젝트 수준에서 Spark 데이터 계보 사용 설정
프로젝트 수준에서 Spark 데이터 계보를 사용 설정할 수 있습니다. 프로젝트에서 데이터 계보가 사용 설정되면 이후 생성된 클러스터에서 실행되는 지원되는 Spark 작업도 데이터 계보가 사용 설정됩니다. 프로젝트 수준에서 데이터 계보를 사용 설정하기 전에 생성된 기존 클러스터에서 실행되는 작업에는 데이터 계보가 사용 설정되지 않습니다.
프로젝트 수준에서 Spark 데이터 계보 사용 설정
프로젝트 수준에서 Spark 데이터 계보를 사용 설정하려면 다음 커스텀 프로젝트 메타데이터를 설정합니다.
키 | 값 |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
DATAPROC_LINEAGE_ENABLED
메타데이터를 false
로 설정하여 프로젝트 수준에서 Spark 데이터 계보를 사용 중지할 수 있습니다.
클러스터 수준에서 Spark 데이터 계보 사용 설정
클러스터에 제출된 모든 지원되는 Spark 작업에 데이터 계보가 사용 설정되도록 클러스터를 만들 때 Spark 데이터 계보를 사용 설정할 수 있습니다.
클러스터 수준에서 Spark 데이터 계보 사용 설정
클러스터에서 Spark 데이터 계보를 사용 설정하려면 dataproc:dataproc.lineage.enabled
클러스터 속성을 true
로 설정하여 Dataproc 클러스터를 만듭니다.
2.0 이미지 버전 클러스터: Spark 데이터 계보에 Dataproc 클러스터 VM 액세스 cloud-platform
범위가 필요합니다. 이미지 버전 2.1 이상으로 생성된 Dataproc 이미지 버전 클러스터에는 cloud-platform
이 사용 설정됩니다. 클러스터를 만들 때 Dataproc 이미지 버전 2.0
을 지정할 경우 범위를 cloud-platform
으로 설정합니다.
gcloud CLI 예시:
gcloud dataproc clusters create CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--properties 'dataproc:dataproc.lineage.enabled=true'
작업에서 Spark 데이터 계보 사용 중지
클러스터 수준에서 Spark 데이터 계보를 사용 설정할 경우에는 작업을 제출할 때 빈 값("")을 사용해서 spark.extraListeners
속성을 전달하여 특정 작업에서 Spark 데이터 계보를 사용 중지할 수 있습니다.
사용 설정된 후에는 클러스터에서 Spark 데이터 계보를 사용 중지할 수 없습니다. 모든 클러스터 작업에서 Spark 데이터 계보를 없애려면 dataproc:dataproc.lineage.enabled
속성 없이 클러스터를 다시 만들면 됩니다.
Spark 작업 제출
Spark 데이터 계보가 사용 설정된 상태로 생성된 Dataproc 클러스터에서 Spark 작업을 제출하면 Dataproc이 데이터 계보 정보를 캡처하고 Data Lineage API에 보고합니다.
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME
참고:
- 작업을 고유하게 식별하는 데 사용되는
spark.openlineage.namespace
및spark.openlineage.appName
속성 추가는 선택사항입니다. 이러한 속성을 추가하지 않으면 Dataproc에 다음 기본값이 사용됩니다.spark.openlineage.namespace
기본값: PROJECT_IDspark.openlineage.appName
기본값:spark.app.name
Dataplex Universal Catalog에서 계보 보기
계보 그래프에는 프로젝트 리소스와 이를 만든 프로세스 간의 관계가 표시됩니다. Google Cloud 콘솔에서 데이터 계보 정보를 보거나 JSON 데이터 형식으로 Data Lineage API에서 검색할 수 있습니다.
PySpark 예시 코드:
다음 PySpark 작업은 공개 BigQuery 테이블에서 데이터를 읽고 출력을 기존 BigQuery 데이터 세트의 새 테이블에 씁니다. 이는 임시 스토리지용 Cloud Storage 버킷을 사용합니다.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
다음을 바꿉니다.
BUCKET: 기존 Cloud Storage 버킷의 이름입니다.
PROJECT_ID, DATASET, TABLE: 프로젝트 ID, 기존 BigQuery 데이터 세트의 이름, 데이터 세트에 만들 새 테이블의 이름을 삽입합니다(테이블이 없어야 함).
Dataplex Universal Catalog UI에서 계보 그래프를 볼 수 있습니다.
다음 단계
- 데이터 계보 자세히 알아보기