Streaming de mensagens Pub/Sub através de WebSockets

Este tutorial ilustra uma forma de uma app de front-end, neste caso, uma página Web, processar grandes volumes de dados recebidos quando usa oGoogle Cloud. O tutorial descreve alguns dos desafios das streams de volume elevado. Este tutorial inclui uma app de exemplo que ilustra como usar WebSockets para visualizar um fluxo denso de mensagens publicadas num tópico do Pub/Sub, processando-as de forma oportuna que mantenha um front-end com bom desempenho.

Este tutorial destina-se a programadores familiarizados com a comunicação do navegador para o servidor através de HTTP e com a escrita de apps de front-end com HTML, CSS e JavaScript. Este tutorial pressupõe que tem alguma experiência com o Google Cloud e que está familiarizado com as ferramentas de linha de comandos do Linux.

Introdução

À medida que mais apps adotam modelos orientados por eventos, é importante que as apps de front-end possam estabelecer ligações simples e sem problemas aos serviços de mensagens que formam a base destas arquiteturas.

Existem várias opções para fazer streaming de dados para clientes de navegador de Internet. A mais comum é a WebSockets. Este tutorial explica como instalar um processo que subscreve um fluxo de mensagens publicadas num tópico do Pub/Sub e encaminha essas mensagens através do servidor Web para os clientes ligados através de WebSockets.

Para este tutorial, vai trabalhar com o tópico do Pub/Sub disponível publicamente usado no NYC Taxi Tycoon Google Dataflow CodeLab. Este tópico oferece-lhe um fluxo em tempo real de telemetria de táxis simulada com base em dados do histórico de viagens recolhidos na cidade de Nova Iorque a partir dos conjuntos de dados do registo de viagens da Taxi & Limousine Commission.

Arquitetura

O diagrama seguinte mostra a arquitetura do tutorial que cria neste tutorial.

Arquitetura do tutorial

O diagrama mostra um publicador de mensagens que está fora do projeto que contém o recurso do Compute Engine. O publicador envia mensagens para um tópico do Pub/Sub. A instância do Compute Engine disponibiliza as mensagens através de WebSockets a um navegador que executa um painel de controlo baseado em HTML5 e JavaScript.

Este tutorial usa uma combinação de ferramentas para fazer a ponte entre o Pub/Sub e os WebSockets:

  • pulltop é um programa Node.js que instala como parte deste tutorial. A ferramenta subscreve um tópico Pub/Sub e transmite as mensagens recebidas para a saída padrão.
  • websocketd é uma pequena ferramenta de linha de comandos que envolve um programa de interface de linha de comandos existente e permite o acesso através de um WebSocket.

Ao combinar pulltop e websocketd, pode ter mensagens recebidas do tópico do Pub/Sub transmitidas para um navegador através de WebSockets.

Ajustar o débito do tópico Pub/Sub

O tópico público do Pub/Sub do NYC Taxi Tycoon gera 2000 a 2500 atualizações de viagens de táxi simuladas por segundo, até 8 MB ou mais por segundo. O controlo de fluxo integrado no Pub/Sub abranda automaticamente a taxa de mensagens de um subscritor se o Pub/Sub detetar uma fila crescente de mensagens não reconhecidas. Por conseguinte, pode observar uma elevada variabilidade da taxa de mensagens em diferentes estações de trabalho, ligações de rede e código de processamento de front-end.

Processamento eficaz de mensagens do navegador

Dado o elevado volume de mensagens provenientes do fluxo WebSocket, tem de ter cuidado ao escrever o código de front-end que processa este fluxo. Por exemplo, pode criar dinamicamente elementos HTML para cada mensagem. No entanto, à taxa de mensagens esperada, a atualização da página para cada mensagem pode bloquear a janela do navegador. As atribuições de memória frequentes resultantes da criação dinâmica de elementos HTML também prolongam as durações da recolha de lixo, degradando a experiência do utilizador. Em resumo, não quer chamar document.createElement() para cada uma das aproximadamente 2000 mensagens que chegam a cada segundo.

