Enviar mensajes de Pub/Sub en streaming a través de WebSockets

En este tutorial se muestra cómo puede gestionar una aplicación frontend (en este caso, una página web) grandes volúmenes de datos entrantes cuando se usaGoogle Cloud. En el tutorial se describen algunos de los problemas que plantean las secuencias de gran volumen. En este tutorial se proporciona una aplicación de ejemplo que muestra cómo usar WebSockets para visualizar un flujo denso de mensajes publicados en un tema de Pub/Sub y procesarlos de forma oportuna para mantener un frontend eficiente.

Este tutorial está dirigido a desarrolladores que estén familiarizados con la comunicación de navegador a servidor a través de HTTP y con la escritura de aplicaciones frontend con HTML, CSS y JavaScript. En este tutorial se da por hecho que tienes experiencia conGoogle Cloud y que conoces las herramientas de línea de comandos de Linux.

Introducción

A medida que más aplicaciones adoptan modelos basados en eventos, es importante que las aplicaciones frontend puedan establecer conexiones sencillas y fluidas con los servicios de mensajería que forman la piedra angular de estas arquitecturas.

Hay varias opciones para transmitir datos a clientes de navegadores web. La más habitual es WebSockets. En este tutorial se explica cómo instalar un proceso que se suscribe a un flujo de mensajes que se publican en un tema de Pub/Sub y cómo enrutar esos mensajes a través del servidor web hacia los clientes conectados mediante WebSockets.

En este tutorial, trabajarás con el tema de Pub/Sub disponible públicamente que se usa en el codelab de Google Dataflow de NYC Taxi Tycoon. En este tema se proporciona un flujo en tiempo real de telemetría de taxis simulada basada en datos históricos de viajes realizados en Nueva York procedentes de los conjuntos de datos de registros de viajes de la Comisión de Taxis y Limusinas.

Arquitectura

En el siguiente diagrama se muestra la arquitectura del tutorial que vas a crear.

Arquitectura del tutorial

En el diagrama se muestra un editor de mensajes que está fuera del proyecto que contiene el recurso de Compute Engine. El editor envía mensajes a un tema de Pub/Sub. La instancia de Compute Engine pone los mensajes a disposición a través de WebSockets en un navegador que ejecuta un panel de control basado en HTML5 y JavaScript.

En este tutorial se usa una combinación de herramientas para conectar Pub/Sub y WebSockets:

  • pulltop es un programa de Node.js que instalarás como parte de este tutorial. La herramienta se suscribe a un tema de Pub/Sub y transmite los mensajes recibidos a la salida estándar.
  • websocketd es una pequeña herramienta de línea de comandos que encapsula un programa de interfaz de línea de comandos y permite acceder a él mediante un WebSocket.

Si combinas pulltop y websocketd, puedes hacer que los mensajes que se reciban del tema de Pub/Sub se envíen a un navegador mediante WebSockets.

Ajustar el rendimiento de un tema de Pub/Sub

El tema público de Pub/Sub de NYC Taxi Tycoon genera entre 2000 y 2500 actualizaciones de viajes en taxi simulados por segundo, lo que supone hasta 8 Mb o más por segundo. El control de flujo integrado de Pub/Sub reduce automáticamente la velocidad de los mensajes de un suscriptor si Pub/Sub detecta una cola creciente de mensajes no confirmados. Por lo tanto, es posible que observes una gran variabilidad en la frecuencia de mensajes en diferentes estaciones de trabajo, conexiones de red y código de procesamiento del frontend.

Procesamiento eficaz de mensajes del navegador

Dado el gran volumen de mensajes que llegan a través del flujo de WebSocket, debes tener cuidado al escribir el código frontend que procesa este flujo. Por ejemplo, puedes crear elementos HTML de forma dinámica para cada mensaje. Sin embargo, con la tasa de mensajes prevista, actualizar la página para cada mensaje podría bloquear la ventana del navegador. Las asignaciones de memoria frecuentes que se producen al crear elementos HTML de forma dinámica también prolongan la duración de la recolección de elementos no utilizados, lo que empeora la experiencia del usuario. En resumen, no quieres llamar a document.createElement() por cada uno de los aproximadamente 2000 mensajes que llegan cada segundo.

