Trasmettere messaggi in streaming da Pub/Sub utilizzando Dataflow e Cloud Storage
Dataflow è un servizio completamente gestito per la trasformazione e l'arricchimento dei dati in modalità flusso (in tempo reale) e batch con uguale affidabilità ed espressività. Fornisce un ambiente di sviluppo delle pipeline semplificato utilizzando l'SDK Apache Beam, che dispone di un ricco set di primitive per windowing e analisi delle sessioni, nonché di un ecosistema di connettori di origine e sink. Questa guida rapida mostra come utilizzare Dataflow per:
- Leggere i messaggi pubblicati in un argomento Pub/Sub
- Raggruppa i messaggi per timestamp
- Scrivi i messaggi in Cloud Storage
Questa guida rapida introduce l'utilizzo di Dataflow in Java e Python. È supportato anche SQL. Questa guida rapida è disponibile anche come tutorial di Google Cloud Skills Boost che offre credenziali temporanee per iniziare.
Puoi anche iniziare utilizzando i modelli Dataflow basati su UI se non intendi eseguire l'elaborazione personalizzata dei dati.
Prima di iniziare
- Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
Installa Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init -
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
- Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli.
-
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud .
-
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud API JSON Storage, Pub/Sub, Resource Manager e Cloud Scheduler:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (
roles/serviceusage.serviceUsageAdmin), che include l'autorizzazioneserviceusage.services.enable. Scopri come concedere i ruoli.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura l'autenticazione:
-
Assicurati di disporre del ruolo IAM Creazione account di servizio
(
roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli. -
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAMEcon un nome per il account di servizio. -
Concedi ruoli al account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME: il nome del account di servizioPROJECT_ID: l'ID progetto in cui hai creato il account di servizioROLE: il ruolo da concedere
-
Concedi il ruolo richiesto all'entità che collegherà ilaccount di serviziot ad altre risorse.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME: il nome del account di servizioPROJECT_ID: l'ID progetto in cui hai creato il account di servizioUSER_EMAIL: l'indirizzo email di un Account Google
-
Assicurati di disporre del ruolo IAM Creazione account di servizio
(
-
Installa Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init -
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
- Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli.
-
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud .
-
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud API JSON Storage, Pub/Sub, Resource Manager e Cloud Scheduler:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (
roles/serviceusage.serviceUsageAdmin), che include l'autorizzazioneserviceusage.services.enable. Scopri come concedere i ruoli.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configura l'autenticazione:
-
Assicurati di disporre del ruolo IAM Creazione account di servizio
(
roles/iam.serviceAccountCreator) e del ruolo Amministratore IAM progetto (roles/resourcemanager.projectIamAdmin). Scopri come concedere i ruoli. -
Crea l'account di servizio:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Sostituisci
SERVICE_ACCOUNT_NAMEcon un nome per il account di servizio. -
Concedi ruoli al account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME: il nome del account di servizioPROJECT_ID: l'ID progetto in cui hai creato il account di servizioROLE: il ruolo da concedere
-
Concedi il ruolo richiesto all'entità che collegherà ilaccount di serviziot ad altre risorse.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Sostituisci quanto segue:
SERVICE_ACCOUNT_NAME: il nome del account di servizioPROJECT_ID: l'ID progetto in cui hai creato il account di servizioUSER_EMAIL: l'indirizzo email di un Account Google
-
Assicurati di disporre del ruolo IAM Creazione account di servizio
(
-
Crea credenziali di autenticazione locali per il tuo account utente:
gcloud auth application-default login
Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.
Configurare il progetto Pub/Sub
-
Crea variabili per il bucket, il progetto e la regione. I nomi dei bucket Cloud Storage devono essere univoci a livello globale. Seleziona una regione Dataflow vicina a quella in cui esegui i comandi in questa guida rapida. Il valore della variabile
REGIONdeve essere un nome di regione valido. Per saperne di più su regioni e località, consulta Località di Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket Cloud Storage di proprietà di questo progetto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crea un argomento Pub/Sub in questo progetto:
gcloud pubsub topics create $TOPIC_ID
-
Crea un job Cloud Scheduler in questo progetto. Il job pubblica un messaggio in un argomento Pub/Sub a intervalli di un minuto.
Se non esiste un'app App Engine per il progetto, questo passaggio ne creerà una.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Avvia il lavoro.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Utilizza i seguenti comandi per clonare il repository di avvio rapido e passare alla directory delcodice campioneo:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Trasmettere flussi di messaggi da Pub/Sub a Cloud Storage
Esempio di codice
Questo codice campione utilizza Dataflow per:
- Leggi i messaggi Pub/Sub.
- Raggruppa i messaggi in intervalli di dimensioni fisse in base ai timestamp di pubblicazione.
Scrivi i messaggi in ogni finestra nei file di Cloud Storage.
Java
Python
Avvia la pipeline
Per avviare la pipeline, esegui questo comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Il comando precedente viene eseguito localmente e avvia un job Dataflow
che viene eseguito nel cloud. Quando il comando restituisce JOB_MESSAGE_DETAILED: Workers
have started successfully, esci dal programma locale utilizzando Ctrl+C.
Osserva l'avanzamento del job e della pipeline
Puoi osservare l'avanzamento del job nella console Dataflow.
Apri la visualizzazione dei dettagli del job per visualizzare:
- Struttura del job
- Log job
- Metriche della fase
Potresti dover attendere alcuni minuti prima di visualizzare i file di output in Cloud Storage.
In alternativa, utilizza la riga di comando riportata di seguito per verificare quali file sono stati scritti.
gcloud storage ls gs://${BUCKET_NAME}/samples/
L'output dovrebbe essere simile al seguente:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.
Elimina il job Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Nella console Dataflow, arresta il job. Annulla la pipeline senza svuotarla.
Elimina l'argomento.
gcloud pubsub topics delete $TOPIC_ID
Elimina i file creati dalla pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Rimuovi il bucket Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Elimina il account di servizio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
(Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
(Facoltativo) Revoca le credenziali da gcloud CLI.
gcloud auth revoke
Passaggi successivi
Se vuoi visualizzare i messaggi Pub/Sub in base a un timestamp personalizzato, puoi specificare il timestamp come attributo nel messaggio Pub/Sub e poi utilizzare il timestamp personalizzato con
withTimestampAttributedi PubsubIO.Dai un'occhiata ai modelli Dataflow open source di Google progettati per lo streaming.
Scopri di più su come Dataflow si integra con Pub/Sub.
Dai un'occhiata a questo tutorial che legge da Pub/Sub e scrive in BigQuery utilizzando i modelli Dataflow Flex.
Per saperne di più sul windowing, consulta l'esempio di pipeline di giochi per dispositivi mobili Apache Beam.