Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questo tutorial mostra come utilizzare Cloud Composer per creare un DAG di Apache Airflow. Il DAG unisce i dati di un set di dati pubblico di BigQuery e di un file CSV archiviato in un bucket Cloud Storage, quindi esegue un job batch di Managed Service per Apache Spark per elaborare i dati uniti.
Il set di dati pubblico di BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici in tutto il mondo. Il file CSV contiene informazioni sulle date e sui nomi delle festività statunitensi dal 1997 al 2021.
La domanda a cui vogliamo rispondere utilizzando il DAG è: "Quanto caldo ha fatto a Chicago il giorno del Ringraziamento negli ultimi 25 anni?"
Obiettivi
- Crea un ambiente Cloud Composer nella configurazione predefinita
- Crea un set di dati BigQuery vuoto
- Crea un nuovo bucket Cloud Storage
- Crea ed esegui un DAG che includa le seguenti attività:
- Carica un set di dati esterno da Cloud Storage a BigQuery
- Unisci due set di dati in BigQuery
- Esegui un job PySpark di analisi dei dati
Prima di iniziare
Abilita API
Abilita le seguenti API:
Console
Abilita le API Managed Service per Apache Spark, Cloud Composer, BigQuery e Cloud Storage.
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo servizi (roles/serviceusage.serviceUsageAdmin), che
contiene l'autorizzazione serviceusage.services.enable. Scopri come concedere
i ruoli.
gcloud
Abilita le API Managed Service per Apache Spark, Cloud Composer, BigQuery e Cloud Storage:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo servizi (roles/serviceusage.serviceUsageAdmin), che contiene l'
serviceusage.services.enable autorizzazione. Scopri come concedere
i ruoli.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Concedi le autorizzazioni
Concedi i seguenti ruoli e autorizzazioni al tuo account utente:
Concedi i ruoli per la gestione degli ambienti Cloud Composer e dei bucket dell'ambiente.
Concedi il ruolo Proprietario dati BigQuery (
roles/bigquery.dataOwner) per creare un set di dati BigQuery.Concedi il ruolo Amministratore Storage (
roles/storage.admin) per creare un bucket Cloud Storage.
Crea e prepara l'ambiente Cloud Composer
Crea un ambiente Cloud Composer con i parametri predefiniti:
- Scegli una regione con sede negli Stati Uniti.
- Scegli l'ultima versione di Cloud Composer.
Concedi i seguenti ruoli al account di servizio utilizzato nel tuo ambiente Cloud Composer affinché i worker di Airflow possano eseguire correttamente le attività DAG:
- Utente BigQuery (
roles/bigquery.user) - Proprietario dati BigQuery (
roles/bigquery.dataOwner) - Utente Service Account (
roles/iam.serviceAccountUser) - Editor Dataproc (
roles/dataproc.editor) - Worker Dataproc (
roles/dataproc.worker)
- Utente BigQuery (
Crea risorse correlate
Crea un set di dati BigQuery vuoto con i seguenti parametri:
- Nome:
holiday_weather - Regione:
US
- Nome:
Crea un nuovo bucket Cloud Storage nella multiregione
US.Esegui il seguente comando per abilitare l'accesso privato Google nella subnet predefinita della regione in cui vuoi eseguire Managed Service per Apache Spark per soddisfare i requisiti di rete. Ti consigliamo di utilizzare la stessa regione del tuo ambiente Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Elaborazione dei dati utilizzando Managed Service per Apache Spark
Esplora il job PySpark di esempio
Il codice mostrato di seguito è un esempio di job PySpark che converte la temperatura da decimi di grado Celsius a gradi Celsius. Questo job converte i dati di temperatura del set di dati in un formato diverso.
Carica i file di supporto in Cloud Storage
Per caricare il file PySpark e il set di dati archiviato in holidays.csv:
Salva data_analytics_process.py sulla tua macchina locale.
Salva holidays.csv sulla tua macchina locale.
Nella Google Cloud console, vai alla pagina Browser Cloud Storage:
Fai clic sul nome del bucket che hai creato in precedenza.
Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file , seleziona
data_analytics_process.pyeholidays.csvnella finestra di dialogo che viene visualizzata e fai clic su Apri.
DAG di analisi dei dati
Esplora il DAG di esempio
Il DAG utilizza più operatori per trasformare e unificare i dati:
The
GCSToBigQueryOperatorimporta il file holidays.csv da Cloud Storage a una nuova tabella nel set di dati BigQueryholidays_weatherche hai creato in precedenza.The
DataprocCreateBatchOperatorcrea ed esegue un job batch PySpark utilizzando Managed Service per Apache Spark.The
BigQueryInsertJobOperatorunisce i dati di holidays.csv nella colonna "Date" con i dati meteorologici del set di dati pubblico di BigQuery ghcn_d. Le attivitàBigQueryInsertJobOperatorvengono generate dinamicamente utilizzando un ciclo for e si trovano in unTaskGroupper una migliore leggibilità nella visualizzazione del grafico della UI di Airflow.
Utilizza la UI di Airflow per aggiungere le variabili
In Airflow, variabili sono un modo universale per archiviare e recuperare impostazioni o configurazioni arbitrarie come un semplice archivio chiave-valore. Questo DAG utilizza le variabili di Airflow per archiviare i valori comuni. Per aggiungerli al tuo ambiente:
Vai ad Amministratore > Variabili.
Aggiungi le seguenti variabili:
gcp_project: il tuo ID progetto.gcs_bucket: il nome del bucket che hai creato in precedenza (senza il prefissogs://).gce_region: la regione in cui vuoi che il job Managed Service per Apache Spark soddisfi i requisiti di rete di Managed Service per Apache Spark. Questa è la regione in cui hai abilitato l'accesso privato Google in precedenza.dataproc_service_account: il account di servizio per il tuo ambiente Cloud Composer. Puoi trovare questo service account nella scheda di configurazione dell'ambiente per il tuo ambiente Cloud Composer.
Carica il DAG nel bucket dell'ambiente
Cloud Composer pianifica i DAG che si trovano nella cartella /dags nel bucket dell'ambiente. Per caricare il DAG utilizzando la
Google Cloud console:
Salva data_analytics_dag.py sulla tua macchina locale.
Nella Google Cloud console, vai alla pagina Ambienti.
Nell'elenco degli ambienti, nella colonna Cartella DAG , fai clic sul link DAG. Si apre la cartella DAG del tuo ambiente.
Fai clic su Carica file.
Seleziona
data_analytics_dag.pysulla tua macchina locale e fai clic su Apri.
Attiva il DAG
Nel tuo ambiente Cloud Composer, fai clic sulla scheda DAG.
Fai clic sull'ID DAG
data_analytics_dag.Fai clic su Attiva DAG.
Attendi circa 5-10 minuti finché non visualizzi un segno di spunta verde che indica che le attività sono state completate correttamente.
Convalida l'esito positivo del DAG
Nella Google Cloud console, vai alla pagina BigQuery.
Nel riquadro Explorer, fai clic sul nome del progetto.
Fai clic su
holidays_weather_joined.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna dei valori sono in decimi di grado Celsius.
Fai clic su
holidays_weather_normalized.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna dei valori sono in gradi Celsius.
Approfondimento con Managed Service per Apache Spark (facoltativo)
Puoi provare una versione avanzata di questo DAG con un flusso di elaborazione dei dati PySpark più complesso. Consulta l'estensione Managed Service per Apache Spark per l'esempio di analisi dei dati su GitHub.
Esegui la pulizia
Elimina le singole risorse che hai creato per questo tutorial:
Elimina il bucket Cloud Storage che hai creato per questo tutorial.
Elimina l'ambiente Cloud Composer, incluso l'eliminazione manuale del bucket dell'ambiente.
Passaggi successivi
- Esegui un DAG di analisi dei dati in Google Cloud Utilizzo dei dati di AWS.
- Esegui un DAG di analisi dei dati in Azure.