En este tutorial, se gestiona este flujo denso de mensajes de la siguiente manera:

  • Calcula y actualiza continuamente un conjunto de métricas de streaming en tiempo real, mostrando la mayoría de la información sobre los mensajes observados como valores agregados.
  • Usa un panel de control basado en navegador para visualizar una pequeña muestra de mensajes individuales según una programación predefinida, que solo muestre los eventos de entrega y recogida en tiempo real.

En la siguiente imagen se muestra el panel de control que se crea en este tutorial.

Panel de control creado en la página web con el código de este tutorial

La figura muestra una latencia del último mensaje de 24 milisegundos a una velocidad de casi 2100 mensajes por segundo. Si las rutas de código críticas para procesar cada mensaje no se completan a tiempo, el número de mensajes observados por segundo disminuirá a medida que aumente la latencia del último mensaje. El muestreo de viajes se realiza mediante la API de JavaScript setInterval, que se ha configurado para que se ejecute una vez cada tres segundos. De esta forma, se evita que el frontend cree un número enorme de elementos DOM durante su ciclo de vida. La inmensa mayoría de ellas son prácticamente imperceptibles a velocidades superiores a 10 por segundo.

El panel de control empieza a procesar eventos en mitad del flujo, por lo que los viajes que ya estén en curso se reconocerán como nuevos en el panel de control, a menos que se hayan visto antes. El código usa un array asociativo para almacenar cada viaje observado, indexado por el valor ride_id, y elimina la referencia a un viaje concreto cuando se ha dejado al pasajero. Los viajes en estado "en ruta" o "recogida" añaden una referencia a ese array, a menos que (en el caso de "en ruta") el viaje se haya observado anteriormente.

Instalar y configurar el servidor WebSocket

Para empezar, crea una instancia de Compute Engine que usarás como servidor WebSocket. Una vez que hayas creado la instancia, instala en ella las herramientas que necesites más adelante.

  1. En Cloud Shell, define la zona predeterminada de Compute Engine. En el siguiente ejemplo se muestra us-central1-a, pero puedes usar la zona que quieras.

    gcloud config set compute/zone us-central1-a
    
  2. Crea una instancia de Compute Engine llamada websocket-server en la zona predeterminada:

    gcloud compute instances create websocket-server --tags wss
    
  3. Añade una regla de cortafuegos que permita el tráfico TCP en el puerto 8000 a cualquier instancia etiquetada como wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Si usas un proyecto, asegúrate de que el puerto TCP 22 esté abierto para permitir la conectividad SSH a la instancia.

    De forma predeterminada, la regla de cortafuegos default-allow-ssh está habilitada en la red predeterminada. Sin embargo, si tú o tu administrador habéis quitado la regla predeterminada de un proyecto, es posible que el puerto TCP 22 no esté abierto. Si has creado un proyecto para este tutorial, la regla estará habilitada de forma predeterminada y no tendrás que hacer nada.

    Añade una regla de cortafuegos que permita el tráfico TCP en el puerto 22 a cualquier instancia etiquetada como wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Conéctate a la instancia mediante SSH:

    gcloud compute ssh websocket-server
    
  6. En el comando de terminal de la instancia, cambia a la cuenta root para poder instalar software:

    sudo -s
    
  7. Instala las herramientas git y unzip:

    apt-get install -y unzip git
    
  8. Instala el archivo binario de websocketd en la instancia:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Instalar Node.js y el código del tutorial

  1. En un terminal de la instancia, instala Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Descarga el repositorio de origen del tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Cambia los permisos de pulltop para permitir la ejecución:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instala las dependencias de pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Comprobar que pulltop puede leer mensajes

  1. En la instancia, ejecuta pulltop en el tema público:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Si pulltop funciona, verás un flujo de resultados como el siguiente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Pulsa Ctrl+C para detener la emisión.

Establecer el flujo de mensajes a websocketd

Ahora que has confirmado que pulltop puede leer el tema de Pub/Sub, puedes iniciar el proceso de websocketd para empezar a enviar mensajes al navegador.

Capturar mensajes de temas en un archivo local

