Dataproc Persistent History Server

Übersicht

Der Dataproc Persistent History Server (PHS) bietet Weboberflächen, um den Jobverlauf für Jobs aufzurufen, die in aktiven oder gelöschten Dataproc-Clustern ausgeführt werden. Er ist in der Dataproc Image-Version 1.5 und höher verfügbar und wird in einem Dataproc-Cluster mit einem Knoten ausgeführt. Er bietet Weboberflächen für die folgenden Dateien und Daten:

  • MapReduce- und Spark-Jobverlaufsdateien

  • Flink-Jobverlaufsdateien (Informationen zum Erstellen eines Dataproc-Clusters zum Ausführen von Flink-Jobs finden Sie unter Optionale Dataproc-Flink-Komponente )

  • Anwendungszeitachsendatendateien, die von YARN Timeline Service v2 erstellt und in einer Bigtable-Instanz gespeichert wurden

  • YARN-Aggregationslogs

Der Persistent History Server greift auf Spark- und MapReduce-Jobverlaufs dateien, Flink-Jobverlaufsdateien und YARN-Logdateien zu, die während der Lebensdauer von Dataproc-Jobclustern in Cloud Storage geschrieben wurden, und zeigt sie an.

Beschränkungen

  • Die Image-Version des PHS-Clusters und die Image-Version des/der Dataproc Jobclusters müssen übereinstimmen. Sie können beispielsweise einen PHS-Cluster mit der Dataproc 2.0-Image-Version verwenden, um Jobverlaufsdateien von Jobs aufzurufen, die in Jobclustern mit der Dataproc 2.0-Image-Version ausgeführt wurden und sich in dem Projekt befanden, in dem sich der PHS-Cluster befindet.

  • Ein PHS-Cluster unterstützt Kerberos und die persönliche Authentifizierung nicht.

Dataproc-PHS-Cluster erstellen

Sie können den folgenden gcloud dataproc clusters create Befehl in einem lokalen Terminal oder in Cloud Shell mit den folgenden Flags und Clusterattributen ausführen, um einen Dataproc Persistent History Server-Cluster mit einem Knoten zu erstellen.

gcloud dataproc clusters create CLUSTER_NAME \
    --project=PROJECT \
    --region=REGION \
    --single-node \
    --enable-component-gateway \
    --optional-components=COMPONENT \
    --properties=PROPERTIES
  • CLUSTER_NAME: Geben Sie den Namen des PHS-Clusters an.
  • PROJECT: Geben Sie das Projekt an, das mit dem PHS-Cluster verknüpft werden soll. Dieses Projekt sollte mit dem Projekt übereinstimmen, das mit dem Cluster verknüpft ist, in dem Ihre Jobs ausgeführt werden (siehe Dataproc-Jobcluster erstellen).
  • REGION: Geben Sie eine Compute Engine-Region an, in der sich der PHS-Cluster befinden soll.
  • --single-node: Ein PHS-Cluster ist ein Dataproc Cluster mit einem Knoten.
  • --enable-component-gateway: Mit diesem Flag werden die Weboberflächen des Component Gateway im PHS-Cluster aktiviert.
  • COMPONENT: Mit diesem Flag können Sie eine oder mehrere optionale Komponenten im Cluster installieren. Sie müssen die FLINK optionale Komponente angeben, um den Flink HistoryServer-Webdienst im PHS-Cluster auszuführen und Flink-Jobverlaufsdateien aufzurufen.
  • PROPERTIES: Geben Sie ein oder mehrere Clusterattribute an.
  • Optional können Sie das --image-version Flag hinzufügen, um die Image-Version des PHS-Clusters anzugeben. Die PHS-Image-Version muss mit der Image-Version des/der Dataproc-Jobclusters übereinstimmen. Weitere Informationen finden Sie unter Beschränkungen.

    Hinweise:

    • In den Beispielen für Attributwerte in diesem Abschnitt wird ein Sternchen (*) als Platzhalter verwendet, damit der PHS mehrere Verzeichnisse im angegebenen Bucket abgleichen kann, in die verschiedene Jobcluster geschrieben wurden. Weitere Informationen finden Sie unter Überlegungen zur Effizienz von Platzhaltern.
    • In den folgenden Beispielen werden separate --properties Flags verwendet, um die Lesbarkeit zu verbessern. Die empfohlene Vorgehensweise beim Erstellen eines Dataproc in Compute Engine Clusters mit gcloud dataproc clusters create besteht darin, ein --properties Flag zu verwenden, um eine Liste durch Kommas getrennter Attribute anzugeben (siehe Formatierung von Clusterattributen).

    Attribute :

    • yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/*/yarn-logs: Fügen Sie dieses Attribut hinzu, um den Cloud Storage-Speicherort anzugeben, an dem der PHS auf YARN-Logs zugreift, die von Jobclustern geschrieben werden.
    • spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history: Fügen Sie dieses Attribut hinzu, um den nichtflüchtigen Spark-Jobverlauf zu aktivieren. Dieses Attribut gibt den Speicherort an, an dem der PHS auf Spark-Jobverlaufslogs zugreift, die von Jobclustern geschrieben wurden.

      In Dataproc 2.0 und höher-Clustern müssen auch die folgenden beiden Attribute festgelegt werden, um PHS-Spark-Verlaufslogs zu aktivieren (siehe Konfigurationsoptionen für den Spark-Verlaufsserver). Der spark.history.custom.executor.log.url Wert ist ein Literalwert, der enthält {{PLATZHALTER}} für Variablen, die vom Persistent History Server festgelegt werden. Diese Variablen werden nicht von Nutzern festgelegt. Übergeben Sie den Attributwert wie gezeigt.

      --properties=spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false
      
      --properties=spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}
      

    • mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done: Fügen Sie dieses Attribut hinzu, um den nichtflüchtigen MapReduce-Jobverlauf zu aktivieren. Dieses Attribut gibt den Cloud Storage-Speicherort an, an dem der PHS auf MapReduce-Jobverlaufslogs zugreift, die von Jobclustern geschrieben werden.

    • dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id: Nachdem Sie Yarn Timeline Service v2 konfiguriert haben, fügen Sie dieses Attribut hinzu, um den PHS-Cluster zu verwenden, um Zeitachsendaten auf den YARN Application Timeline Service V2 und Tez Weboberflächen aufzurufen (siehe Weboberflächen des Component Gateway).

    • flink:historyserver.archive.fs.dir=gs://bucket-name/*/flink-job-history/completed-jobs: Mit diesem Attribut können Sie den Flink HistoryServer so konfigurieren, dass er eine durch Kommas getrennte Liste von Verzeichnissen überwacht.

    Beispiele für Attribute :

    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history
    
    --properties=mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done
    
    --properties=flink:flink.historyserver.archive.fs.dir=gs://bucket-name/*/flink-job-history/completed-jobs
    

