Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie mit Cloud Composer 2 Serverless for Apache Spark-Arbeitslasten in Google Cloudausführen.
In den Beispielen in den folgenden Abschnitten wird gezeigt, wie Sie Operatoren zum Verwalten von Serverless for Apache Spark-Batcharbeitslasten verwenden. Sie verwenden diese Operatoren in DAGs, die eine Serverless for Apache Spark-Batcharbeitslast erstellen, löschen, auflisten und abrufen:
DAGs für Operatoren erstellen, die mit Serverless for Apache Spark-Batcharbeitslasten arbeiten:
DAGs erstellen, die benutzerdefinierte Container und Dataproc Metastore verwenden.
Persistent History Server für diese DAGs konfigurieren.
Hinweis
Aktivieren Sie die Dataproc API:
Console
Aktivieren Sie die Dataproc API.
Erforderliche Rollen zum Aktivieren von APIs
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (
roles/serviceusage.serviceUsageAdmin), die die Berechtigungserviceusage.services.enableenthält. Rollen zuweisen.gcloud
Aktivieren Sie die Dataproc API:
Erforderliche Rollen zum Aktivieren von APIs
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (
roles/serviceusage.serviceUsageAdmin), die die Berechtigungserviceusage.services.enableenthält. Rollen zuweisen.gcloud services enable dataproc.googleapis.com
Wählen Sie den Speicherort für die Batcharbeitslastdatei aus. Sie haben folgende Möglichkeiten:
- Erstellen Sie einen Cloud Storage-Bucket, in dem diese Datei gespeichert wird.
- Verwenden Sie den Bucket Ihrer Umgebung. Da Sie diese Datei nicht mit Airflow synchronisieren müssen, können Sie einen separaten Unterordner außerhalb der Ordner
/dagsoder/dataerstellen. Beispiel:/batches. - Verwenden Sie einen vorhandenen Bucket.
Dateien und Airflow-Variablen einrichten
In diesem Abschnitt wird gezeigt, wie Sie Dateien einrichten und Airflow-Variablen für diese Anleitung konfigurieren.
Serverless for Apache Spark-ML-Arbeitslastdatei in einen Bucket hochladen
Die Arbeitslast in dieser Anleitung führt ein PySpark-Skript aus:
Speichern Sie ein beliebiges PySpark-Skript in einer lokalen Datei mit dem Namen
spark-job.py. Sie können beispielsweise das PySpark-Beispielskript verwenden.Laden Sie die Datei hoch an den Speicherort, den Sie unter Hinweis ausgewählt haben.
Airflow-Variablen festlegen
In den Beispielen in den folgenden Abschnitten werden Airflow-Variablen verwendet. Sie legen Werte für diese Variablen in Airflow fest. Anschließend kann Ihr DAG-Code auf diese Werte zugreifen.
In den Beispielen in dieser Anleitung werden die folgenden Airflow-Variablen verwendet. Sie können sie je nach verwendetem Beispiel nach Bedarf festlegen.
Legen Sie die folgenden Airflow-Variablen für die Verwendung in Ihrem DAG-Code fest:
project_id: Projekt-ID.bucket_name: URI eines Buckets, in dem sich die Haupt-Python-Datei der Arbeitslast (spark-job.py) befindet. Sie haben diesen Speicherort unter Hinweis ausgewählt.phs_cluster: Name des Persistent History Server-Clusters. Sie legen diese Variable fest wenn Sie einen Persistent History Server erstellen.image_name: Name und Tag des benutzerdefinierten Container-Images (image:tag). Sie legen diese Variable fest, wenn Sie ein benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden.metastore_cluster: Name des Dataproc Metastore-Dienstes. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.region_name: Region, in der sich der Dataproc Metastore-Dienst befindet. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.
Jede Airflow-Variable über die Google Cloud Console und die Airflow-UI festlegen
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Link Airflow für Ihre Umgebung. Die Airflow-UI wird geöffnet.
Wählen Sie in der Airflow-UI Admin > Variablen aus.
Klicken Sie auf Neuen Eintrag hinzufügen.
Geben Sie den Namen der Variablen im Feld Schlüssel an und legen Sie den Wert im Feld Wert fest.
Klicken Sie auf Speichern.
Persistent History Server erstellen
Mit einem Persistent History Server (PHS) können Sie Spark-Verlaufsdateien Ihrer Batcharbeitslasten ansehen:
- Erstellen Sie einen Persistent History Server.
- Achten Sie darauf, dass Sie den Namen des PHS-Clusters in der
phs_clusterAirflow-Variablen angegeben haben.
DataprocCreateBatchOperator
Der folgende DAG startet eine Serverless for Apache Spark-Batcharbeitslast.
Weitere Informationen zu den DataprocCreateBatchOperator Argumenten finden Sie im
Quellcode des Operators.
Weitere Informationen zu Attributen, die Sie im batch
Parameter von DataprocCreateBatchOperator übergeben können, finden Sie in der
Beschreibung der Batch-Klasse.
Benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden
Das folgende Beispiel zeigt, wie Sie ein benutzerdefiniertes Container-Image verwenden, um Ihre Arbeitslasten auszuführen. Sie können beispielsweise einen benutzerdefinierten Container verwenden, um Python-Abhängigkeiten hinzuzufügen, die nicht im Standard-Container-Image enthalten sind.
So verwenden Sie ein benutzerdefiniertes Container-Image:
Erstellen Sie ein benutzerdefiniertes Container-Image und laden Sie es in Container Registry hoch.
Geben Sie das Image in der
image_nameAirflow-Variablen an.Verwenden Sie DataprocCreateBatchOperator mit Ihrem benutzerdefinierten Image:
Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden
So verwenden Sie einen Dataproc Metastore-Dienst aus einem DAG:
Prüfen Sie, ob Ihr Metastore-Dienst bereits gestartet wurde.
Informationen zum Starten eines Metastore-Dienstes finden Sie unter Dataproc Metastore aktivieren und deaktivieren.
Ausführliche Informationen zum Batch-Operator zum Erstellen der Konfiguration finden Sie unter PeripheralsConfig.
Sobald der Metastore-Dienst ausgeführt wird, geben Sie seinen Namen in der
metastore_clusterVariablen und seine Region in derregion_nameAirflow-Variablen an.Verwenden Sie den Metastore-Dienst in DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Mit DataprocDeleteBatchOperator können Sie einen Batch anhand der Batch-ID der Arbeitslast löschen.
DataprocListBatchesOperator
DataprocDeleteBatchOperator listet Batches auf, die in einer bestimmten `project_id` und Region vorhanden sind.
DataprocGetBatchOperator
DataprocGetBatchOperator ruft eine bestimmte Batcharbeitslast ab.