Der Dataproc Spark Connect-Client ist ein Wrapper des Apache Spark Connect-Clients. Damit können Anwendungen über das Spark Connect-Protokoll mit einer Remote-Sitzung von Serverless for Apache Spark kommunizieren. In diesem Dokument wird beschrieben, wie Sie den Client installieren, konfigurieren und verwenden.
Hinweise
Sie benötigen die IAM-Rollen (Identity and Access Management), die die Berechtigungen zum Verwalten interaktiver Sitzungen und Sitzungsvorlagen enthalten.
Wenn Sie den Client außerhalb von Google Cloudausführen, müssen Sie Authentifizierungsdaten angeben. Legen Sie die Umgebungsvariable
GOOGLE_APPLICATION_CREDENTIALSauf den Pfad Ihrer Dienstkontoschlüsseldatei fest.
Client installieren oder deinstallieren
Sie können das dataproc-spark-connect-Paket mit pip installieren oder deinstallieren.
Installieren
Führen Sie den folgenden Befehl aus, um die neueste Version des Clients zu installieren:
pip install -U dataproc-spark-connect
Deinstallieren
Führen Sie den folgenden Befehl aus, um den Client zu deinstallieren:
pip uninstall dataproc-spark-connect
Client konfigurieren
Geben Sie das Projekt und die Region für Ihre Sitzung an. Sie können diese Werte mit Umgebungsvariablen oder mit der Builder API in Ihrem Code festlegen.
Umgebungsvariablen
Legen Sie die Umgebungsvariablen GOOGLE_CLOUD_PROJECT und GOOGLE_CLOUD_REGION fest.
Builder API
Verwenden Sie die Methoden .projectId() und .location().
spark = DataprocSparkSession.builder.projectId("my-project").location("us-central1").getOrCreate()
Spark-Sitzung starten
Wenn Sie eine Spark-Sitzung starten möchten, fügen Sie Ihrer PySpark-Anwendung oder Ihrem Notebook die erforderlichen Importe hinzu und rufen Sie dann die DataprocSparkSession.builder.getOrCreate() API auf.
Importieren Sie die Klasse
DataprocSparkSession.Rufen Sie die Methode
getOrCreate()auf, um die Sitzung zu starten.from google.cloud.dataproc_spark_connect import DataprocSparkSession spark = DataprocSparkSession.builder.getOrCreate()
Spark-Attribute konfigurieren
Wenn Sie Spark-Attribute konfigurieren möchten, verketten Sie eine oder mehrere .config()-Methoden mit dem Builder.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.config('spark.executor.memory', '48g').config('spark.executor.cores', '8').getOrCreate()
Erweiterte Konfiguration verwenden
Für die erweiterte Konfiguration verwenden Sie die Klasse Session, um Einstellungen wie das Subnetz oder die Laufzeitversion anzupassen.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session_config = Session()
session_config.environment_config.execution_config.subnetwork_uri = 'SUBNET'
session_config.runtime_config.version = '3.0'
spark = DataprocSparkSession.builder.projectId('my-project').location('us-central1').dataprocSessionConfig(session_config).getOrCreate()
Benannte Sitzung wiederverwenden
Mit benannten Sitzungen können Sie eine einzelne Spark-Sitzung für mehrere Notebooks freigeben und so wiederholte Verzögerungen beim Starten von Sitzungen vermeiden.
Erstellen Sie in Ihrem ersten Notebook eine Sitzung mit einer benutzerdefinierten ID.
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(1, 'data')], ['id', 'value']) df.show()In einem anderen Notebook können Sie die Sitzung wiederverwenden, indem Sie dieselbe Sitzungs-ID angeben.
from google.cloud.dataproc_spark_connect import DataprocSparkSession session_id = 'my-ml-pipeline-session' spark = DataprocSparkSession.builder.dataprocSessionId(session_id).getOrCreate() df = spark.createDataFrame([(2, 'more-data')], ['id', 'value']) df.show()
Sitzungs-IDs müssen 4 bis 63 Zeichen lang sein, mit einem Kleinbuchstaben beginnen und dürfen nur Kleinbuchstaben, Ziffern und Bindestriche enthalten. Die ID darf nicht mit einem Bindestrich enden. Eine Sitzung mit einer ID im Status TERMINATED kann nicht wiederverwendet werden.
Spark SQL-Magic-Befehle verwenden
Das Paket unterstützt die sparksql-magic-Bibliothek zum Ausführen von Spark SQL-Abfragen in Jupyter-Notebooks. Magic-Befehle sind eine optionale Funktion.
Installieren Sie die erforderlichen Abhängigkeiten.
pip install IPython sparksql-magicLaden Sie die Magic-Erweiterung.
%load_ext sparksql_magicOptional: Konfigurieren Sie die Standardeinstellungen.
%config SparkSql.limit=20SQL-Abfragen ausführen.
%%sparksql SELECT * FROM your_table
Wenn Sie erweiterte Optionen verwenden möchten, fügen Sie dem %%sparksql-Befehl Flags hinzu. Wenn Sie beispielsweise Ergebnisse im Cache speichern und eine Ansicht erstellen möchten, führen Sie den folgenden Befehl aus:
%%sparksql --cache --view result_view df
SELECT * FROM your_table WHERE condition = true
Folgende Optionen sind verfügbar:
--cacheoder-c: Der DataFrame wird im Cache gespeichert.--eageroder-e: Caches mit Eager Loading.--view VIEWoder-v VIEW: Erstellt eine temporäre Ansicht.--limit Noder-l N: Überschreibt das Standardlimit für die Zeilenanzeige.variable_name: Speichert das Ergebnis in einer Variablen.