Crea una pipeline Dataflow utilizzando Python
Questo documento mostra come utilizzare l'SDK Apache Beam per Python per creare un programma che definisce una pipeline. Dopodiché, esegui la pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.
Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud , fai clic su Procedura guidata:
Prima di iniziare
- Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
Installa Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init -
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
- Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli.
-
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud .
-
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (
roles/serviceusage.serviceUsageAdmin), che include l'autorizzazioneserviceusage.services.enable. Scopri come concedere i ruoli.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Crea credenziali di autenticazione locali per il tuo account utente:
gcloud auth application-default login
Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.
-
Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Sostituisci quanto segue:
PROJECT_ID: il tuo ID progetto.USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio:myemail@example.com.ROLE: il ruolo IAM che concedi al tuo account utente.
-
Installa Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init -
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
- Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli.
-
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud .
-
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (
roles/serviceusage.serviceUsageAdmin), che include l'autorizzazioneserviceusage.services.enable. Scopri come concedere i ruoli.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Crea credenziali di autenticazione locali per il tuo account utente:
gcloud auth application-default login
Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.
-
Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Sostituisci quanto segue:
PROJECT_ID: il tuo ID progetto.USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio:myemail@example.com.ROLE: il ruolo IAM che concedi al tuo account utente.
Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Sostituisci
PROJECT_IDcon l'ID progetto. - Sostituisci
PROJECT_NUMBERcon il numero del progetto. Per trovare il numero del progetto, consulta Identifica i progetti o utilizza il comandogcloud projects describe. - Sostituisci
SERVICE_ACCOUNT_ROLEcon ogni singolo ruolo.
-
Crea un bucket Cloud Storage e configuralo nel seguente modo:
-
Imposta la classe di archiviazione su
S(Standard). -
Imposta la posizione di archiviazione su:
US(Stati Uniti). -
Sostituisci
BUCKET_NAMEcon un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Imposta la classe di archiviazione su
- Copia l' Google Cloud ID progetto e il nome del bucket Cloud Storage. Ti serviranno questi valori più avanti in questo documento.
Configura l'ambiente
In questa sezione, utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Questo processo ti consente di isolare le dipendenze di un progetto da quelle di altri progetti.
Se non hai un prompt dei comandi a portata di mano, puoi utilizzare Cloud Shell. Cloud Shell ha già installato il gestore dei pacchetti per Python 3, quindi puoi passare alla creazione di un ambiente virtuale.
Per installare Python e creare un ambiente virtuale, segui questi passaggi:
- Verifica che Python 3 e
pipsiano in esecuzione nel tuo sistema:python --version python -m pip --version
- Se necessario, installa Python 3 e poi configura un ambiente virtuale Python: segui le istruzioni fornite nelle sezioni Installazione di Python e Configurazione di venv della pagina Configurazione di un ambiente di sviluppo Python.
Al termine della guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate.
Scarica l'SDK Apache Beam
L'SDK Apache Beam è un modello di programmazione open source per pipeline di dati. Definisci una pipeline con un programma Apache Beam e poi scegli un runner, ad esempio Dataflow, per eseguire la pipeline.
Per scaricare e installare l'SDK Apache Beam, segui questi passaggi:
- Verifica di trovarti nell'ambiente virtuale Python che hai creato nella sezione precedente.
Assicurati che il prompt inizi con
<env_name>, doveenv_nameè il nome dell'ambiente virtuale. - Installa l'ultima versione dell'SDK Apache Beam per Python:
pip install apache-beam[gcp]
Esegui la pipeline in locale
Per vedere come viene eseguita una pipeline in locale, utilizza un modulo Python già pronto per l'esempio wordcount
incluso nel pacchetto apache_beam.
L'esempio di pipeline wordcount esegue le seguenti operazioni:
Accetta un file di testo come input.
Questo file di testo si trova in un bucket Cloud Storage con il nome risorsa
gs://dataflow-samples/shakespeare/kinglear.txt.- Analizza ogni riga in parole.
- Esegue un conteggio della frequenza delle parole tokenizzate.
Per preparare la pipeline wordcount in locale:
- Dal terminale locale, esegui l'esempio
wordcount:python -m apache_beam.examples.wordcount \ --output outputs
- Visualizza l'output della pipeline:
more outputs* - Per uscire, premi q.
wordcount.py
su GitHub di Apache Beam.
Esegui la pipeline sul servizio Dataflow
In questa sezione, esegui la pipeline di esempiowordcount dal pacchetto apache_beam nel servizio Dataflow. Questo
esempio specifica DataflowRunner come parametro per
--runner.
- Esegui la pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempioeurope-west1Il flag
--regionsostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.BUCKET_NAME: il nome del bucket Cloud Storage che hai copiato in precedenzaPROJECT_ID: l'ID progetto Google Cloud che hai copiato in precedenza
Visualizza i tuoi risultati
Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando la console Google Cloud o il terminale locale.
ConsoleGoogle Cloud
Per visualizzare i risultati nella Google Cloud console, segui questi passaggi:
- Nella console Google Cloud , vai alla pagina Job di Dataflow.
La pagina Job mostra i dettagli del job
wordcount, incluso lo stato In esecuzione all'inizio e poi Riuscito. - Vai alla pagina Bucket di Cloud Storage.
Nell'elenco dei bucket del tuo progetto, fai clic sul bucket di archiviazione creato in precedenza.
Nella directory
wordcountvengono visualizzati i file di output creati dal job.
Terminale locale
Visualizza i risultati dal terminale o utilizzando Cloud Shell.
- Per elencare i file di output, utilizza il comando
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Per visualizzare i risultati nei file di output, utilizza il comando
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage utilizzato
nel programma della pipeline.
Modifica il codice della pipeline
La pipelinewordcount negli esempi precedenti distingue tra parole maiuscole e minuscole.
I seguenti passaggi mostrano come modificare la pipeline in modo che la pipeline wordcount
non faccia distinzione tra maiuscole e minuscole.
- Sulla macchina locale, scarica l'ultima copia del
codice
wordcountdal repository GitHub di Apache Beam. - Dal terminale locale, esegui la pipeline:
python wordcount.py --output outputs
- Visualizza i risultati:
more outputs* - Per uscire, premi q.
- In un editor a tua scelta, apri il file
wordcount.py. - All'interno della funzione
run, esamina i passaggi della pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Dopo
split, le righe vengono suddivise in parole come stringhe. - Per convertire le stringhe in minuscolo, modifica la riga dopo
split: Questa modifica mappa la funzionecounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lowersu ogni parola. Questa riga è equivalente abeam.Map(lambda word: str.lower(word)). - Salva il file ed esegui il job
wordcountmodificato:python wordcount.py --output outputs
- Visualizza i risultati della pipeline modificata:
more outputs* - Per uscire, premi q.
- Esegui la pipeline modificata sul servizio Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job DataflowBUCKET_NAME: il nome del bucket Cloud StoragePROJECT_ID: l'ID progetto Google Cloud
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.
- Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.
- Fai clic sulla casella di controllo del bucket da eliminare.
- Per eliminare il bucket, fai clic su Elimina, quindi segui le istruzioni.
Se mantieni il progetto, revoca i ruoli che hai concesso al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
(Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
(Facoltativo) Revoca le credenziali da gcloud CLI.
gcloud auth revoke