Streaming di messaggi Pub/Sub tramite WebSocket

Questo tutorial illustra un modo in cui un'app frontend, in questo caso una pagina web , può gestire volumi elevati di dati in entrata quando utilizzi Google Cloud. Il tutorial descrive alcune delle sfide degli stream ad alto volume. Con questo tutorial viene fornita un'app di esempio che illustra come utilizzare WebSockets per visualizzare un flusso denso di messaggi pubblicati in un argomento Pub/Sub, elaborandoli in modo tempestivo e mantenendo un frontend efficiente.

Questo tutorial è rivolto agli sviluppatori che hanno familiarità con la comunicazione browser-server tramite HTTP e con la scrittura di app frontend utilizzando HTML, CSS e JavaScript. Il tutorial presuppone che tu abbia una certa esperienza con Google Cloud e che tu abbia familiarità con gli strumenti della riga di comando Linux.

Obiettivi

  • Crea e configura un'istanza di macchina virtuale (VM) con i componenti necessari per eseguire lo streaming dei payload di un abbonamento Pub/Sub ai client browser.
  • Configura un processo sulla VM per abbonarti a un argomento Pub/Sub e inviare i singoli messaggi a un log.
  • Installa un server web per gestire contenuti statici ed eseguire lo streaming dell'output dei comandi della shell ai client WebSocket.
  • Visualizza le aggregazioni dello stream WebSocket e i singoli esempi di messaggi in un browser utilizzando HTML, CSS e JavaScript.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova senza costi.

Prima di iniziare

  1. Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  5. Verify that billing is enabled for your Google Cloud project.

  6. Apri Cloud Shell per eseguire i comandi elencati in questo tutorial.

    VAI A Cloud Shell

    Esegui tutti i comandi del terminale in questo tutorial da Cloud Shell.

  7. Abilita l'API Compute Engine e l'API Pub/Sub:
    gcloud services enable compute pubsub

Al termine di questo tutorial, puoi evitare l'addebito di ulteriori costi eliminando le risorse create. Per maggiori dettagli, vedi Esegui la pulizia.

Introduzione

Man mano che sempre più app adottano modelli basati su eventi, è importante che le app frontend siano in grado di stabilire connessioni semplici e senza problemi ai servizi di messaggistica che costituiscono la base di queste architetture.

Esistono diverse opzioni per lo streaming di dati ai client dei browser web; la più comune è WebSocket. Questo tutorial ti guida nell'installazione di un processo che si abbona a uno stream di messaggi pubblicati in un argomento Pub/Sub e li indirizza tramite il server web ai client connessi tramite WebSocket.

Per questo tutorial, utilizzerai l'argomento Pub/Sub disponibile pubblicamente utilizzato nel codelab NYC Taxi Tycoon Google Dataflow. Questo argomento ti fornisce uno stream in tempo reale di telemetria dei taxi simulata basata sui dati storici delle corse effettuate a New York dai set di dati dei record di viaggio della Taxi & Limousine Commission's trip record datasets.

Architettura

Il seguente diagramma mostra l'architettura del tutorial che creerai in questo tutorial.

Architettura del tutorial

Il diagramma mostra un publisher di messaggi che si trova all'esterno del progetto che contiene la risorsa Compute Engine; il publisher invia messaggi a un argomento Pub/Sub. L'istanza Compute Engine rende disponibili i messaggi tramite WebSocket a un browser che esegue una dashboard basata su HTML5 e JavaScript.

Questo tutorial utilizza una combinazione di strumenti per collegare Pub/Sub e WebSocket:

  • pulltop è un programma Node.js che installi nell'ambito di questo tutorial. Lo strumento si abbona a un argomento Pub/Sub ed esegue lo streaming dei messaggi ricevuti all'output standard.
  • websocketd è un piccolo strumento a riga di comando che esegue il wrapping di un programma di interfaccia a riga di comando esistente e ne consente l'accesso tramite WebSocket.

Combinando pulltop e websocketd, puoi eseguire lo streaming dei messaggi ricevuti dall'argomento Pub/Sub a un browser utilizzando WebSocket.

Regolazione della velocità effettiva dell'argomento Pub/Sub

L'argomento Pub/Sub pubblico NYC Taxi Tycoon genera da 2000 a 2500 aggiornamenti delle corse in taxi simulate al secondo, fino a 8 MB o più al secondo. Il controllo del flusso integrato in Pub/Sub rallenta automaticamente la frequenza dei messaggi di un abbonato se Pub/Sub rileva una coda crescente di messaggi non confermati. Di conseguenza, potresti notare una variabilità elevata della frequenza dei messaggi tra diverse workstation, connessioni di rete e codice di elaborazione frontend.

Elaborazione efficace dei messaggi del browser

Dato l'elevato volume di messaggi provenienti dallo stream WebSocket, devi prestare attenzione alla scrittura del codice frontend che elabora questo stream. Ad esempio, puoi creare dinamicamente elementi HTML per ogni messaggio. Tuttavia, alla frequenza dei messaggi prevista, l'aggiornamento della pagina per ogni messaggio potrebbe bloccare la finestra del browser. Le allocazioni di memoria frequenti derivanti dalla creazione dinamica di elementi HTML estendono anche le durate della garbage collection, peggiorando l'esperienza utente. In breve, non vuoi chiamare document.createElement() per ognuno dei circa 2000 messaggi che arrivano ogni secondo.

