Esegui modelli flessibili in Dataflow

Questa pagina descrive come eseguire un job Dataflow utilizzando un modello Flex. I modelli flessibili consentono di pacchettizzare una pipeline Dataflow in modo da poterla eseguire senza un ambiente di sviluppo Apache Beam.

Autorizzazioni obbligatorie

Quando esegui un modello flessibile, Dataflow crea un job per te. Per creare il job, l'account di servizio Dataflow deve disporre della seguente autorizzazione:

  • dataflow.serviceAgent

Quando utilizzi Dataflow per la prima volta, il servizio assegna questo ruolo per te, quindi non devi concedere questa autorizzazione.

Per impostazione predefinita, il account di servizio Compute Engine viene utilizzato per le VM di avvio e le VM worker. Il account di servizio deve avere i seguenti ruoli e capacità:

  • Storage Object Admin (roles/storage.objectAdmin)
  • Visualizzatore (roles/viewer)
  • Dataflow Worker (roles/dataflow.worker)
  • Accesso in lettura e scrittura al bucket di staging
  • Accesso in lettura all'immagine del modello flessibile

Per concedere l'accesso in lettura e scrittura al bucket di staging, puoi utilizzare il ruolo Storage Object Admin (roles/storage.objectAdmin). Per saperne di più, consulta Ruoli IAM per Cloud Storage.

Per concedere l'accesso in lettura all'immagine del modello flessibile, puoi utilizzare il ruolo Storage Object Viewer (roles/storage.objectViewer). Per saperne di più, consulta Configurazione del controllo dell'accesso.

Esegui un modello flessibile

Per eseguire un modello flessibile, utilizza il comando gcloud dataflow flex-template run:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Sostituisci quanto segue:

  • JOB_ID: l'ID del job

  • TEMPLATE_FILE_LOCATION: la posizione del file modello in Cloud Storage

  • REGION: la regione in cui eseguire il job Dataflow

  • STAGING_LOCATION: il percorso di Cloud Storage per preparare i file locali

  • TEMP_LOCATION: il percorso di Cloud Storage in cui scrivere i file temporanei. Se non viene impostato, il valore predefinito è la posizione di staging.

  • PARAMETERS: parametri della pipeline per il job

  • LABELS: (Facoltativo) Etichette associate al job, utilizzando il formato KEY_1=VALUE_1,KEY_2=VALUE_2,....

Durante il passaggio di gestione temporanea del lancio di un modello, Dataflow scrive i file nella posizione di gestione temporanea. Dataflow legge questi file di staging per creare il grafico del job. Durante il passaggio di esecuzione, Dataflow scrive i file nella posizione temporanea.

Impostare le opzioni pipeline

Per impostare le opzioni della pipeline quando esegui un modello flessibile, utilizza i seguenti flag nel comando gcloud dataflow flex-template run:

gcloud

Quando passi parametri di tipo List o Map, potrebbe essere necessario definire i parametri in un file YAML e utilizzare il flag flags-file.

API

L'esempio seguente mostra come includere opzioni della pipeline, esperimenti e opzioni aggiuntive in un corpo della richiesta:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Quando utilizzi i modelli flessibili, puoi configurare alcune opzioni della pipeline durante l'inizializzazione, ma altre non possono essere modificate. Se gli argomenti della riga di comando richiesti dal modello flessibile vengono sovrascritti, il job potrebbe ignorare, sostituire o eliminare le opzioni della pipeline trasmesse dal launcher del modello. Il job potrebbe non essere avviato oppure potrebbe essere avviato un job che non utilizza il modello flessibile. Per saperne di più, vedi Impossibile leggere il file del job.

Durante l'inizializzazione della pipeline, non modificare le seguenti opzioni della pipeline:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Vai

  • runner
  • project
  • job_name
  • template_location
  • region

Blocca le chiavi SSH del progetto dalle VM che utilizzano chiavi SSH basate su metadati

Puoi impedire alle VM di accettare le chiavi SSH archiviate nei metadati del progetto bloccando le chiavi SSH del progetto dalle VM. Utilizza il flag additional-experiments con l'opzione di servizio block_project_ssh_keys:

--additional-experiments=block_project_ssh_keys

Per ulteriori informazioni, consulta Opzioni del servizio Dataflow.

Aggiornare un job modello flessibile

La seguente richiesta di esempio mostra come aggiornare un job di streaming del modello utilizzando il metodo projects.locations.flexTemplates.launch. Se vuoi utilizzare gcloud CLI, vedi Aggiornare una pipeline esistente.

Se vuoi aggiornare un modello classico, utilizza projects.locations.templates.launch in alternativa.

  1. Segui i passaggi per creare un job di streaming da un modello flessibile. Invia la seguente richiesta HTTP POST con i valori modificati:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci REGION con la regione Dataflow del job che stai aggiornando.
    • Sostituisci JOB_NAME con il nome esatto del job che vuoi aggiornare.
    • Imposta parameters sull'elenco di coppie chiave-valore. I parametri elencati sono specifici di questo esempio di modello. Se utilizzi un modello personalizzato, modifica i parametri in base alle esigenze. Se utilizzi il modello di esempio, sostituisci le seguenti variabili.
      • Sostituisci SUBSCRIPTION_NAME con il nome dell'abbonamento Pub/Sub.
      • Sostituisci DATASET con il nome del tuo set di dati BigQuery.
      • Sostituisci TABLE_NAME con il nome della tabella BigQuery.
    • Sostituisci STORAGE_PATH con la posizione Cloud Storage del file modello. La posizione deve iniziare con gs://.
  2. Utilizza il parametro environment per modificare le impostazioni dell'ambiente. Per ulteriori informazioni, consulta FlexTemplateRuntimeEnvironment.

  3. (Facoltativo) Per inviare la richiesta utilizzando curl (Linux, macOS o Cloud Shell), salva la richiesta in un file JSON, quindi esegui il seguente comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Sostituisci FILE_PATH con il percorso del file JSON contenente il corpo della richiesta.

  4. Utilizza l'interfaccia di monitoraggio di Dataflow per verificare che sia stato creato un nuovo job con lo stesso nome. Questo lavoro ha lo stato Aggiornato.

Passaggi successivi