Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Questo tutorial è una modifica di Esegui un DAG di analisi dei dati in Google Cloud che mostra come connettere l'ambiente Managed Airflow ad Amazon Web Services per utilizzare i dati archiviati. Mostra come utilizzare Managed Airflow per creare un DAG Apache Airflow. Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV archiviato in un bucket Amazon Web Services (AWS) S3 e poi esegue un job batch Managed Service for Apache Spark per elaborare i dati uniti.
Il set di dati pubblico BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici in tutto il mondo. Il file CSV contiene informazioni sulle date e sui nomi delle festività statunitensi dal 1997 al 2021.
La domanda a cui vogliamo rispondere utilizzando il DAG è: "Che temperatura c'era a Chicago il giorno del Ringraziamento negli ultimi 25 anni?"
Obiettivi
- Crea un ambiente Managed Airflow nella configurazione predefinita
- Crea un bucket in AWS S3
- crea un set di dati BigQuery vuoto
- Crea un nuovo bucket Cloud Storage
- Crea ed esegui un DAG che includa le seguenti attività:
- Caricare un set di dati esterno da S3 a Cloud Storage
- Carica un set di dati esterno da Cloud Storage a BigQuery
- Unire due set di dati in BigQuery
- Esegui un job PySpark di analisi dei dati
Prima di iniziare
Gestire le autorizzazioni in AWS
Segui la sezione "Creazione di policy con l'editor visivo" del tutorial AWS sulla creazione di policy IAM per creare una policy IAM personalizzata per AWS S3 con la seguente configurazione:
- Servizio: S3
- ListAllMyBuckets (
s3:ListAllMyBuckets), per visualizzare il bucket S3 - CreateBucket (
s3:CreateBucket), per creare un bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls), per la creazione di un bucket - ListBucket (
s3:ListBucket), per concedere l'autorizzazione a elencare gli oggetti in un bucket S3 - PutObject (
s3:PutObject), per caricare file in un bucket - GetBucketVersioning (
s3:GetBucketVersioning), per eliminare un oggetto in un bucket - DeleteObject (
s3:DeleteObject), per eliminare un oggetto in un bucket - ListBucketVersions (
s3:ListBucketVersions), per eliminare un bucket - DeleteBucket (
s3:DeleteBucket), per eliminare un bucket - Risorse:scegli "Qualsiasi" accanto a "bucket" e "oggetto" per concedere autorizzazioni a qualsiasi risorsa di quel tipo.
- Tag: nessuno
- Nome:TutorialPolicy
Per saperne di più su ogni configurazione, consulta l'elenco delle azioni supportate in Amazon S3.
Abilita API
Abilita le seguenti API:
Console
Abilita le API Managed Service for Apache Spark, Managed Airflow, BigQuery e Cloud Storage.
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere
i ruoli.
gcloud
Abilita le API Managed Service for Apache Spark, Managed Airflow, BigQuery e Cloud Storage:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere
i ruoli.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Concedi le autorizzazioni
Concedi i seguenti ruoli e autorizzazioni al tuo account utente:
Concedi ruoli per la gestione degli ambienti Managed Airflow e dei bucket dell'ambiente.
Concedi il ruolo BigQuery Data Owner (
roles/bigquery.dataOwner) per creare un set di dati BigQuery.Concedi il ruolo Amministratore Storage (
roles/storage.admin) per creare un bucket Cloud Storage.
Crea e prepara l'ambiente Managed Airflow
Crea un ambiente Managed Airflow con parametri predefiniti:
- Scegli una regione con sede negli Stati Uniti.
- Scegli l'ultima versione di Managed Airflow.
Concedi i seguenti ruoli al account di servizio utilizzato nel tuo ambiente Managed Airflow affinché i worker Airflow eseguano correttamente le attività DAG:
- Utente BigQuery (
roles/bigquery.user) - Proprietario dati BigQuery (
roles/bigquery.dataOwner) - Service Account User (
roles/iam.serviceAccountUser) - Editor Dataproc (
roles/dataproc.editor) - Dataproc Worker (
roles/dataproc.worker)
- Utente BigQuery (
Creare e modificare le risorse correlate in Google Cloud
Installa il
apache-airflow-providers-amazonpacchetto PyPI nel tuo ambiente Managed Airflow.Crea un set di dati BigQuery vuoto con i seguenti parametri:
- Nome:
holiday_weather - Regione:
US
- Nome:
Crea un nuovo bucket Cloud Storage nella multiregione
US.Esegui questo comando per abilitare l'accesso privato Google nella subnet predefinita della regione in cui vuoi eseguire Managed Service for Apache Spark per soddisfare i requisiti di rete. Ti consigliamo di utilizzare la stessa regione del tuo ambiente Managed Airflow.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crea risorse correlate in AWS
Crea un bucket S3 con le impostazioni predefinite nella tua regione preferita.
Connettiti ad AWS da Managed Airflow
- Recuperare l'ID chiave di accesso e la chiave di accesso segreta di AWS
Aggiungi la tua connessione AWS S3 utilizzando la UI di Airflow:
- Vai ad Amministrazione > Connessioni.
Crea una nuova connessione con la seguente configurazione:
- ID connessione:
aws_s3_connection - Tipo di connessione:
Amazon S3 - Extra (o JSON dei campi extra):
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID connessione:
Elaborazione dei dati utilizzando Managed Service for Apache Spark
Questa sezione descrive l'elaborazione dei dati con Managed Service for Apache Spark.
Esplora il job PySpark di esempio
Il codice mostrato di seguito è un esempio di job PySpark che converte la temperatura da decimi di grado Celsius a gradi Celsius. Questo job converte i dati di temperatura del set di dati in un formato diverso.
Carica il file PySpark su Cloud Storage
Per caricare il file PySpark su Cloud Storage:
Salva data_analytics_process.py sulla tua macchina locale.
Nella console Google Cloud , vai alla pagina Browser Cloud Storage:
Fai clic sul nome del bucket che hai creato in precedenza.
Nella scheda Oggetti del bucket, fai clic sul pulsante Carica i file, seleziona
data_analytics_process.pynella finestra di dialogo che viene visualizzata e fai clic su Apri.
Carica il file CSV su AWS S3
Per caricare il file holidays.csv:
- Salva
holidays.csvsulla tua macchina locale. - Segui la guida di AWS per caricare il file nel tuo bucket.
DAG di analisi dei dati
Questa sezione descrive la configurazione e l'utilizzo del DAG di analisi dei dati.
Esplora il DAG di esempio
Il DAG utilizza più operatori per trasformare e unificare i dati:
Il
S3ToGCSOperatortrasferisce il file holidays.csv dal bucket AWS S3 al bucket Cloud Storage.GCSToBigQueryOperatorimporta il file holidays.csv da Cloud Storage in una nuova tabella nel set di dati BigQueryholidays_weatherche hai creato in precedenza.DataprocCreateBatchOperatorcrea ed esegue un job batch PySpark utilizzando Managed Service for Apache Spark.BigQueryInsertJobOperatorunisce i dati di holidays.csv nella colonna "Date" con i dati meteo del set di dati pubblico BigQuery ghcn_d. Le attivitàBigQueryInsertJobOperatorvengono generate dinamicamente utilizzando un ciclo for e si trovano in unTaskGroupper una migliore leggibilità nella visualizzazione Grafico dell'interfaccia utente di Airflow.
Utilizzare la UI di Airflow per aggiungere variabili
In Airflow, le variabili sono un modo universale per archiviare e recuperare impostazioni o configurazioni arbitrarie come un semplice archivio chiave-valore. Questo DAG utilizza le variabili Airflow per archiviare i valori comuni. Per aggiungerli al tuo ambiente:
Accedi all'interfaccia utente di Airflow dalla consoleGoogle Cloud .
Vai ad Amministrazione > Variabili.
Aggiungi le seguenti variabili:
s3_bucket: il nome del bucket S3 che hai creato in precedenza.gcp_project: il tuo ID progetto.gcs_bucket: il nome del bucket creato in precedenza (senza il prefissogs://).gce_region: la regione in cui vuoi che il job Managed Service for Apache Spark soddisfi i requisiti di rete di Managed Service for Apache Spark. Questa è la regione in cui hai abilitato l'accesso privato Google in precedenza.dataproc_service_account: il account di servizio per il tuo ambiente Managed Airflow. Puoi trovare questo service account nella scheda di configurazione dell'ambiente per il tuo ambiente Managed Airflow.
Carica il DAG nel bucket del tuo ambiente
Managed Airflow pianifica i DAG che si trovano nella
cartella /dags del bucket del tuo ambiente. Per caricare il DAG utilizzando la consoleGoogle Cloud :
Sulla tua macchina locale, salva s3togcsoperator_tutorial.py.
Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Si apre la cartella DAG del tuo ambiente.
Fai clic su Carica file.
Seleziona
s3togcsoperator_tutorial.pysulla macchina locale e fai clic su Apri.
Attiva il DAG
Nell'ambiente Managed Airflow, fai clic sulla scheda DAG.
Fai clic sull'ID DAG
s3_to_gcs_dag.Fai clic su Attiva DAG.
Attendi circa 5-10 minuti finché non viene visualizzato un segno di spunta verde che indica che le attività sono state completate correttamente.
Convalida l'esito positivo del DAG
Nella console Google Cloud , vai alla pagina BigQuery.
Nel riquadro Explorer, fai clic sul nome del progetto.
Fai clic su
holidays_weather_joined.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna dei valori sono in decimi di grado Celsius.
Fai clic su
holidays_weather_normalized.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna dei valori sono in gradi Celsius.
Esegui la pulizia
Elimina le singole risorse che hai creato per questo tutorial:
Elimina il file
holidays.csvnel bucket AWS S3.Elimina il bucket AWS S3 che hai creato.
Elimina il bucket Cloud Storage che hai creato per questo tutorial.
Elimina l'ambiente Managed Airflow, incluso l'eliminazione manuale del bucket dell'ambiente.