L'approccio adottato da questo tutorial per la gestione di questo flusso denso di messaggi è il seguente:

  • Calcola e aggiorna continuamente un insieme di metriche dello stream in tempo reale, visualizzando la maggior parte delle informazioni sui messaggi osservati come valori aggregati.
  • Utilizza una dashboard basata su browser per visualizzare un piccolo campione di singoli messaggi in base a una pianificazione predefinita, mostrando solo gli eventi di rilascio e ritiro in tempo reale.

La figura seguente mostra la dashboard creata nell'ambito di questo tutorial.

Dashboard creata nella pagina web dal codice di questo tutorial

La figura mostra una latenza dell'ultimo messaggio di 24 millisecondi a una frequenza di quasi 2100 messaggi al secondo. Se i percorsi di codice critici per l'elaborazione di ogni singolo messaggio non vengono completati in tempo, il numero di messaggi osservati al secondo diminuisce man mano che aumenta la latenza dell'ultimo messaggio. Il campionamento delle corse viene eseguito utilizzando l'API JavaScript setInterval impostata per eseguire il ciclo una volta ogni tre secondi, il che impedisce al frontend di creare un numero enorme di elementi DOM durante la sua durata. (La stragrande maggioranza di questi è praticamente non osservabile a frequenze superiori a 10 al secondo.)

La dashboard inizia a elaborare gli eventi nel mezzo dello stream, quindi le corse già in corso vengono riconosciute come nuove dalla dashboard, a meno che non siano state viste in precedenza. Il codice utilizza un array associativo per archiviare ogni corsa osservata, indicizzata dal valore ride_id, e rimuove il riferimento a una corsa specifica quando il passeggero è stato fatto scendere. Le corse in stato "enroute" o "partenza" aggiungono un riferimento a questo array, a meno che (nel caso di "enroute") la corsa non sia stata osservata in precedenza.

Installazione e configurazione del server WebSocket

Per iniziare, crea un'istanza Compute Engine che utilizzerai come server WebSocket. Dopo aver creato l'istanza, installa gli strumenti necessari in un secondo momento.

  1. In Cloud Shell, imposta la zona Compute Engine predefinita. L'esempio seguente mostra us-central1-a, ma puoi utilizzare qualsiasi zona.

    gcloud config set compute/zone us-central1-a
    
  2. Crea un'istanza Compute Engine denominata websocket-server nella zona predefinita:

    gcloud compute instances create websocket-server --tags wss
    
  3. Aggiungi una regola firewall che consenta il traffico TCP sulla porta 8000 a qualsiasi istanza con il tag wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se utilizzi un progetto esistente, assicurati che la porta TCP 22 sia aperta per consentire la connettività SSH all'istanza.

    Per impostazione predefinita, la regola firewall default-allow-ssh è abilitata nella rete predefinita. Tuttavia, se tu o il tuo amministratore avete rimosso la regola predefinita in un progetto esistente, la porta TCP 22 potrebbe non essere aperta. (Se hai creato un nuovo progetto per questo tutorial, la regola è abilitata per impostazione predefinita e non devi fare nulla.)

    Aggiungi una regola firewall che consenta il traffico TCP sulla porta 22 a qualsiasi istanza con il tag wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Connettiti all'istanza tramite SSH:

    gcloud compute ssh websocket-server
    
  6. Al comando del terminale dell'istanza, passa all'account root per poter installare il software:

    sudo -s
    
  7. Installa gli strumenti git e unzip:

    apt-get install -y unzip git
    
  8. Installa il file binario websocketd sull'istanza:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installazione di Node.js e del codice del tutorial

  1. In un terminale dell'istanza, installa Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Scarica il repository di origine del tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Modifica le autorizzazioni su pulltop per consentirne l'esecuzione:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installa le dipendenze di pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Verifica che pulltop possa leggere i messaggi

  1. Nell'istanza, esegui pulltop sull'argomento pubblico:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se pulltop funziona, vedrai uno stream di risultati simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Premi Ctrl+C per interrompere lo stream.

Stabilisci il flusso di messaggi in websocketd

Ora che hai verificato che pulltop può leggere l'argomento Pub/Sub, puoi avviare il processo websocketd per iniziare a inviare messaggi al browser.

Acquisisci i messaggi dell'argomento in un file locale