Dataproc-Jobcluster erstellen

Sie können den folgenden Befehl in einem lokalen Terminal oder in Cloud Shell ausführen, um einen Dataproc-Jobcluster zu erstellen, der Jobs ausführt und Jobverlaufsdateien in einen Persistent History Server (PHS) schreibt.

gcloud dataproc clusters create CLUSTER_NAME \
    --project=PROJECT \
    --region=REGION \
    --optional-components=COMPONENT \
    --enable-component-gateway \
    --properties=PROPERTIES \
    other args ...
  • CLUSTER_NAME: Geben Sie den Namen des Jobclusters an.
  • PROJECT: Geben Sie das Projekt an, das mit dem Jobcluster verknüpft ist.
  • REGION: Geben Sie die Compute Engine-Region an, in der sich der Jobcluster befinden soll.
  • --enable-component-gateway: Mit diesem Flag werden die Weboberflächen des Component Gateway im Jobcluster aktiviert.
  • COMPONENT: Mit diesem Flag können Sie eine oder mehrere optionale Komponenten im Cluster installieren. Geben Sie die optionale Komponente FLINK an, um Flink-Jobs im Cluster auszuführen.
  • PROPERTIES: Fügen Sie eines oder mehrere der folgenden Clusterattribute hinzu, um nicht standardmäßige Cloud Storage-Speicherorte für den PHS und andere Jobclusterattribute festzulegen.

    Hinweise:

    • In den Beispielen für Attributwerte in diesem Abschnitt wird ein Sternchen (*) als Platzhalter verwendet, damit der PHS mehrere Verzeichnisse im angegebenen Bucket abgleichen kann, in die verschiedene Jobcluster geschrieben wurden. Weitere Informationen finden Sie unter Überlegungen zur Effizienz von Platzhaltern.
    • In den folgenden Beispielen werden separate --properties Flags verwendet, um die Lesbarkeit zu verbessern. Die empfohlene Vorgehensweise beim Erstellen eines Dataproc in Compute Engine Clusters mit gcloud dataproc clusters create besteht darin, ein --properties Flag zu verwenden, um eine Liste durch Kommas getrennter Attribute anzugeben (siehe Formatierung von Clusterattributen).

    Attribute :

    • yarn:yarn.nodemanager.remote-app-log-dir: Standardmäßig sind aggregierte YARN-Logs in Dataproc-Job clustern aktiviert und werden in den temporären Bucket des Clusters geschrieben. Fügen Sie dieses Attribut hinzu, um einen anderen Cloud Storage-Speicherort anzugeben, an dem der Cluster Aggregationslogs für den Zugriff durch den Persistent History Server schreibt.
      --properties=yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/directory-name/yarn-logs
      
    • spark:spark.history.fs.logDirectory und spark:spark.eventLog.dir: Standardmäßig werden Spark-Jobverlaufsdateien im Cluster temp bucket im Verzeichnis /spark-job-history gespeichert. Sie können diese Attribute hinzufügen, um verschiedene Cloud Storage-Speicherorte für diese Dateien anzugeben. Wenn beide Attribute verwendet werden, müssen sie auf Verzeichnisse im selben Bucket verweisen.
      --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/directory-name/spark-job-history
      
      --properties=spark:spark.eventLog.dir=gs://bucket-name/directory-name/spark-job-history
      
    • mapred:mapreduce.jobhistory.done-dir und mapred:mapreduce.jobhistory.intermediate-done-dir: Standardmäßig werden MapReduce-Jobverlaufsdateien im Cluster temp bucket in den Verzeichnissen /mapreduce-job-history/done und /mapreduce-job-history/intermediate-done gespeichert. Der Zwischenspeicherort mapreduce.jobhistory.intermediate-done-dir ist ein temporärer Speicherort. Zwischendateien werden nach Abschluss des MapReduce-Jobs an den Speicherort mapreduce.jobhistory.done-dir verschoben. Sie können diese Attribute hinzufügen, um verschiedene Cloud Storage Speicherorte für diese Dateien anzugeben. Wenn beide Attribute verwendet werden, müssen sie auf Verzeichnisse im selben Bucket verweisen.
      --properties=mapred:mapreduce.jobhistory.done-dir=gs://bucket-name/directory-name/mapreduce-job-history/done
      
      --properties=mapred:mapreduce.jobhistory.intermediate-done-dir=gs://bucket-name/directory-name/mapreduce-job-history/intermediate-done
      
    • spark:spark.history.fs.gs.outputstream.type: Dieses Attribut gilt für 2.0 und 2.1 Image Version-Cluster, die die Cloud Storage-Connector-Version 2.0.x verwenden (die Standard-Connector-Version für 2.0 und 2.1 Image-Version-Cluster). Es steuert, wie Spark-Jobs Daten an Cloud Storage senden. Die Standardeinstellung ist BASIC. Dabei werden Daten nach Abschluss des Jobs an Cloud Storage gesendet. Wenn die Einstellung FLUSHABLE_COMPOSITE festgelegt ist, werden Daten in Cloud Storage in regelmäßigen Abständen kopiert, während der Job ausgeführt wird. Das Intervall wird durch spark:spark.history.fs.gs.outputstream.sync.min.interval.ms festgelegt.
      --properties=spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE
      
    • spark:spark.history.fs.gs.outputstream.sync.min.interval.ms: Dieses Attribut gilt für Cluster mit den Image Versionen 2.0 und 2.1, die die Cloud Storage-Connector-Version 2.0.x verwenden (die Standard-Connector-Version für Cluster mit den Image-Versionen 2.0 und 2.1). Es steuert die Häufigkeit in Millisekunden, mit der Daten an Cloud Storage übertragen werden wenn spark:spark.history.fs.gs.outputstream.type auf FLUSHABLE_COMPOSITEgesetzt ist. Das Standardzeitintervall ist 5000ms. Der Wert für das Zeitintervall in Millisekunden kann mit oder ohne das ms Suffix angegeben werden.
      --properties=spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=INTERVALms
      
    • spark:spark.history.fs.gs.outputstream.sync.min.interval: Dieses Attribut gilt für Cluster mit den Image-Versionen 2.2 und höher, die die Cloud Storage-Connector-Version 3.0.x verwenden (die Standard-Connector-Version für Cluster mit der Image-Version 2.2). Es ersetzt das vorherige spark:spark.history.fs.gs.outputstream.sync.min.interval.ms Attribut und unterstützt Werte mit Zeitsuffixen wie ms, s und m. Es steuert die Häufigkeit, mit der Daten an Cloud Storage übertragen werden, wenn spark:spark.history.fs.gs.outputstream.type auf FLUSHABLE_COMPOSITEgesetzt ist.
      --properties=spark:spark.history.fs.gs.outputstream.sync.min.interval=INTERVAL
      
    • dataproc:yarn.atsv2.bigtable.instance: Nachdem Sie Yarn Timeline Service v2 konfiguriert haben, fügen Sie dieses Attribut hinzu, um YARN-Zeitachsendaten in die angegebene Bigtable-Instanz zu schreiben, damit sie auf den Weboberflächen YARN Application Timeline Service V2 und Tez des PHS-Clusters angezeigt werden können. Hinweis: Die Clustererstellung schlägt fehl, wenn die Bigtable-Instanz nicht vorhanden ist.
      --properties=dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id
      
    • flink:jobhistory.archive.fs.dir: Der Flink JobManager archiviert abgeschlossene Flink-Jobs, indem er archivierte Jobinformationen in ein Dateisystemverzeichnis hochlädt. Mit diesem Attribut können Sie das Archivverzeichnis in flink-conf.yaml festlegen.
      --properties=flink:jobmanager.archive.fs.dir=gs://bucket-name/job-cluster-1/flink-job-history/completed-jobs
      