En este tutorial, capturarás el flujo de mensajes que obtienes de pulltop y lo escribirás en un archivo local. Capturar el tráfico de mensajes en un archivo local añade un requisito de almacenamiento, pero también desacopla el funcionamiento del proceso websocketd de los mensajes de temas de Pub/Sub en streaming. Capturar la información de forma local permite que se den situaciones en las que quieras detener temporalmente el streaming de Pub/Sub (quizá para ajustar los parámetros de control de flujo), pero sin forzar el restablecimiento de los clientes WebSocket conectados. Cuando se restablece el flujo de mensajes, websocketd reanuda automáticamente el streaming de mensajes a los clientes.

  1. En la instancia, ejecuta pulltop en el tema público y redirige el resultado del mensaje al archivo local taxi.json. El comando nohup indica al sistema operativo que mantenga el proceso pulltop en ejecución si cierras sesión o cierras la terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Comprueba que los mensajes JSON se escriben en el archivo:

    tail /var/tmp/taxi.json
    

    Si los mensajes se escriben en el archivo taxi.json, el resultado será similar al siguiente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Cambia a la carpeta web de tu aplicación:

    cd ../web
    
  4. Empieza con websocketd para iniciar la emisión del contenido del archivo local mediante WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    De esta forma, se ejecuta el comando websocketd en segundo plano. La herramienta websocketd consume la salida del comando tail y transmite cada elemento como un mensaje de WebSocket.

  5. Comprueba el contenido de nohup.out para verificar que el servidor se ha iniciado correctamente:

    tail nohup.out
    

    Si todo funciona correctamente, el resultado será similar al siguiente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizar mensajes

Los mensajes de viajes individuales publicados en el tema de Pub/Sub tienen una estructura como esta:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

A partir de estos valores, se calculan varias métricas para el encabezado del panel de control. Los cálculos se ejecutan una vez por cada evento de viaje entrante. Entre los valores se incluyen los siguientes:

  • Latencia del último mensaje. Número de segundos transcurridos entre la marca de tiempo del último evento de viaje observado y la hora actual (derivada del reloj del sistema que aloja el navegador web).
  • Viajes activos. El número de viajes que están en curso. Este número puede aumentar rápidamente y disminuye cuando se observa un valor ride_status de dropoff.
  • Tarifa de mensajes. El número medio de eventos de viaje procesados por segundo.
  • Importe total medido. La suma de los contadores de todos los viajes activos. Este número disminuye a medida que se dejan a los pasajeros.
  • Número total de pasajeros. El número de pasajeros en todos los viajes. Este número disminuye a medida que se completan los viajes.
  • Número medio de pasajeros por viaje. El número total de viajes dividido entre el número total de pasajeros.
  • Importe medio de la tarifa por pasajero. El importe total registrado dividido entre el número total de pasajeros.

Además de las métricas y las muestras de viajes individuales, cuando se recoge o se deja a un pasajero, el panel de control muestra una notificación de alerta encima de la cuadrícula de muestras de viajes.

  1. Obtén la dirección IP externa de la instancia actual:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia la dirección IP.

  3. En tu máquina local, abre un navegador web e introduce la URL:

    http://$ip-address:8000.

    Verá una página con el panel de control de este tutorial:

    Panel de control creado mediante el código de este tutorial, con un mensaje de bienvenida y antes de que se muestren los datos.

  4. Haz clic en el icono del taxi de la parte superior para abrir una conexión con el flujo y empezar a procesar mensajes.

    Los viajes individuales se visualizan con una muestra de nueve viajes activos que se renderizan cada tres segundos:

    Panel de control que muestra los viajes activos.

    Puedes hacer clic en el icono del taxi en cualquier momento para iniciar o detener el flujo de WebSocket. Si se interrumpe la conexión WebSocket, el icono se pondrá rojo y se detendrán las actualizaciones de las métricas y los viajes individuales. Para volver a conectarte, haz clic de nuevo en el icono del taxi.

Rendimiento

En la siguiente captura de pantalla se muestra el monitor de rendimiento de las herramientas para desarrolladores de Chrome mientras la pestaña del navegador procesa alrededor de 2100 mensajes por segundo.

Panel del monitor de rendimiento del navegador que muestra el uso de la CPU, el tamaño del montículo, los nodos del DOM y los recálculos de estilo por segundo. Los valores son relativamente estables.

El envío de mensajes se produce con una latencia de aproximadamente 30 ms, y la utilización de la CPU es de alrededor del 80%. El uso de memoria se muestra con un mínimo de 29 MB, con un total de 57 MB asignados, que aumentan y disminuyen libremente.