Apache Spark용 서버리스에서 데이터 계보 사용

이 문서에서는 프로젝트, 배치 워크로드 또는 대화형 세션 수준에서Google Cloud Apache Spark용 서버리스 배치 워크로드 및 대화형 세션에 데이터 계보를 사용 설정하는 방법을 설명합니다.

개요

데이터 계보는 시스템을 통해 데이터가 이동하는 방식, 즉 데이터의 출처, 데이터가 전달되는 위치, 데이터에 적용되는 변환을 추적할 수 있는 Dataplex Universal Catalog 기능입니다.

Google Cloud Apache Spark용 서버리스 워크로드 및 세션은 계보 이벤트를 캡처하여 Dataplex Universal Catalog Data Lineage API에 게시합니다. Apache Spark용 서버리스는 OpenLineage Spark 플러그인을 사용하여 OpenLineage를 통해 Data Lineage API와 통합됩니다.

계보 그래프Data Lineage API를 사용하면 Dataplex Universal Catalog를 통해 계보 정보에 액세스할 수 있습니다. 자세한 내용은 Dataplex Universal Catalog에서 계보 그래프 보기를 참고하세요.

가용성

BigQuery 및 Cloud Storage 데이터 소스를 지원하는 데이터 계보는 지원되는 Apache Spark용 서버리스 런타임 버전으로 실행되는 워크로드 및 세션에 사용할 수 있으며 다음과 같은 예외 및 제한사항이 적용됩니다.

  • SparkR 또는 Spark 스트리밍 워크로드나 세션에는 데이터 계보를 사용할 수 없습니다.

시작하기 전에

  1. Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Apache Spark용 서버리스 워크로드 또는 세션에 사용할 프로젝트를 선택합니다.

    프로젝트 선택기로 이동

  2. Data Lineage API를 사용 설정합니다.

    API 사용 설정

    예정된 Spark 데이터 계보 변경사항 추가 프로젝트, 일괄 워크로드 또는 대화형 세션 설정 없이 데이터 계보 API를 사용 설정할 때 (서비스의 계보 수집 제어 참고) Spark 데이터 계보를 프로젝트, 일괄 워크로드, 대화형 세션에서 자동으로 사용할 수 있도록 하는 변경사항에 관한 공지는 Apache Spark용 서버리스 출시 노트를 참고하세요.

필요한 역할

일괄 워크로드가 기본 Apache Spark용 서버리스 서비스 계정을 사용하는 경우 데이터 계보에 필요한 권한이 포함된 Dataproc Worker 역할이 있습니다.

하지만 일괄 워크로드에서 커스텀 서비스 계정을 사용하여 데이터 계보를 사용 설정하는 경우 데이터 계보에 필요한 권한이 포함된 다음 단락에 나열된 역할 중 하나를 커스텀 서비스 계정에 부여해야 합니다.

Dataproc에서 데이터 계보를 사용하는 데 필요한 권한을 얻으려면 관리자에게 배치 워크로드 커스텀 서비스 계정에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

Spark 데이터 계보 사용 설정

프로젝트, 일괄 워크로드 또는 대화형 세션에 Spark 데이터 계보를 사용 설정할 수 있습니다.

프로젝트 수준에서 데이터 계보 사용 설정

프로젝트 수준에서 Spark 데이터 계보를 사용 설정하면 일괄 워크로드 또는 대화형 세션에서 실행되는 후속 Spark 작업에 Spark 데이터 계보가 사용 설정됩니다.

프로젝트에서 Spark 데이터 계보를 사용 설정하려면 다음 커스텀 프로젝트 메타데이터를 설정합니다.

DATAPROC_LINEAGE_ENABLED true

DATAPROC_LINEAGE_ENABLED 메타데이터를 false로 설정하여 프로젝트의 Spark 데이터 계보를 사용 중지할 수 있습니다.

Spark 일괄 워크로드에 데이터 계보 사용 설정

일괄 워크로드에서 데이터 계보를 사용 설정하려면 워크로드를 제출할 때 spark.dataproc.lineage.enabled 속성을 true로 설정합니다. 이 설정은 프로젝트 수준의 Spark 데이터 계보 설정을 재정의합니다. 프로젝트 수준에서 Spark 데이터 계보가 사용 중지되었지만 일괄 워크로드에 사용 설정된 경우 일괄 워크로드 설정이 우선합니다.

이 예시에서는 gcloud CLI를 사용하여 Spark 계보가 사용 설정된 일괄 lineage-example.py 워크로드를 제출합니다.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

다음 lineage-example.py 코드는 공개 BigQuery 테이블에서 데이터를 읽고 출력을 기존 BigQuery 데이터 세트의 새 테이블에 씁니다. 이는 임시 스토리지용 Cloud Storage 버킷을 사용합니다.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

다음을 바꿉니다.

  • REGION: 워크로드를 실행할 리전
  • BUCKET: 종속 항목을 저장할 기존 Cloud Storage 버킷의 이름
  • PROJECT_ID, DATASET, TABLE: 프로젝트 ID, 기존 BigQuery 데이터 세트의 이름, 데이터 세트에 만들 새 테이블의 이름 (테이블이 없어야 함)

Dataplex Universal Catalog UI에서 계보 그래프를 볼 수 있습니다.

Spark 계보 그래프

Spark 대화형 세션에 데이터 계보 사용 설정

Spark 대화형 세션에서 데이터 계보를 사용 설정하려면 세션 또는 세션 템플릿을 만들 때 spark.dataproc.lineage.enabled 속성을 true로 설정합니다. 이 설정은 프로젝트 수준의 Spark 데이터 계보 설정을 재정의합니다. 프로젝트 수준에서 Spark 데이터 계보가 사용 중지되었지만 Interactive 세션에서 사용 설정된 경우 Interactive 세션 설정이 우선 적용됩니다.

다음 PySpark 노트북 코드는 Spark 데이터 계보가 사용 설정된 Apache Spark용 서버리스 대화형 세션을 구성합니다. 그런 다음 공개 BigQuery Shakespeare 데이터 세트에서 단어 수 쿼리를 실행하고 기존 BigQuery 데이터 세트의 새 테이블에 출력을 쓰는 Spark Connect 세션을 만듭니다 (BigQuery Studio 노트북에서 Spark 세션 만들기 참고) .

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

다음을 바꿉니다.

  • PROJECT_ID, DATASET, TABLE: 프로젝트 ID, 기존 BigQuery 데이터 세트의 이름, 데이터 세트에 만들 새 테이블의 이름 (테이블이 없어야 함)

BigQuery 탐색기 페이지의 탐색 창에 나열된 대상 테이블 이름을 클릭한 다음 테이블 세부정보 창에서 계보 탭을 선택하여 데이터 계보 그래프를 볼 수 있습니다.

Spark 계보 그래프

Dataplex Universal Catalog에서 계보 보기

계보 그래프에는 프로젝트 리소스와 이를 만든 프로세스 간의 관계가 표시됩니다. Google Cloud 콘솔에서 데이터 계보 정보를 보거나 Data Lineage API에서 JSON 데이터로 정보를 가져올 수 있습니다.

다음 단계