PHS mit Spark-Batcharbeitslasten verwenden

So verwenden Sie den Persistent History Server mit Dataproc Serverless für Spark-Batcharbeitslasten:

  1. Erstellen Sie einen PHS-Cluster.

  2. Wählen Sie den PHS-Cluster aus oder geben Sie ihn an, wenn Sie eine Spark-Batcharbeitslast senden.

PHS mit Dataproc in Google Kubernetes Engine verwenden

So verwenden Sie den Persistent History Server mit Dataproc in GKE:

  1. Erstellen Sie einen PHS-Cluster.

  2. Wählen Sie den PHS-Cluster aus oder geben Sie ihn an, wenn Sie einen virtuellen Dataproc in GKE-Cluster erstellen.

Weboberflächen des Component Gateway

Klicken Sie in der Google Cloud console auf der Seite Dataproc Cluster auf den Namen des PHS-Clusters , um die Seite Clusterdetails zu öffnen. Wählen Sie auf dem Tab Weboberflächen die Component Gateway-Links aus, um Weboberflächen zu öffnen, die im PHS-Cluster ausgeführt werden.

Weboberfläche des Spark-Verlaufsservers

Der folgende Screenshot zeigt die Weboberfläche des Spark-Verlaufsservers mit Links zu Spark-Jobs, die auf job-cluster-1 und job-cluster-2 ausgeführt werden, nachdem die Speicherorte spark.history.fs.logDirectory und spark:spark.eventLog.dir der Jobcluster und spark.history.fs.logDirectory des PHS-Clusters wie folgt eingerichtet wurden:

