Processar um fluxo de alterações do Bigtable

Neste tutorial, mostramos como implantar um pipeline de dados no Dataflow para um fluxo em tempo real das alterações do banco de dados provenientes de um fluxo de alterações da tabela do Bigtable. A saída do pipeline é gravada em uma série de arquivos no Cloud Storage.

Um exemplo de conjunto de dados para um aplicativo de música é fornecido. Neste tutorial, você rastreia as músicas ouvidas e, em seguida, classifica as cinco principais ao longo de um período.

Este tutorial é destinado a usuários técnicos familiarizados com a escrita de código e a implantação de pipelines de dados no Google Cloud.

Objetivos

Este tutorial mostra como fazer o seguinte:

  • Criar uma tabela do Bigtable com um fluxo de alterações ativado
  • Implantar um pipeline no Dataflow que transforma e gera o fluxo de alterações.
  • Conferir resultados do seu pipeline de dados

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para um teste sem custo financeiro.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Para mais informações, consulte Limpeza.

Antes de começar

    Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.

    Instale a CLI do Google Cloud. Após a instalação, inicialize a Google Cloud CLI executando o seguinte comando:

    gcloud init

    Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

    Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

    Verifique se o faturamento está ativado para o projeto do Google Cloud .

    Ative as APIs Dataflow, API Cloud Bigtable, API Cloud Bigtable Admin e Cloud Storage:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Instale a CLI do Google Cloud. Após a instalação, inicialize a Google Cloud CLI executando o seguinte comando:

    gcloud init

    Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

    Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

    Verifique se o faturamento está ativado para o projeto do Google Cloud .

    Ative as APIs Dataflow, API Cloud Bigtable, API Cloud Bigtable Admin e Cloud Storage:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Atualize e instale a CLI cbt .
    gcloud components update
    gcloud components install cbt

Prepare o ambiente

Buscar o código

Clone o repositório GitHub que contém o código de exemplo: Se você já fez o download desse repositório, extraia a versão mais recente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Criar um bucket

  • Crie um bucket do Cloud Storage:
    gcloud storage buckets create gs://BUCKET_NAME
    Substitua BUCKET_NAME por um nome de bucket que atenda aos requisitos de nomenclatura de bucket.
  • Criar uma instância do Bigtable

    É possível usar uma instância deste tutorial ou criar uma instância com as configurações padrão em uma região perto de você.

    Criar uma tabela

    O aplicativo de amostra rastreia as músicas que os usuários ouvem e armazena os eventos de detecção no Bigtable. Crie uma tabela com um fluxo de alterações ativado que tenha um grupo de colunas (cf) e uma coluna (música) e que use IDs de usuário para chaves de linha.

    Crie a tabela.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Substitua:

    • PROJECT_ID: o ID do projeto que você está usando.
    • BIGTABLE_INSTANCE_ID: o ID da instância para conter a nova tabela.

    Iniciar o pipeline

    Esse pipeline transforma o stream de alteração fazendo o seguinte:

    1. Lê o fluxo de alterações
    2. Obtém o nome da música
    3. Agrupa os eventos de música para ouvir em janelas de N segundos
    4. Conta as cinco músicas principais
    5. Gera os resultados

    Executar o pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Substitua BIGTABLE_REGION pelo ID da região em que sua instância do Bigtable está, como us-east5.

    Entender o pipeline

    Os snippets de código do pipeline a seguir podem ajudar a entender o código que você está executando.

    Como ler o fluxo de alterações

    O código nesta amostra configura o fluxo de origem com os parâmetros para a instância e a tabela específicas do Bigtable.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Como conseguir o nome da música

    Quando uma música é ouvida, o nome dela é gravado no grupo de colunas cf e no qualificador de coluna song. Assim, o código extrai o valor da mutação do fluxo de alterações e o envia para a próxima etapa do pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Como contar as cinco músicas principais

    É possível usar as funções Count e Top.of integradas do Beam para encontrar facilmente as cinco músicas principais na janela atual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Como gerar os resultados

    Esse pipeline grava os resultados na saída padrão e nos arquivos. Para os arquivos, ele exibe as gravações em grupos de 10 elementos ou segmentos de um minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Ver o pipeline

    1. No Google Cloud console, acesse a página Dataflow.

      Acessar o Dataflow

    2. Clique no job com um nome que comece com song-rank.

    3. Na parte inferior da tela, clique em Mostrar para abrir o painel de registros.

    4. Clique em Registros do worker para monitorar os registros de saída do fluxo de alterações.

    Fazer streaming de gravações

    Use a CLI cbt para gravar várias faixas de música para vários usuários na tabela song-rank. Ela é projetada para gravar durante alguns minutos e simular o streaming de detecções de músicas ao longo do tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Veja o resultado

    Leia a saída no Cloud Storage para ver as músicas mais famosas.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemplo de saída:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Limpar

    Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

    Excluir o projeto

      Excluir um projeto do Google Cloud :

      gcloud projects delete PROJECT_ID

    Excluir recursos individuais

    1. Exclua o bucket e os arquivos.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Desative o fluxo de alterações na tabela.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Exclua a tabela song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Interrompa o pipeline de fluxo de alterações.

      1. Liste os jobs para ver o ID deles.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Cancele o job.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Substitua JOB_ID pelo ID do job exibido após o comando anterior.

    A seguir