Componente Flink facoltativo di Managed Service per Apache Spark

Puoi attivare componenti aggiuntivi come Flink quando crei un cluster Managed Service for Apache Spark utilizzando la funzionalità Componenti facoltativi. Questa pagina mostra come creare un cluster Managed Service for Apache Spark con il componente facoltativo Apache Flink attivato (un cluster Flink), quindi eseguire i job Flink sul cluster.

Puoi utilizzare il tuo cluster Flink per:

  1. Esegui job Flink utilizzando la risorsa JobsManaged Service per Apache Spark dalla console Google Cloud , da Google Cloud CLI o dall'API Managed Service per Apache Spark.

  2. Esegui job Flink utilizzando l'interfaccia a riga di comando flink in esecuzione sul nodo master del cluster Flink.

  3. Esegui job Apache Beam su Flink

  4. Esegui Flink su un cluster Kerberizzato

Puoi utilizzare la console Google Cloud , Google Cloud CLI o l'API Managed Service for Apache Spark per creare un cluster Managed Service for Apache Spark con il componente Flink attivato sul cluster.

Suggerimento:utilizza un cluster di VM standard con un master con il componente Flink. I cluster in modalità di alta affidabilità di Managed Service per Apache Spark (con 3 VM master) non supportano la modalità di alta affidabilità di Flink.

Puoi eseguire job Flink utilizzando la risorsa Managed Service for Apache Spark Jobs dalla consoleGoogle Cloud , da Google Cloud CLI o dall'API Managed Service for Apache Spark.

Console