A abordagem adotada por este tutorial para gerir esta stream densa de mensagens é a seguinte:

  • Calcular e atualizar continuamente um conjunto de métricas de fluxo em tempo real, apresentando a maioria das informações sobre as mensagens observadas como valores agregados.
  • Use um painel de controlo baseado no navegador para visualizar uma pequena amostra de mensagens individuais num horário predefinido, mostrando apenas eventos de entrega e recolha em tempo real.

A figura seguinte mostra o painel de controlo criado como parte deste tutorial.

Painel de controlo criado na página Web pelo código neste tutorial

A figura representa uma latência da última mensagem de 24 milissegundos a uma taxa de quase 2100 mensagens por segundo. Se os caminhos de código críticos para o processamento de cada mensagem individual não forem concluídos a tempo, o número de mensagens observadas por segundo diminui à medida que a latência da última mensagem aumenta. A amostragem de viagens é feita através da API setInterval JavaScript definida para ser executada uma vez a cada três segundos, o que impede que o front-end crie um número enorme de elementos DOM ao longo da sua duração. (A grande maioria dessas alterações é praticamente impercetível a taxas superiores a 10 por segundo.)

O painel de controlo começa a processar eventos a meio da stream, pelo que as viagens já em curso são reconhecidas como novas pelo painel de controlo, a menos que tenham sido vistas anteriormente. O código usa uma matriz associativa para armazenar cada viagem observada, indexada pelo valor ride_id, e remove a referência a uma viagem específica quando o passageiro é deixado no destino. As viagens num estado "em trajeto" ou "recolha" adicionam uma referência a essa matriz, a menos que (no caso de "em trajeto") a viagem tenha sido observada anteriormente.

Instale e configure o servidor WebSocket

Para começar, crie uma instância do Compute Engine que vai usar como servidor WebSocket. Depois de criar a instância, instale nela as ferramentas de que precisa mais tarde.

  1. No Cloud Shell, defina a zona predefinida do Compute Engine. O exemplo seguinte mostra us-central1-a, mas pode usar qualquer zona que quiser.

    gcloud config set compute/zone us-central1-a
    
  2. Crie uma instância do Compute Engine com o nome websocket-server na zona predefinida:

    gcloud compute instances create websocket-server --tags wss
    
  3. Adicione uma regra de firewall que permita o tráfego TCP na porta 8000 a qualquer instância etiquetada como wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se estiver a usar um projeto existente, certifique-se de que a porta TCP 22 está aberta para permitir a conetividade SSH à instância.

    Por predefinição, a regra de firewall default-allow-ssh está ativada na rede predefinida. No entanto, se tiver removido a regra predefinida num projeto existente, ou se o administrador o tiver feito, a porta TCP 22 pode não estar aberta. (Se criou um novo projeto para este tutorial, a regra está ativada por predefinição e não tem de fazer nada.)

    Adicione uma regra de firewall que permita o tráfego TCP na porta 22 a qualquer instância etiquetada como wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Associe-se à instância através de SSH:

    gcloud compute ssh websocket-server
    
  6. No comando de terminal da instância, mude de conta para root para poder instalar software:

    sudo -s
    
  7. Instale as ferramentas git e unzip:

    apt-get install -y unzip git
    
  8. Instale o ficheiro binário websocketd na instância:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Instale o Node.js e o código do tutorial

  1. Num terminal na instância, instale o Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Transfira o repositório de origem do tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Altere as autorizações em pulltop para permitir a execução:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instale as dependências do pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Teste se o pulltop consegue ler mensagens

  1. Na instância, execute pulltop no tópico público:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se pulltop estiver a funcionar, vê um fluxo de resultados semelhante ao seguinte:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Prima Ctrl+C para parar a stream.

Estabeleça o fluxo de mensagens para o websocketd

Agora que estabeleceu que pulltop pode ler o tópico do Pub/Sub, pode iniciar o processo websocketd para começar a enviar mensagens para o navegador.

Capture mensagens de tópicos num ficheiro local