job-cluster-1 gs://example-cloud-storage-bucket/job-cluster-1/spark-job-history
job-cluster-2 gs://example-cloud-storage-bucket/job-cluster-2/spark-job-history
phs-cluster gs://example-cloud-storage-bucket/*/spark-job-history

Sie können Jobs nach Anwendungsname auf der Spark-Verlaufsserver-Weboberfläche auflisten, indem Sie einen Anwendungsnamen in das Suchfeld eingeben. App-Namen können auf eine der folgenden Arten festgelegt werden (nach Priorität aufgelistet):

  1. Wird im Anwendungscode beim Erstellen des Spark-Kontexts festgelegt
  2. Wird vom spark.app.name Attribut festgelegt, wenn der Job gesendet wird
  3. Setzen Sie von Dataproc den vollständigen REST-Ressourcennamen für Job (projects/project-id/regions/region/jobs/job-id).

Nutzer können einen Begriff für einen App- oder Ressourcennamen in das Feld Suchen eingeben , um Jobs zu finden und aufzulisten.

Ereignisprotokolle

Die Weboberfläche des Spark-Verlaufsservers bietet eine Schaltfläche Ereignislog , auf die Sie klicken können, um Spark-Ereignislogs herunterzuladen. Diese Logs sind nützlich, um den Lebenszyklus der Spark-Anwendung zu untersuchen.

Spark-Jobs