Per questo tutorial, acquisisci lo stream di messaggi che ricevi da pulltop e lo scrivi in un file locale. L'acquisizione del traffico di messaggi in un file locale aggiunge un requisito di archiviazione, ma disaccoppia anche l'operazione del processo websocketd dai messaggi dell'argomento Pub/Sub in streaming. L'acquisizione delle informazioni in locale consente scenari in cui potresti voler interrompere temporaneamente lo streaming Pub/Sub (magari per regolare i parametri di controllo del flusso), ma non forzare la reimpostazione dei client WebSocket attualmente connessi. Quando lo stream di messaggi viene ristabilito, websocketd riprende automaticamente lo streaming dei messaggi ai client.

  1. Nell'istanza, esegui pulltop sull'argomento pubblico e reindirizza l'output dei messaggi al file locale taxi.json. Il comando nohup indica al sistema operativo di mantenere in esecuzione il processo pulltop se esci o chiudi il terminale.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifica che i messaggi JSON vengano scritti nel file:

    tail /var/tmp/taxi.json
    

    Se i messaggi vengono scritti nel file taxi.json, l'output è simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Passa alla cartella web dell'app:

    cd ../web
    
  4. Avvia websocketd per iniziare lo streaming dei contenuti del file locale utilizzando WebSocket:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Questo comando esegue il comando websocketd in background. Lo strumento websocketd utilizza l'output del comando tail ed esegue lo streaming di ogni elemento come messaggio WebSocket.

  5. Controlla i contenuti di nohup.out per verificare che il server sia stato avviato correttamente:

    tail nohup.out
    

    Se tutto funziona correttamente, l'output è simile al seguente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizzazione dei messaggi

I singoli messaggi di corsa pubblicati nell'argomento Pub/Sub hanno una struttura simile alla seguente:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

In base a questi valori, calcoli diverse metriche per l'intestazione della dashboard. I calcoli vengono eseguiti una volta per ogni evento di corsa in entrata. I valori includono:

  • Latenza dell'ultimo messaggio. Il numero di secondi tra il timestamp dell'evento dell'ultima corsa osservata e l'ora corrente (derivata dall'orologio del sistema che ospita il browser web).
  • Corse attive. Il numero di corse attualmente in corso. Questo numero può aumentare rapidamente e diminuisce quando viene osservato un valore ride_status di dropoff.
  • Frequenza dei messaggi. Il numero medio di eventi di corsa elaborati al secondo.
  • Importo totale misurato. La somma dei contatori di tutte le corse attive. Questo numero diminuisce man mano che le corse vengono rilasciate.
  • Numero totale di passeggeri. Il numero di passeggeri in tutte le corse. Questo numero diminuisce man mano che le corse vengono completate.
  • Numero medio di passeggeri per corsa. Il numero totale di corse, diviso per il numero totale di passeggeri.
  • Importo medio misurato per passeggero. L'importo totale misurato diviso per il numero totale di passeggeri.

Oltre alle metriche e ai singoli esempi di corse, quando un passeggero viene prelevato o rilasciato, la dashboard mostra una notifica di avviso sopra la griglia degli esempi di corse.

  1. Ottieni l'indirizzo IP esterno dell'istanza corrente:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia l'indirizzo IP.

  3. Sul computer locale, apri un nuovo browser web e inserisci l'URL:

    http://$ip-address:8000.

    Viene visualizzata una pagina che mostra la dashboard per questo tutorial:

    Dashboard creata dal codice di questo tutorial, con messaggio di benvenuto e prima della visualizzazione dei dati.

  4. Fai clic sull'icona del taxi in alto per aprire una connessione allo stream e iniziare a elaborare i messaggi.

    Le singole corse vengono visualizzate con un campione di nove corse attive sottoposte a rendering ogni tre secondi:

    Dashboard che mostra le corse attive.

    Puoi fare clic sull'icona del taxi in qualsiasi momento per avviare o interrompere lo stream WebSocket. Se la connessione WebSocket viene interrotta, l'icona diventa rossa e gli aggiornamenti delle metriche e delle singole corse vengono interrotti. Per riconnetterti, fai di nuovo clic sull'icona del taxi.

Prestazioni

Lo screenshot seguente mostra il monitor delle prestazioni degli Strumenti per sviluppatori di Chrome mentre la scheda del browser elabora circa 2100 messaggi al secondo.

Il riquadro del monitor delle prestazioni del browser mostra l'utilizzo della CPU, le dimensioni dell'heap, i nodi DOM e i ricalcoli dello stile al secondo. I valori sono relativamente piatti.

Con l'invio dei messaggi con una latenza di circa 30 ms, l'utilizzo della CPU è in media intorno all'80%. L'utilizzo della memoria è indicato a un minimo di 29 MB, con un totale di 57 MB allocati, in crescita e riduzione libera.

Libera spazio

Rimuovi le regole firewall

Se hai utilizzato un progetto esistente per questo tutorial, puoi rimuovere le regole firewall che hai creato. È una buona pratica ridurre al minimo le porte aperte.

  1. Elimina la regola firewall che hai creato per consentire il traffico TCP sulla porta 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Se hai creato anche una regola firewall per consentire la connettività SSH, elimina la regola firewall per consentire il traffico TCP sulla porta 22:

    gcloud compute firewall-rules delete wss-ssh
    

Elimina il progetto

Se non vuoi utilizzare di nuovo questo progetto, puoi eliminarlo.

  1. Nella Google Cloud console, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID progetto e fai clic su Chiudi per eliminare il progetto.

Passaggi successivi