Per inviare un job di conteggio delle parole Flink di esempio dalla console:

  1. Apri la pagina Invia un job di Managed Service per Apache Spark nella consoleGoogle Cloud nel browser.

  2. Compila i campi nella pagina Invia un job:

    1. Seleziona il nome del tuo cluster dall'elenco dei cluster.
    2. Imposta Tipo di prestazione su Flink.
    3. Imposta Classe principale o jar su org.apache.flink.examples.java.wordcount.WordCount.
    4. Imposta File jar su file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// indica un file che si trova sul cluster. Managed Service per Apache Spark ha installato WordCount.jar durante la creazione del cluster Flink.
      • Questo campo accetta anche un percorso Cloud Storage (gs://BUCKET/JARFILE) o un percorso Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR).
  3. Fai clic su Invia.

    • L'output del driver del job viene visualizzato nella pagina Dettagli job.
    • I job Flink sono elencati nella pagina Job di Managed Service per Apache Spark nella Google Cloud console.
    • Fai clic su Interrompi o Elimina dalla pagina Job o Dettagli del job per interrompere o eliminare un job.

gcloud

Per inviare un job Flink a un cluster Managed Service for Apache Spark Flink, esegui il comando gcloud CLI gcloud dataproc jobs submit localmente in una finestra del terminale o in Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Note:

  • CLUSTER_NAME: specifica il nome del cluster Managed Service per Apache Spark Flink a cui inviare il job.
  • REGION: specifica una regione Compute Engine in cui si trova il cluster.
  • MAIN_CLASS: specifica la classe main della tua applicazione Flink, ad esempio:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: specifica il file JAR dell'applicazione Flink. Puoi specificare:
    • Un file jar installato sul cluster, utilizzando il prefisso file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Un file JAR in Cloud Storage: gs://BUCKET/JARFILE
    • Un file jar in HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: (facoltativo) aggiungi gli argomenti del job dopo il doppio trattino (--).

  • Dopo aver inviato il job, l'output del driver del job viene visualizzato nel terminale locale o di Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

Questa sezione mostra come inviare un job Flink a un cluster Managed Service for Apache Spark Flink utilizzando l'API jobs.submit di Managed Service for Apache Spark.

Prima di utilizzare i dati della richiesta, apporta le sostituzioni seguenti:

  • PROJECT_ID: Google Cloud ID progetto
  • REGION: regione del cluster
  • CLUSTER_NAME: specifica il nome del cluster Managed Service per Apache Spark Flink a cui inviare il job

Metodo HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corpo JSON della richiesta:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Per inviare la richiesta, espandi una di queste opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • I job Flink sono elencati nella pagina Job di Managed Service per Apache Spark nella Google Cloud console.
  • Puoi fare clic su Interrompi o Elimina dalla pagina Job o Dettagli job nella Google Cloud console per interrompere o eliminare un job.

Anziché eseguire job Flink utilizzando la risorsa Managed Service per Apache Spark Jobs, puoi eseguire job Flink sul nodo master del cluster Flink utilizzando la CLI flink.

Le sezioni seguenti descrivono diversi modi per eseguire un job dell'interfaccia a riga di comando flink sul cluster Managed Service per Apache Spark Flink.

  1. Accedi tramite SSH al nodo master:utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.

  2. Imposta il classpath:inizializza il classpath di Hadoop dalla finestra del terminale SSH nella VM master del cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Esegui job Flink:puoi eseguire job Flink in diverse modalità di deployment su YARN: modalità applicazione, per job e sessione.

    1. Modalità applicazione:la modalità applicazione di Flink è supportata da Managed Service per Apache Spark versione 2.0 e successive. Questa modalità esegue il metodo main() del job su YARN Job Manager. Il cluster si arresta al termine del job.

      Esempio di invio del job:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Elenca i job in esecuzione:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Annulla un job in esecuzione:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modalità per job:questa modalità Flink esegue il metodo main() del job sul lato client.

      Esempio di invio del job:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Modalità sessione:avvia una sessione Flink YARN a esecuzione prolungata, quindi invia uno o più job alla sessione.

      1. Avvia una sessione:puoi avviare una sessione Flink in uno dei seguenti modi:

        1. Crea un cluster Flink aggiungendo il flag --metadata flink-start-yarn-session=true al comando gcloud dataproc clusters create (vedi Creare un cluster Dataproc Flink). Se questo flag è abilitato, dopo la creazione del cluster, Managed Service for Apache Spark esegue /usr/bin/flink-yarn-daemon per avviare una sessione Flink sul cluster.

          L'ID applicazione YARN della sessione viene salvato in /tmp/.yarn-properties-${USER}. Puoi elencare l'ID con il comando yarn application -list.

        2. Esegui lo script Flink yarn-session.sh, preinstallato sulla VM master del cluster, con impostazioni personalizzate:

          Esempio con impostazioni personalizzate:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Esegui lo script wrapper /usr/bin/flink-yarn-daemon di Flink con le impostazioni predefinite:

          . /usr/bin/flink-yarn-daemon
          
      2. Invia un job a una sessione: esegui questo comando per inviare un job Flink alla sessione.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: l'URL, inclusi host e porta, della VM master Flink in cui vengono eseguiti i job. Rimuovi http:// prefix dall'URL. Questo URL è elencato nell'output del comando quando avvii una sessione di Flink. Puoi eseguire questo comando per elencare questo URL nel campo Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Elencare i job in una sessione:per elencare i job Flink in una sessione, esegui una delle seguenti operazioni:

        • Esegui flink list senza argomenti. Il comando cerca l'ID applicazione YARN della sessione in /tmp/.yarn-properties-${USER}.

        • Ottieni l'ID applicazione YARN della sessione da /tmp/.yarn-properties-${USER} o dall'output di yarn application -list, quindi esegui <code>flink list -yid YARN_APPLICATION_ID.

        • Esegui flink list -m FLINK_MASTER_URL.

      4. Arresta una sessione: per arrestare la sessione, ottieni l'ID applicazione YARN della sessione da /tmp/.yarn-properties-${USER} o dall'output di yarn application -list, quindi esegui uno dei seguenti comandi:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Puoi eseguire i job Apache Beam su Managed Service per Apache Spark utilizzando FlinkRunner.

Puoi eseguire i job Beam su Flink nei seguenti modi:

  1. Job Java Beam
  2. Job Portable Beam

Job Java Beam

Pacchettizza i job Beam in un file JAR. Fornisci il file JAR in bundle con le dipendenze necessarie per eseguire il job.

L'esempio seguente esegue un job Java Beam dal nodo master del cluster Managed Service per Apache Spark.

  1. Crea un cluster Managed Service per Apache Spark con il componente Flink abilitato.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: la versione dell'immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, vedi le versioni dei componenti Apache Flink elencate per le quattro versioni di rilascio dell'immagine 2.0.x più recenti e precedenti).
    • --region: una regione Managed Service per Apache Spark supportata.
    • --enable-component-gateway: attiva l'accesso all'interfaccia utente di Flink Job Manager.
    • --scopes: attiva l'accesso alle Google Cloud API da parte del cluster (vedi Best practice per gli ambiti). L'ambito cloud-platform è attivato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza la versione immagine 2.1 o successive di Managed Service per Apache Spark.
  2. Utilizza l'utilità SSH per aprire una finestra del terminale sul nodo master del cluster Flink.

  3. Avvia una sessione Flink YARN sul nodo master del cluster Managed Service per Apache Spark.

    . /usr/bin/flink-yarn-daemon
    

    Prendi nota della versione di Flink sul tuo cluster Managed Service per Apache Spark.

    flink --version
    
  4. Sulla tua macchina locale, genera l'esempio canonico di conteggio delle parole di Beam in Java.

    Scegli una versione di Beam compatibile con la versione di Flink sul tuo cluster Managed Service per Apache Spark. Consulta la tabella Compatibilità delle versioni di Flink che elenca la compatibilità delle versioni di Beam-Flink.

    Apri il file POM generato. Controlla la versione del runner Beam Flink specificata dal tag <flink.artifact.name>. Se la versione di Beam Flink Runner nel nome dell'artefatto Flink non corrisponde alla versione di Flink sul cluster, aggiorna il numero di versione in modo che corrisponda.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Pacchettizza l'esempio di conteggio delle parole.

    mvn package -Pflink-runner
    
  6. Carica il file JAR uber compresso, word-count-beam-bundled-0.1.jar (~135 MB) sul nodo master del cluster Managed Service per Apache Spark. Puoi utilizzare gcloud storage cp per trasferire più rapidamente i file al cluster Managed Service per Apache Spark da Cloud Storage.

    1. Nel terminale locale, crea un bucket Cloud Storage e carica l'uber JAR.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sul nodo master di Managed Service per Apache Spark, scarica l'uber JAR.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Esegui il job Java Beam sul nodo master del cluster Managed Service for Apache Spark.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Verifica che i risultati siano stati scritti nel bucket Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Arresta la sessione Flink YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Portable Beam Jobs