Para este tutorial, captura o fluxo de mensagens que recebe de pulltop e escreve-o num ficheiro local. A captura do tráfego de mensagens para um ficheiro local adiciona um requisito de armazenamento, mas também desvincula o funcionamento do processo das mensagens de tópicos Pub/Sub de streaming.websocketd A captura das informações localmente permite cenários em que pode querer interromper temporariamente o streaming do Pub/Sub (talvez para ajustar os parâmetros de controlo de fluxo), mas não forçar uma reposição dos clientes WebSocket atualmente ligados. Quando o fluxo de mensagens é restabelecido, o websocketd retoma automaticamente o streaming de mensagens para os clientes.

  1. Na instância, execute pulltop no tópico público e redirecione a saída de mensagens para o ficheiro taxi.json local. O comando nohup indica ao SO que mantenha o processo pulltop em execução se terminar sessão ou fechar o terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifique se as mensagens JSON estão a ser escritas no ficheiro:

    tail /var/tmp/taxi.json
    

    Se as mensagens estiverem a ser escritas no ficheiro taxi.json, a saída é semelhante à seguinte:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Altere para a pasta Web da sua app:

    cd ../web
    
  4. Inicie websocketd para começar a fazer streaming do conteúdo do ficheiro local através de WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Isto executa o comando websocketd em segundo plano. A ferramenta websocketd usa o resultado do comando tail e transmite cada elemento como uma mensagem WebSocket.

  5. Verifique o conteúdo de nohup.out para confirmar se o servidor foi iniciado corretamente:

    tail nohup.out
    

    Se tudo estiver a funcionar corretamente, o resultado é semelhante ao seguinte:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizar mensagens

As mensagens de viagens individuais publicadas no tópico Pub/Sub têm uma estrutura semelhante à seguinte:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Com base nestes valores, calcula várias métricas para o cabeçalho do painel de controlo. Os cálculos são executados uma vez por evento de viagem de entrada. Os valores incluem o seguinte:

  • Latência da última mensagem. O número de segundos entre a data/hora do evento da última viagem observada e a hora atual (derivada do relógio no sistema que aloja o navegador de Internet).
  • Viagens ativas. O número de viagens atualmente em curso. Este número pode aumentar rapidamente e diminui quando é observado um valor de ride_status de dropoff.
  • Taxa de mensagens. O número médio de eventos de viagens processados por segundo.
  • Valor total medido. A soma dos contadores de todas as viagens ativas. Este número diminui à medida que os passageiros são deixados no destino.
  • Número total de passageiros. O número de passageiros em todas as viagens. Este número diminui à medida que as viagens são concluídas.
  • Número médio de passageiros por viagem. O número total de viagens, dividido pelo número total de passageiros.
  • Valor medido médio por passageiro. O valor total do taxímetro dividido pelo número total de passageiros.

Além das métricas e dos exemplos de viagens individuais, quando um passageiro é recolhido ou deixado, o painel de controlo mostra uma notificação de alerta acima da grelha de exemplos de viagens.

  1. Obtenha o endereço IP externo da instância atual:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copie o endereço IP.

  3. Na sua máquina local, abra um novo navegador de Internet e introduza o URL:

    http://$ip-address:8000.

    É apresentada uma página que mostra o painel de controlo deste tutorial:

    Painel de controlo criado por código neste tutorial, com mensagem de boas-vindas e antes de serem apresentados dados.

  4. Clique no ícone de táxi na parte superior para abrir uma ligação à stream e começar a processar mensagens.

    As viagens individuais são visualizadas com uma amostra de nove viagens ativas renderizadas a cada três segundos:

    Painel de controlo que mostra viagens ativas.

    Pode clicar no ícone de táxi em qualquer altura para iniciar ou parar a stream WebSocket. Se a ligação WebSocket for interrompida, o ícone fica vermelho e as atualizações das métricas e das viagens individuais são interrompidas. Para voltar a estabelecer ligação, clique novamente no ícone de táxi.

Desempenho

A captura de ecrã seguinte mostra o monitor de desempenho das Ferramentas para programadores do Chrome enquanto o separador do navegador está a processar cerca de 2100 mensagens por segundo.

Painel de monitorização do desempenho do navegador que mostra a utilização da CPU, o tamanho da memória dinâmica, os nós do DOM e os recálculos de estilo por segundo. Os valores são relativamente estáveis.

Com o envio de mensagens a ocorrer com uma latência de aproximadamente 30 ms, a utilização da CPU é, em média, de cerca de 80%. A utilização de memória é apresentada com um mínimo de 29 MB, com um total de 57 MB alocados, e aumenta e diminui livremente.