Questa pagina descrive come eseguire un job Dataflow utilizzando un modello Flex. I modelli flessibili consentono di pacchettizzare una pipeline Dataflow in modo da poterla eseguire senza un ambiente di sviluppo Apache Beam.
Autorizzazioni obbligatorie
Quando esegui un modello flessibile, Dataflow crea un job per te. Per creare il job, l'account di servizio Dataflow deve disporre della seguente autorizzazione:
dataflow.serviceAgent
Quando utilizzi Dataflow per la prima volta, il servizio assegna questo ruolo per te, quindi non devi concedere questa autorizzazione.
Per impostazione predefinita, il account di servizio Compute Engine viene utilizzato per le VM di avvio e le VM worker. Il account di servizio deve avere i seguenti ruoli e capacità:
- Storage Object Admin (
roles/storage.objectAdmin) - Visualizzatore (
roles/viewer) - Dataflow Worker (
roles/dataflow.worker) - Accesso in lettura e scrittura al bucket di staging
- Accesso in lettura all'immagine del modello flessibile
Per concedere l'accesso in lettura e scrittura al bucket di staging, puoi utilizzare il ruolo Storage Object Admin (roles/storage.objectAdmin). Per saperne di più, consulta Ruoli IAM per Cloud Storage.
Per concedere l'accesso in lettura all'immagine del modello flessibile, puoi utilizzare il ruolo Storage Object Viewer (roles/storage.objectViewer). Per saperne di più, consulta Configurazione del controllo dell'accesso.
Esegui un modello flessibile
Per eseguire un modello flessibile, utilizza il comando gcloud dataflow flex-template run:
gcloud dataflow flex-template run JOB_ID \ --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \ --region REGION \ --staging-location STAGING_LOCATION \ --temp-location TEMP_LOCATION \ --parameters PARAMETERS \ --additional-user-labels LABELS \
Sostituisci quanto segue:
JOB_ID: l'ID del jobTEMPLATE_FILE_LOCATION: la posizione del file modello in Cloud StorageREGION: la regione in cui eseguire il job DataflowSTAGING_LOCATION: il percorso di Cloud Storage per preparare i file localiTEMP_LOCATION: il percorso di Cloud Storage in cui scrivere i file temporanei. Se non viene impostato, il valore predefinito è la posizione di staging.PARAMETERS: parametri della pipeline per il jobLABELS: (Facoltativo) Etichette associate al job, utilizzando il formatoKEY_1=VALUE_1,KEY_2=VALUE_2,....
Durante il passaggio di gestione temporanea del lancio di un modello, Dataflow scrive i file nella posizione di gestione temporanea. Dataflow legge questi file di staging per creare il grafico del job. Durante il passaggio di esecuzione, Dataflow scrive i file nella posizione temporanea.
Impostare le opzioni pipeline
Per impostare le opzioni della pipeline quando esegui un modello flessibile, utilizza i seguenti flag
nel comando gcloud dataflow flex-template run:
parameters: utilizza questo flag per impostare i seguenti tipi di opzione pipeline:Opzioni pipeline supportate dai modelli flessibili. Per un elenco delle opzioni supportate dai modelli flessibili, consulta Opzioni della pipeline.
Opzioni della pipeline dichiarate nei metadati del modello.
additional-pipeline-options: Utilizza questo flag per impostare altre opzioni della pipeline Apache Beam non supportate direttamente dai modelli flessibili.additional-experiments: utilizza questo flag per impostare opzioni della pipeline sperimentali (equivalente all'opzioneexperiments).
gcloud
Includi le opzioni della pipeline utilizzando il flag
parameters.Includi esperimenti di runtime e opzioni della pipeline utilizzando i flag
additional-experimentseadditional-pipeline-options.
Quando passi parametri di tipo List o Map, potrebbe essere necessario definire i parametri in un file YAML e utilizzare il flag flags-file.
API
Includi le opzioni della pipeline utilizzando il campo
parameters.Includi esperimenti di runtime e opzioni della pipeline utilizzando i campi
additionalExperimentseadditionalPipelineOptions.
L'esempio seguente mostra come includere opzioni della pipeline, esperimenti e opzioni aggiuntive in un corpo della richiesta:
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
Quando utilizzi i modelli flessibili, puoi configurare alcune opzioni della pipeline durante l'inizializzazione, ma altre non possono essere modificate. Se gli argomenti della riga di comando richiesti dal modello flessibile vengono sovrascritti, il job potrebbe ignorare, sostituire o eliminare le opzioni della pipeline trasmesse dal launcher del modello. Il job potrebbe non essere avviato oppure potrebbe essere avviato un job che non utilizza il modello flessibile. Per saperne di più, vedi Impossibile leggere il file del job.
Durante l'inizializzazione della pipeline, non modificare le seguenti opzioni della pipeline:
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Vai
runnerprojectjob_nametemplate_locationregion
Blocca le chiavi SSH del progetto dalle VM che utilizzano chiavi SSH basate su metadati
Puoi impedire alle VM di accettare le chiavi SSH archiviate nei metadati del progetto
bloccando le chiavi SSH del progetto dalle VM. Utilizza il flag additional-experiments con l'opzione di servizio block_project_ssh_keys:
--additional-experiments=block_project_ssh_keys
Per ulteriori informazioni, consulta Opzioni del servizio Dataflow.
Aggiornare un job modello flessibile
La seguente richiesta di esempio mostra come aggiornare un job di streaming del modello utilizzando il metodo projects.locations.flexTemplates.launch. Se vuoi utilizzare gcloud CLI, vedi Aggiornare una pipeline esistente.
Se vuoi aggiornare un modello classico, utilizza projects.locations.templates.launch in alternativa.
Segui i passaggi per creare un job di streaming da un modello flessibile. Invia la seguente richiesta HTTP POST con i valori modificati:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }- Sostituisci
PROJECT_IDcon l'ID progetto. - Sostituisci
REGIONcon la regione Dataflow del job che stai aggiornando. - Sostituisci
JOB_NAMEcon il nome esatto del job che vuoi aggiornare. - Imposta
parameterssull'elenco di coppie chiave-valore. I parametri elencati sono specifici di questo esempio di modello. Se utilizzi un modello personalizzato, modifica i parametri in base alle esigenze. Se utilizzi il modello di esempio, sostituisci le seguenti variabili.- Sostituisci
SUBSCRIPTION_NAMEcon il nome dell'abbonamento Pub/Sub. - Sostituisci
DATASETcon il nome del tuo set di dati BigQuery. - Sostituisci
TABLE_NAMEcon il nome della tabella BigQuery.
- Sostituisci
- Sostituisci
STORAGE_PATHcon la posizione Cloud Storage del file modello. La posizione deve iniziare congs://.
- Sostituisci
Utilizza il parametro
environmentper modificare le impostazioni dell'ambiente. Per ulteriori informazioni, consultaFlexTemplateRuntimeEnvironment.(Facoltativo) Per inviare la richiesta utilizzando curl (Linux, macOS o Cloud Shell), salva la richiesta in un file JSON, quindi esegui il seguente comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launchSostituisci FILE_PATH con il percorso del file JSON contenente il corpo della richiesta.
Utilizza l'interfaccia di monitoraggio di Dataflow per verificare che sia stato creato un nuovo job con lo stesso nome. Questo lavoro ha lo stato Aggiornato.
Passaggi successivi
- Scopri come creare un modello flessibile per la pipeline Apache Beam.
- Per saperne di più sui modelli classici, sui modelli flessibili e sui relativi scenari di utilizzo, consulta Modelli Dataflow.
- Per informazioni sulla risoluzione dei problemi relativi ai modelli flessibili, consulta Risolvere i problemi di timeout dei modelli flessibili.
- Per ulteriori architetture di riferimento, diagrammi e best practice, esplora il Cloud Architecture Center.