Datenherkunft mit Serverless for Apache Spark verwenden

In diesem Dokument wird beschrieben, wie Sie die Datenherkunft für Google Cloud Serverless for Apache Spark-Batcharbeitslasten und interaktive Sitzungen auf Projekt-, Batcharbeitslast- oder interaktiver Sitzungs-Ebene aktivieren.

Übersicht

Die Datenherkunft ist eine Dataplex Universal Catalog Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden.

Google Cloud Serverless for Apache Spark-Arbeitslasten und -Sitzungen erfassen Lineage-Ereignisse und veröffentlichen sie in der Dataplex Universal Catalog Data Lineage API. Serverless for Apache Spark wird über OpenLineage in die Data Lineage API eingebunden. Dazu wird das OpenLineage Spark-Plug-in verwendet.

Sie können über Dataplex Universal Catalog auf Lineage-Informationen zugreifen. Dazu verwenden Sie Lineage-Diagramme und die Data Lineage API. Weitere Informationen finden Sie unter Lineage-Diagramme in Dataplex Universal Catalog ansehen.

Verfügbarkeit, Funktionen und Einschränkungen

Die Datenherkunft, die BigQuery- und Cloud Storage Datenquellen unterstützt, ist für Arbeitslasten und Sitzungen verfügbar, die mit den Serverless for Apache Spark-Laufzeitversionen 1.2, 2.2, 2.3, und 3.0, ausgeführt werden. Es gelten die folgenden Ausnahmen und Einschränkungen:

  • Die Datenherkunft ist für SparkR- oder Spark-Streaming-Arbeitslasten oder -Sitzungen nicht verfügbar.

Hinweis

  1. Wählen Sie auf der Projektauswahlseite in der Google Cloud Console das Projekt aus, das Sie für Ihre Serverless for Apache Spark-Arbeitslasten oder -Sitzungen verwenden möchten.

    Zur Projektauswahl

  2. Aktivieren Sie die Data Lineage API.

    APIs aktivieren

Erforderliche Rollen

Wenn Ihre Batcharbeitslast das Standard-Serverless for Apache Spark-Dienstkontoverwendet, hat sie die Rolle Dataproc Worker, die die Datenherkunft ermöglicht. Keine zusätzliche Aktion ist notwendig.

Wenn Ihre Batcharbeitslast jedoch ein benutzerdefiniertes Dienstkonto verwendet, um die Datenherkunft zu aktivieren, müssen Sie dem benutzerdefinierten Dienstkonto eine erforderliche Rolle zuweisen, wie im folgenden Absatz beschrieben.

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das benutzerdefinierte Dienstkonto Ihrer Batcharbeitslast zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung der Datenherkunft mit Dataproc benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Datenherkunft auf Projektebene aktivieren

Sie können die Datenherkunft auf Projektebene aktivieren. Wenn sie auf Projektebene aktiviert ist, ist die Spark-Lineage für alle nachfolgenden Batcharbeitslasten und interaktiven Sitzungen aktiviert, die Sie im Projekt ausführen.

Datenherkunft auf Projektebene aktivieren

Legen Sie die folgenden benutzerdefinierten Projektmetadaten fest, um die Datenherkunft auf Projektebene zu aktivieren. set the following custom project metadata.

Schlüssel Wert
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Sie können die Datenherkunft auf Projektebene deaktivieren, indem Sie die DATAPROC_LINEAGE_ENABLED Metadaten auf false setzen.

Datenherkunft für eine Spark-Batcharbeitslast aktivieren

Sie können die Datenherkunft für eine Batcharbeitslast aktivieren, indem Sie die spark.dataproc.lineage.enabled Eigenschaft auf true setzen, wenn Sie die Arbeitslast senden.

Beispiel für eine Batcharbeitslast

In diesem Beispiel wird eine Batcharbeitslast lineage-example.py mit aktivierter Spark-Lineage gesendet.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py liest Daten aus einer öffentlichen BigQuery Tabelle und schreibt die Ausgabe dann in eine neue Tabelle in einem vorhandenen BigQuery Dataset. Ein Cloud Storage-Bucket wird für die temporäre Speicherung verwendet.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Ersetzen Sie die folgenden Werte:

  • REGION: Wählen Sie eine Region aus, in der Ihre Arbeitslast ausgeführt werden soll.

  • BUCKET: Der Name eines vorhandenen Cloud Storage-Buckets zum Speichern von Abhängigkeiten.

  • PROJECT_ID, DATASET, und TABLE: Fügen Sie Ihre Projekt-ID, den Namen eines vorhandenen BigQuery-Datasets und den Namen einer neuen Tabelle ein, die im Dataset erstellt werden soll (die Tabelle darf nicht vorhanden sein).

Sie können das Lineage-Diagramm in der Dataplex Universal Catalog-UI ansehen.

Spark-Herkunftsdiagramm

Datenherkunft für eine interaktive Spark-Sitzung aktivieren

Sie können die Datenherkunft für eine interaktive Spark-Sitzung aktivieren, indem Sie die spark.dataproc.lineage.enabled Eigenschaft auf true setzen, wenn Sie die Sitzung oder Sitzungsvorlage erstellen.

Beispiel für eine interaktive Sitzung

Der folgende PySpark-Notebook-Code konfiguriert eine interaktive Serverless for Apache Spark -Sitzung mit aktivierter Spark-Datenherkunft. Anschließend wird eine Spark Connect -Sitzung erstellt, in der eine Wordcount-Abfrage für ein öffentliches BigQuery -Shakespeare-Dataset ausgeführt und die Ausgabe in eine neue Tabelle in einem vorhandenen BigQuery-Dataset geschrieben wird.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Ersetzen Sie die folgenden Werte:

  • PROJECT_ID, DATASET, und TABLE: Fügen Sie Ihre Projekt-ID, den Namen eines vorhandenen BigQuery-Datasets und den Namen einer neuen Tabelle ein, die im Dataset erstellt werden soll (die Tabelle darf nicht vorhanden sein).

Sie können das Datenherkunftsdiagramm aufrufen, indem Sie im Navigationsbereich auf der Seite Explorer von BigQuery auf den Namen der Zieltabelle klicken und dann im Bereich mit den Tabellendetails den Tab „Lineage“ auswählen.

Spark-Herkunftsdiagramm

Lineage in Dataplex Universal Catalog ansehen

Ein Lineage-Diagramm zeigt Beziehungen zwischen Ihren Projekt ressourcen und den Prozessen, mit denen sie erstellt wurden. Sie können Informationen zur Datenherkunft in der Google Cloud Console ansehen oder die Informationen als JSON-Daten aus der Data Lineage API abrufen.

Nächste Schritte