Per eseguire job Beam scritti in Python, Go e altri linguaggi supportati, puoi utilizzare FlinkRunner e PortableRunner come descritto nella pagina Flink Runner di Beam (vedi anche Roadmap del framework di portabilità).

L'esempio seguente esegue un job Beam portatile in Python dal nodo master del cluster Managed Service per Apache Spark.

  1. Crea un cluster Managed Service per Apache Spark con i componenti Flink e Docker abilitati.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Note:

    • --optional-components: Flink e Docker.
    • --image-version: la versione dell'immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, vedi le versioni dei componenti Apache Flink elencate per le quattro versioni più recenti e precedenti della release dell'immagine 2.0.x).
    • --region: una regione di Managed Service per Apache Spark disponibile.
    • --enable-component-gateway: attiva l'accesso all'interfaccia utente di Flink Job Manager.
    • --scopes: attiva l'accesso alle API da parte del cluster (vedi Best practice per gli ambiti). Google Cloud L'ambito cloud-platform è attivato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza la versione immagine 2.1 o successive di Managed Service per Apache Spark.
  2. Utilizza gcloud CLI localmente o in Cloud Shell per creare un bucket Cloud Storage. Specificherai BUCKET_NAME quando esegui un programma di conteggio delle parole di esempio.

    gcloud storage buckets create BUCKET_NAME
    
  3. In una finestra del terminale della VM del cluster, avvia una sessione Flink YARN. Prendi nota dell'URL master di Flink, l'indirizzo del master di Flink in cui vengono eseguiti i job. Specificherai il FLINK_MASTER_URL quando esegui un programma di conteggio delle parole di esempio.

    . /usr/bin/flink-yarn-daemon
    

    Visualizza e annotati la versione di Flink in esecuzione nel cluster Managed Service per Apache Spark. Specificherai il FLINK_VERSION quando esegui un programma di conteggio delle parole di esempio.

    flink --version
    
  4. Installa le librerie Python necessarie per il job sul nodo master del cluster.

  5. Installa una versione di Beam compatibile con la versione di Flink sul cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Esegui l'esempio di conteggio delle parole sul nodo master del cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Note:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, come indicato in precedenza.
    • --flink_master: FLINK_MASTER_URL, come indicato in precedenza.
    • --flink_submit_uber_jar: utilizza l'uber JAR per eseguire il job Beam.
    • --output: BUCKET_NAME, creato in precedenza.
  7. Verifica che i risultati siano stati scritti nel bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Arresta la sessione Flink YARN.

    1. Recupera l'ID applicazione.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Il componente Managed Service per Apache Spark Flink supporta i cluster Kerberized. Per inviare e rendere persistente un job Flink o per avviare un cluster Flink è necessario un ticket Kerberos valido. Per impostazione predefinita, un ticket Kerberos rimane valido per sette giorni.

L'interfaccia web di Flink Job Manager è disponibile mentre è in esecuzione un job Flink o un cluster di sessione Flink. Per utilizzare l'interfaccia web:

  1. Crea un cluster Managed Service per Apache Spark Flink.
  2. Dopo la creazione del cluster, fai clic sul link al gateway dei componenti YARN ResourceManager nella scheda Interfaccia web della pagina Dettagli cluster nella console Google Cloud .
  3. Nell'interfaccia utente di YARN Resource Manager, identifica la voce dell'applicazione del cluster Flink. A seconda dello stato di completamento di un job, verrà visualizzato un link ApplicationMaster o History.
  4. Per un job di streaming a esecuzione prolungata, fai clic sul link ApplicationManager per aprire la dashboard di Flink; per un job completato, fai clic sul link Cronologia per visualizzare i dettagli del job.