Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)
Questa pagina descrive come utilizzare Managed Airflow (seconda generazione) per eseguire i workload di Managed Service for Apache Spark su Google Cloud.
Gli esempi nelle sezioni seguenti mostrano come utilizzare gli operatori per la gestione dei workload batch di Managed Service for Apache Spark. Questi operatori vengono utilizzati nei DAG che creano, eliminano, elencano e recuperano un workload batch di Managed Service for Apache Spark:
Crea DAG per gli operatori che funzionano con i workload batch di Managed Service for Apache Spark:
Crea DAG che utilizzano container personalizzati e Dataproc Metastore.
Configura il server di cronologia permanente per questi DAG.
Prima di iniziare
Abilita l'API Dataproc:
Console
Abilita l'API Managed Service for Apache Spark.
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo servizi (
roles/serviceusage.serviceUsageAdmin), che contiene l'autorizzazioneserviceusage.services.enable. Scopri come concedere i ruoli.gcloud
Abilita l'API Managed Service for Apache Spark:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo servizi (
roles/serviceusage.serviceUsageAdmin), che contiene l'serviceusage.services.enableautorizzazione. Scopri come concedere i ruoli.gcloud services enable dataproc.googleapis.com
Seleziona la località del file del workload batch. Puoi utilizzare una delle seguenti opzioni:
- Crea un bucket Cloud Storage che archivia questo file.
- Utilizza il bucket del tuo ambiente. Poiché non devi sincronizzare questo file con Airflow, puoi creare una sottocartella separata al di fuori delle cartelle
/dagso/data. Ad esempio,/batches. - Utilizza un bucket esistente.
Configurare i file e le variabili Airflow
Questa sezione mostra come configurare i file e le variabili Airflow per questo tutorial.
Caricare un file di workload ML di Managed Service for Apache Spark in un bucket
Il workload in questo tutorial esegue uno script PySpark:
Salva qualsiasi script PySpark in un file locale denominato
spark-job.py. Ad esempio, puoi utilizzare lo script PySpark di esempio.Carica il file nella località selezionata in Prima di iniziare.
Impostare le variabili Airflow
Gli esempi nelle sezioni seguenti utilizzano le variabili Airflow. Imposta i valori di queste variabili in Airflow, quindi il codice DAG può accedere a questi valori.
Gli esempi in questo tutorial utilizzano le seguenti variabili Airflow. Puoi impostarle in base alle esigenze, a seconda dell'esempio che utilizzi.
Imposta le seguenti variabili Airflow da utilizzare nel codice DAG:
project_id: ID progetto.bucket_name: URI di un bucket in cui si trova il file Python principale del workload (spark-job.py). Hai selezionato questa località in Prima di iniziare.phs_cluster: nome del cluster del server di cronologia permanente. Imposta questa variabile quando crei un server di cronologia permanente.image_name: nome e tag dell'immagine container personalizzata (image:tag). Imposta questa variabile quando utilizzi l'immagine container personalizzata con DataprocCreateBatchOperator.metastore_cluster: nome del servizio Dataproc Metastore. Imposta questa variabile quando utilizzi il servizio Dataproc Metastore con DataprocCreateBatchOperator.region_name: regione in cui si trova il servizio Dataproc Metastore. Imposta questa variabile quando utilizzi il servizio Dataproc Metastore con DataprocCreateBatchOperator.
Utilizzare la Google Cloud console e l'UI di Airflow per impostare ogni variabile Airflow
Nella Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul link Airflow per il tuo ambiente. Si apre l'UI di Airflow.
Nell'UI di Airflow, seleziona Amministratore > Variabili.
Fai clic su Aggiungi un nuovo record.
Specifica il nome della variabile nel campo Chiave e imposta il valore nel campo Val.
Fai clic su Salva.
Creare un server di cronologia permanente
Utilizza un server di cronologia permanente (PHS) per visualizzare i file di cronologia Spark dei workload batch:
- Crea un server di cronologia permanente.
- Assicurati di aver specificato il nome del cluster PHS nella
phs_clustervariabile Airflow.
DataprocCreateBatchOperator
Il seguente DAG avvia un workload batch di Managed Service for Apache Spark.
Per ulteriori informazioni sugli argomenti di DataprocCreateBatchOperator, consulta
il codice sorgente dell'operatore.
Per ulteriori informazioni sugli attributi che puoi passare nel batch
parametro di DataprocCreateBatchOperator, consulta la
descrizione della classe Batch.
Utilizzare l'immagine container personalizzata con DataprocCreateBatchOperator
L'esempio seguente mostra come utilizzare un'immagine container personalizzata per eseguire i workload. Puoi utilizzare un container personalizzato, ad esempio, per aggiungere dipendenze Python non fornite dall'immagine container predefinita.
Per utilizzare un'immagine container personalizzata:
Crea un'immagine container personalizzata e caricala in Container Registry.
Specifica l'immagine nella
image_namevariabile Airflow.Utilizza DataprocCreateBatchOperator con l'immagine personalizzata:
Utilizzare il servizio Dataproc Metastore con DataprocCreateBatchOperator
Per utilizzare un servizio Dataproc Metastore da un DAG:
Verifica che il servizio metastore sia già avviato.
Per scoprire come avviare un servizio metastore, consulta Abilitare e disabilitare Dataproc Metastore.
Per informazioni dettagliate sull'operatore Batch per la creazione della configurazione, consulta PeripheralsConfig.
Una volta che il servizio metastore è attivo e in esecuzione, specifica il suo nome in la variabile
metastore_clustere la sua regione nellaregion_namevariabile Airflow.Utilizza il servizio metastore in DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Puoi utilizzare DataprocDeleteBatchOperator per eliminare un batch in base all'ID batch del workload.
DataprocListBatchesOperator
DataprocDeleteBatchOperator elenca i batch esistenti in un determinato project_id e regione.
DataprocGetBatchOperator
DataprocGetBatchOperator recupera un particolare workload batch.