Processe uma stream de alterações do Bigtable

Este tutorial mostra como implementar um pipeline de dados no Dataflow para um fluxo em tempo real de alterações à base de dados provenientes de um fluxo de alterações de uma tabela do Bigtable. A saída do pipeline é escrita numa série de ficheiros no Cloud Storage.

É fornecido um conjunto de dados de exemplo para uma aplicação de audição de música. Neste tutorial, vai monitorizar as músicas que são ouvidas e, em seguida, classificar as cinco principais durante um período.

Este tutorial destina-se a utilizadores técnicos com experiência na escrita de código e na implementação de pipelines de dados no Google Cloud.

Prepare o ambiente

Obter o código

Clone o repositório que contém o código de exemplo. Se já transferiu este repositório anteriormente, extraia a versão mais recente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Crie um contentor

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Crie uma instância do Bigtable

    Pode usar uma instância existente para este tutorial ou criar uma instância com as configurações predefinidas numa região perto de si.

    Criar uma tabela

    A aplicação de exemplo acompanha as músicas que os utilizadores ouvem e armazena os eventos de audição no Bigtable. Crie uma tabela com uma stream de alterações ativada que tenha uma família de colunas (cf) e uma coluna (song) e use IDs de utilizadores para chaves de linhas.

    Crie a tabela.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto que está a usar
    • BIGTABLE_INSTANCE_ID: o ID da instância que vai conter a nova tabela

    Inicie a conduta

    Este pipeline transforma o fluxo de alterações fazendo o seguinte:

    1. Lê a stream de alterações
    2. Obtém o nome da música
    3. Agrupa os eventos de audição de músicas em intervalos de N segundos
    4. Conta as cinco músicas mais ouvidas
    5. Produz os resultados

    Execute a conduta.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Substitua BIGTABLE_REGION pelo ID da região em que a sua instância do Bigtable se encontra, como us-east5.

    Compreenda o processo

    Os seguintes fragmentos de código do pipeline podem ajudar a compreender o código que está a executar.

    Ler a stream de alterações

    O código neste exemplo configura a stream de origem com os parâmetros da instância e da tabela do Bigtable específicas.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obter o nome da música

    Quando uma música é ouvida, o nome da música é escrito na família de colunas cf e no qualificador de colunas song, pelo que o código extrai o valor da mutação do fluxo de alterações e envia-o para o passo seguinte do pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Contagem das cinco músicas mais ouvidas

    Pode usar as funções integradas do Beam Count e Top.of para obter as cinco músicas mais ouvidas na janela atual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Produção dos resultados

    Este pipeline escreve os resultados na saída padrão, bem como em ficheiros. Para os ficheiros, divide as gravações em grupos de 10 elementos ou segmentos de 1 minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Veja o pipeline

    1. Na Google Cloud consola, aceda à página Fluxo de dados.

      Aceda ao Dataflow

    2. Clique na tarefa com um nome que comece por song-rank.

    3. Na parte inferior do ecrã, clique em Mostrar para abrir o painel de registos.

    4. Clique em Registos do trabalhador para monitorizar os registos de saída da stream de alterações.

    Escritas de streams

    Use a cbtCLI para escrever um número de audições de músicas para vários utilizadores na tabela song-rank. Esta ação foi concebida para escrever durante alguns minutos para simular as audições de músicas em streaming ao longo do tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Veja a saída

    Leia o resultado no Cloud Storage para ver as músicas mais populares.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemplo de saída:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]