Questo tutorial mostra come eseguire il deployment di una pipeline di dati in Dataflow per un flusso in tempo reale delle modifiche al database provenienti dal flusso di modifiche di una tabella Bigtable. L'output della pipeline viene scritto in una serie di file su Cloud Storage.
Viene fornito un set di dati di esempio per un'applicazione di ascolto di musica. In questo tutorial, monitori i brani ascoltati e poi classifichi i primi cinque in un periodo di tempo.
Questo tutorial è rivolto agli utenti tecnici che hanno familiarità con la scrittura di codice e il deployment di pipeline di dati in Google Cloud.
Obiettivi
Questo tutorial mostra come:
- Crea una tabella Bigtable con un flusso di modifiche abilitato.
- Esegui il deployment di una pipeline su Dataflow che trasforma e restituisce il flusso di modifiche.
- Visualizza i risultati della pipeline di dati.
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per saperne di più, consulta Esegui la pulizia.
Prima di iniziare
- Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli. -
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud . - Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
-
Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto
(
roles/resourcemanager.projectCreator), che contiene l'autorizzazioneresourcemanager.projects.create. Scopri come concedere i ruoli. -
Creare un progetto Google Cloud :
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_IDcon un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_IDcon il nome del progetto Google Cloud . - Aggiorna e installa la CLI
cbt.gcloud components update gcloud components install cbt
Installa Google Cloud CLI. Dopo l'installazione, inizializza Google Cloud CLI eseguendo il comando seguente:
gcloud initSe utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, API Cloud Bigtable, API Cloud Bigtable Admin e Cloud Storage:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere
i ruoli.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Installa Google Cloud CLI. Dopo l'installazione, inizializza Google Cloud CLI eseguendo il comando seguente:
gcloud initSe utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
Crea o seleziona un Google Cloud progetto.
Ruoli richiesti per selezionare o creare un progetto
Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .
Abilita le API Dataflow, API Cloud Bigtable, API Cloud Bigtable Admin e Cloud Storage:
Ruoli richiesti per abilitare le API
Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere
i ruoli.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Prepara l'ambiente
Ottieni il codice
Clona il repository che contiene il codice campione. Se hai già scaricato questo repository, esegui il pull per ottenere l'ultima versione.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Crea un bucket
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME con un nome di bucket
che soddisfi i requisiti per la denominazione dei bucket.
Crea un'istanza Bigtable
Puoi utilizzare un'istanza esistente per questo tutorial o crearne una con le configurazioni predefinite in una regione vicina a te.
Creare una tabella
L'applicazione di esempio monitora i brani ascoltati dagli utenti e memorizza gli eventi di ascolto in Bigtable. Crea una tabella con uno stream di modifiche abilitato che abbia una famiglia di colonne (cf) e una colonna (song) e utilizzi gli ID utente per le chiavi di riga.
Crea la tabella.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Sostituisci quanto segue:
- PROJECT_ID: l'ID del progetto che stai utilizzando
- BIGTABLE_INSTANCE_ID: l'ID dell'istanza che conterrà la nuova tabella
Avvia la pipeline
Questa pipeline trasforma lo stream delle modifiche nel seguente modo:
- Legge il flusso di modifiche
- Recupera il titolo del brano
- Raggruppa gli eventi di ascolto dei brani in finestre di N secondi
- Conta i primi cinque brani
- Restituisce i risultati
Esegui la pipeline.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Sostituisci BIGTABLE_REGION con l'ID della regione in cui si trova l'istanza Bigtable, ad esempio us-east5.
Comprendere la pipeline
I seguenti snippet di codice della pipeline possono aiutarti a comprendere il codice che stai eseguendo.
Lettura del flusso di modifiche
Il codice in questa configurazione di esempio configura il flusso di origine con i parametri per l'istanza e la tabella Bigtable specifiche.
Recupero del titolo del brano
Quando viene ascoltata una canzone, il nome viene scritto nella famiglia di colonne cf
e nel qualificatore di colonna song, quindi il codice estrae il valore dalla mutazione
del flusso di modifiche e lo restituisce al passaggio successivo della pipeline.
Conteggio dei primi cinque brani
Puoi utilizzare le funzioni Beam integrate Count e Top.of per ottenere le prime cinque
canzoni nella finestra corrente.
Output dei risultati
Questa pipeline scrive i risultati in standard out e nei file. Per i file, le scritture vengono raggruppate in gruppi di 10 elementi o segmenti di un minuto.
Visualizzare la pipeline
Nella console Google Cloud , vai alla pagina Dataflow.
Fai clic sul job con un nome che inizia con song-rank.
Nella parte inferiore dello schermo, fai clic su Mostra per aprire il pannello dei log.
Fai clic su Log del worker per monitorare i log di output dello stream di modifiche.
Operazioni di scrittura dei flussi
Utilizza la
CLI cbt
per scrivere un numero di ascolti di brani per vari utenti nella
tabella song-rank. È progettato per essere scritto in pochi minuti per simulare
gli ascolti in streaming dei brani nel tempo.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Visualizzare l'output
Leggi l'output su Cloud Storage per visualizzare i brani più popolari.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Output di esempio:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Esegui la pulizia
Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Elimina un progetto Google Cloud :
gcloud projects delete PROJECT_ID
Elimina singole risorse
Elimina il bucket e i file.
gcloud storage rm --recursive gs://BUCKET_NAME/Disattiva lo stream di modifiche nella tabella.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-periodElimina la tabella
song-rank.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rankArresta la pipeline di modifiche in tempo reale.
Elenca i job per ottenere l'ID job.
gcloud dataflow jobs list --region=BIGTABLE_REGIONAnnulla il job.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGIONSostituisci JOB_ID con l'ID job visualizzato dopo il comando precedente.