Esegui un DAG di Apache Airflow in Managed Airflow (terza generazione) (Google Cloud CLI)

Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)

Questa guida rapida mostra come creare un ambiente Managed Service for Apache Airflow ed eseguire un DAG di Apache Airflow in Managed Airflow (terza generazione).

Prima di iniziare

  1. Accedi al tuo Google Cloud account. Se non hai mai utilizzato Google Cloud, crea un account per valutare il rendimento dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. Installa Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto su cui ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l' resourcemanager.projects.create autorizzazione. Scopri come concedere i ruoli.
    • Crea un Google Cloud progetto:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il Google Cloud progetto che stai creando.

    • Seleziona il Google Cloud progetto che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del Google Cloud progetto.

  6. Verifica che la fatturazione sia abilitata per il tuo Google Cloud progetto.

  7. Installa Google Cloud CLI.

  8. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  9. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  10. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto su cui ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l' resourcemanager.projects.create autorizzazione. Scopri come concedere i ruoli.
    • Crea un Google Cloud progetto:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il Google Cloud progetto che stai creando.

    • Seleziona il Google Cloud progetto che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del Google Cloud progetto.

  11. Verifica che la fatturazione sia abilitata per il tuo Google Cloud progetto.

  12. Abilita l'API Managed Airflow:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), che contiene l' serviceusage.services.enable autorizzazione. Scopri come concedere i ruoli.

    gcloud services enable composer.googleapis.com
  13. Per ottenere le autorizzazioni necessarie per completare questa guida rapida, chiedi all'amministratore di concederti i seguenti ruoli IAM sul tuo progetto:

    Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

    Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Crea il account di servizio di un ambiente

Quando crei un ambiente, devi specificare un account di servizio. Questo service account è chiamato service account dell'ambiente. L'ambiente utilizza questo account di servizio per eseguire la maggior parte delle operazioni.

Il account di servizio per l'ambiente non è un account utente. Un account di servizio è un particolare tipo di account utilizzato da un'applicazione o da un'istanza di macchina virtuale (VM), non da una persona.

Per creare un account di servizio per l'ambiente:

  1. Crea un nuovo service account, come descritto in la documentazione di Identity and Access Management.

  2. Concedi un ruolo, come descritto nella documentazione di Identity and Access Management. Il ruolo richiesto è Worker Composer (composer.worker).

Crea un ambiente

Crea un nuovo ambiente denominato example-environment nella regione us-central1, con l'ultima versione di Managed Airflow (terza generazione) .

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-3-airflow-2.11.1-build.3 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Sostituisci ENVIRONMENT_SERVICE_ACCOUNT con il account di servizio per il tuo ambiente che hai creato in precedenza.

Crea un file DAG

Un DAG di Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard.

Questa guida utilizza un DAG di Airflow di esempio definito nel file quickstart.py. Il codice Python in questo file esegue le seguenti operazioni:

  1. Crea un DAG, composer_sample_dag. Questo DAG viene eseguito ogni giorno.
  2. Esegue un'attività, print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione del DAG utilizzando l'operatore bash.

Salva una copia del file quickstart.py sulla macchina locale:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Carica il file DAG nel bucket dell'ambiente

Ogni ambiente Managed Airflow ha un bucket Cloud Storage associato. Airflow in Managed Airflow pianifica solo i DAG che si trovano nella cartella /dags di questo bucket.

Per pianificare il DAG, carica quickstart.py dalla macchina locale alla cartella /dags dell'ambiente:

Per caricare quickstart.py con Google Cloud CLI, esegui il seguente comando nella cartella in cui si trova il file quickstart.py:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Visualizza il DAG

Dopo aver caricato il file DAG, Airflow esegue le seguenti operazioni:

  1. Analizza il file DAG che hai caricato. Potrebbero essere necessari alcuni minuti prima che il DAG diventi disponibile per Airflow.
  2. Aggiunge il DAG all'elenco dei DAG disponibili.
  3. Esegue il DAG in base alla pianificazione fornita nel file DAG.

Verifica che il DAG venga elaborato senza errori e che sia disponibile in Airflow visualizzandolo nell'interfaccia utente DAG. L'interfaccia utente DAG è l'interfaccia di Managed Airflow per la visualizzazione di informazioni sui DAG nella Google Cloud console. Managed Airflow fornisce anche l'accesso all'interfaccia utente di Airflow, che è un'interfaccia web di Airflow nativa.

  1. Attendi circa cinque minuti per consentire ad Airflow di elaborare il file DAG che hai caricato in precedenza e di completare la prima esecuzione del DAG (spiegata più avanti).

  2. Esegui il seguente comando in Google Cloud CLI. Questo comando esegue il dags list comando Airflow CLI che elenca i DAG nel tuo ambiente.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifica che il DAG composer_quickstart sia elencato nell'output del comando.

    Output di esempio:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Visualizza i dettagli dell'esecuzione del DAG

Una singola esecuzione di un DAG è chiamata esecuzione del DAG. Airflow esegue immediatamente un'esecuzione del DAG di esempio perché la data di inizio nel file DAG è impostata su ieri. In questo modo, Airflow recupera la pianificazione del DAG specificato.

Il DAG di esempio contiene un'attività, print_dag_run_conf, che esegue il comando echo nella console. Questo comando restituisce i metadati del DAG (l'identificatore numerico dell'esecuzione del DAG).

Esegui il seguente comando in Google Cloud CLI. Questo comando elenca le esecuzioni del DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Output di esempio:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI non fornisce un comando per visualizzare i log delle attività. Puoi utilizzare altri metodi per visualizzare i log delle attività di Airflow: l'interfaccia utente DAG di Managed Airflow, l'interfaccia utente di Airflow o Cloud Logging. Questa guida mostra un modo per eseguire query su Cloud Logging per i log di un'esecuzione specifica del DAG.

Esegui il seguente comando in Google Cloud CLI. Questo comando legge i log di Cloud Logging per un'esecuzione specifica del DAG composer_quickstart. L'argomento --format formatta l'output in modo che venga visualizzato solo il testo del messaggio di log.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Sostituisci:

  • RUN_ID con il valore run_id dell'output del comando tasks states-for-dag-run che hai eseguito in precedenza. Ad esempio, 2024-02-17T15:38:38.969307+00:00.

Output di esempio:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Libera spazio

Per evitare che al tuo Google Cloud account vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il Google Cloud progetto con le risorse.

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Managed Airflow:

    1. Nella Google Cloud console, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Seleziona example-environment e fai clic su Elimina.

    3. Attendi l'eliminazione dell'ambiente.

  2. Elimina il bucket dell'ambiente. L'eliminazione dell'ambiente Managed Airflow non comporta l'eliminazione del relativo bucket.

    1. Nella Google Cloud console, vai alla pagina Storage > Browser.

      Vai a Storage > Browser

    2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

Passaggi successivi