Spark-Anwendungen sind in mehrere Jobs unterteilt, die weiter in mehrere Phasen unterteilt werden. Jede Phase kann mehrere Aufgaben enthalten, die auf Executor-Knoten (Workern) ausgeführt werden.

  • Klicken Sie in der Weboberfläche auf eine Spark-Anwendungs-ID, um die Seite „Spark-Jobs“ zu öffnen, die eine Ereigniszeitachse und eine Zusammenfassung der Jobs innerhalb der Anwendung enthält.

  • Klicken Sie auf einen Job, um die Seite „Jobdetails“ mit einem gerichteten azyklischen Graphen (Directed Acyclic Graph, DAG) und einer Zusammenfassung der Jobphasen zu öffnen.

  • Klicken Sie auf eine Phase oder verwenden Sie den Tab „Phasen“, um eine Phase auszuwählen, um die Seite „Phasen Details“ zu öffnen.

    Die Seite „Phasendetails“ enthält eine DAG-Visualisierung, eine Ereigniszeitachse und Messwerte für die Aufgaben innerhalb der Phase. Auf dieser Seite können Sie Probleme mit strangulierten Aufgaben, Planerverzögerungen und Speichermangel beheben. Der DAG-Visualisierer zeigt die Codezeile, aus der die Phase abgeleitet ist, sodass Sie Probleme wieder im Code verfolgen können.

  • Klicken Sie auf den Tab „Executors“, um Informationen zu den Treiber- und Executor-Knoten der Spark-Anwendung zu erhalten.

    Wichtige Informationen auf dieser Seite umfassen die Anzahl der Kerne und die Anzahl der Aufgaben, die auf jedem Executor ausgeführt wurden.

Tez-Weboberfläche

Tez ist die Standard-Ausführungs-Engine für Hive und Pig in Dataproc. Wenn Sie einen Hive-Job in einem Dataproc-Jobcluster senden, wird eine Tez-Anwendung gestartet.

Wenn Sie Yarn Timeline Service v2 konfiguriert und das Attribut dataproc:yarn.atsv2.bigtable.instance beim Erstellen der PHS- und Dataproc-Jobcluster festgelegt haben, schreibt YARN generierte Hive- und Pig-Jobzeitachsendaten in die angegebene Bigtable-Instanz, damit sie auf der Tez-Weboberfläche abgerufen und angezeigt werden können, die auf dem PHS-Server ausgeführt wird.

Weboberfläche von YARN Application Timeline V2

Wenn Sie Yarn Timeline Service v2 konfiguriert und das Attribut dataproc:yarn.atsv2.bigtable.instance beim Erstellen der PHS- und Dataproc-Jobcluster festgelegt haben, schreibt YARN generierte Jobzeitachsendaten in die angegebene Bigtable-Instanz, damit sie abgerufen und auf der Weboberfläche von YARN Application Timeline Service angezeigt werden können, die auf dem PHS-Server ausgeführt wird. Dataproc-Jobs werden auf der Weboberfläche auf dem Tab **Flow Activity** (Ablaufaktivität) aufgeführt.

Yarn Timeline Service v2 konfigurieren

Um Yarn Timeline Service v2 zu konfigurieren, richten Sie eine Bigtable-Instanz ein und prüfen Sie bei Bedarf die Rollen des Dienstkontos:

  1. Bigtable-Instanz erstellen.

  2. Prüfen Sie bei Bedarf die Rollen des Dienstkontos. Das Standard VM-Dienstkonto , das von Dataproc-Cluster-VMs verwendet wird, hat die Berechtigungen, die zum Erstellen und Konfigurieren der Bigtable-Instanz für den YARN Timeline Service erforderlich sind. Wenn Sie Ihren Job- oder PHS-Cluster mit einem benutzerdefinierten VM-Dienstkonto erstellen, muss das Konto entweder die Bigtable-Administrator oder Bigtable User Rolle haben.

Erforderliches Tabellenschema

Für die Dataproc-PHS-Unterstützung für YARN Timeline Service v2 ist ein bestimmtes Schema erforderlich, das in der Bigtable-Instanz erstellt wurde. Dataproc erstellt das erforderliche Schema , wenn ein Jobcluster oder PHS-Cluster mit dem dataproc:yarn.atsv2.bigtable.instance Attribut erstellt wird, das auf die Bigtable-Instanz verweist.

Das erforderliche Bigtable-Instanzschema ist wie folgt:

Tabellen Spaltenfamilien
prod.timelineservice.application c,i,m
prod.timelineservice.app_flow m
prod.timelineservice.entity c,i,m
prod.timelineservice.flowactivity i
prod.timelineservice.flowrun i
prod.timelineservice.subapplication c,i,m

Bigtable-Speicherbereinigung

Sie können die altersbasierte Bigtable-Speicherbereinigung für ATSv2-Tabellen konfigurieren:

  • Installieren Sie cbt, (einschließlich der Erstellung der .cbrtc file).

  • Erstellen Sie die altersbasierte Speicherbereinigungsrichtlinie für ATSv2:

export NUMBER_OF_DAYS = number \
cbt setgcpolicy prod.timelineservice.application c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.application i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.application m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.app_flow m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.flowactivity i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.flowrun i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication m maxage=${NUMBER_OF_DAYS}

Hinweise:

NUMBER_OF_DAYS: Die maximale Anzahl von Tagen ist 30d.