Este tutorial mostra como implementar um pipeline de dados no Dataflow para um fluxo em tempo real de alterações à base de dados provenientes de um fluxo de alterações de uma tabela do Bigtable. A saída do pipeline é escrita numa série de ficheiros no Cloud Storage.
É fornecido um conjunto de dados de exemplo para uma aplicação de audição de música. Neste tutorial, vai monitorizar as músicas que são ouvidas e, em seguida, classificar as cinco principais durante um período.
Este tutorial destina-se a utilizadores técnicos com experiência na escrita de código e na implementação de pipelines de dados no Google Cloud.
Prepare o ambiente
Obter o código
Clone o repositório que contém o código de exemplo. Se já transferiu este repositório anteriormente, extraia a versão mais recente.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Crie um contentor
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Crie uma instância do Bigtable
Pode usar uma instância existente para este tutorial ou criar uma instância com as configurações predefinidas numa região perto de si.
Criar uma tabela
A aplicação de exemplo acompanha as músicas que os utilizadores ouvem e armazena os eventos de audição no Bigtable. Crie uma tabela com uma stream de alterações ativada que tenha uma família de colunas (cf) e uma coluna (song) e use IDs de utilizadores para chaves de linhas.
Crie a tabela.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Substitua o seguinte:
- PROJECT_ID: o ID do projeto que está a usar
- BIGTABLE_INSTANCE_ID: o ID da instância que vai conter a nova tabela
Inicie a conduta
Este pipeline transforma o fluxo de alterações fazendo o seguinte:
- Lê a stream de alterações
- Obtém o nome da música
- Agrupa os eventos de audição de músicas em intervalos de N segundos
- Conta as cinco músicas mais ouvidas
- Produz os resultados
Execute a conduta.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Substitua BIGTABLE_REGION pelo ID da região em que a sua instância do Bigtable se encontra, como us-east5
.
Compreenda o processo
Os seguintes fragmentos de código do pipeline podem ajudar a compreender o código que está a executar.
Ler a stream de alterações
O código neste exemplo configura a stream de origem com os parâmetros da instância e da tabela do Bigtable específicas.
Obter o nome da música
Quando uma música é ouvida, o nome da música é escrito na família de colunas cf
e no qualificador de colunas song
, pelo que o código extrai o valor da mutação do fluxo de alterações e envia-o para o passo seguinte do pipeline.
Contagem das cinco músicas mais ouvidas
Pode usar as funções integradas do Beam Count
e Top.of
para obter as cinco músicas mais ouvidas na janela atual.
Produção dos resultados
Este pipeline escreve os resultados na saída padrão, bem como em ficheiros. Para os ficheiros, divide as gravações em grupos de 10 elementos ou segmentos de 1 minuto.
Veja o pipeline
Na Google Cloud consola, aceda à página Fluxo de dados.
Clique na tarefa com um nome que comece por song-rank.
Na parte inferior do ecrã, clique em Mostrar para abrir o painel de registos.
Clique em Registos do trabalhador para monitorizar os registos de saída da stream de alterações.
Escritas de streams
Use a
cbt
CLI
para escrever um número de audições de músicas para vários utilizadores na tabela song-rank
. Esta ação foi concebida para escrever durante alguns minutos para simular
as audições de músicas em streaming ao longo do tempo.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Veja a saída
Leia o resultado no Cloud Storage para ver as músicas mais populares.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemplo de saída:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]