Melakukan streaming pesan Pub/Sub melalui WebSockets

Tutorial ini mengilustrasikan cara aplikasi frontend—dalam hal ini, web halaman—menangani data masuk dalam volume tinggi saat Anda menggunakan Google Cloud. Tutorial ini menjelaskan beberapa tantangan streaming volume tinggi. Aplikasi contoh disertakan dalam tutorial ini yang mengilustrasikan cara menggunakan WebSockets untuk memvisualisasikan aliran pesan padat yang dipublikasikan ke topik Pub/Sub, memprosesnya secara tepat waktu yang mempertahankan frontend berperforma tinggi.

Tutorial ini ditujukan untuk developer yang memahami komunikasi browser ke server melalui HTTP dan penulisan aplikasi frontend menggunakan HTML, CSS, dan JavaScript. Tutorial ini mengasumsikan bahwa Anda memiliki pengalaman dengan Google Cloud dan memahami alat command line Linux.

Tujuan

  • Membuat dan mengonfigurasi instance virtual machine (VM) dengan komponen yang diperlukan untuk melakukan streaming payload langganan Pub/Sub ke klien browser.
  • Mengonfigurasi proses di VM untuk berlangganan topik Pub/Sub dan menampilkan pesan individual ke log.
  • Menginstal server web untuk menayangkan konten statis dan melakukan streaming output perintah shell ke klien WebSocket.
  • Memvisualisasikan agregasi streaming WebSocket dan contoh pesan individual di browser menggunakan HTML, CSS, dan JavaScript.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen yang dapat ditagih sebagai berikut Google Cloud:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis. Google Cloud

Sebelum memulai

  1. Sign in to your Google Cloud account. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  5. Verify that billing is enabled for your Google Cloud project.

  6. Buka Cloud Shell untuk menjalankan perintah yang tercantum dalam tutorial ini.

    BUKA Cloud Shell

    Anda menjalankan semua perintah terminal dalam tutorial ini dari Cloud Shell.

  7. Aktifkan Compute Engine API dan Pub/Sub API:
    gcloud services enable compute pubsub

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.

Pengantar

Karena semakin banyak aplikasi yang menggunakan model berbasis peristiwa, penting bagi aplikasi frontend untuk dapat membuat koneksi sederhana dan tanpa hambatan ke layanan pesan yang menjadi dasar arsitektur ini.

Ada beberapa opsi untuk melakukan streaming data ke klien browser web; yang paling umum adalah WebSockets. Tutorial ini memandu Anda menginstal proses yang berlangganan streaming pesan yang dipublikasikan ke topik Pub/Sub, dan merutekan pesan tersebut melalui server web dalam perjalanan ke klien yang terhubung melalui WebSockets.

Untuk tutorial ini, Anda akan menggunakan topik Pub/Sub yang tersedia secara publik yang digunakan dalam NYC Taxi Tycoon Google Dataflow CodeLab. Topik ini memberi Anda streaming real-time telemetri taksi simulasi berdasarkan data perjalanan historis yang diambil di New York City dari Taxi & Limousine Commission's trip record set data.

Arsitektur

Diagram berikut menunjukkan arsitektur tutorial yang Anda buat dalam tutorial ini.

Arsitektur tutorial

Diagram ini menunjukkan penayang pesan yang berada di luar project yang berisi resource Compute Engine; penayang mengirim pesan ke topik Pub/Sub. Instance Compute Engine menyediakan pesan melalui WebSockets ke browser yang menjalankan dasbor berdasarkan HTML5 dan JavaScript.

Tutorial ini menggunakan kombinasi alat untuk menghubungkan Pub/Sub dan WebSockets:

  • pulltop adalah program Node.js yang Anda instal sebagai bagian dari tutorial ini. Alat ini berlangganan topik Pub/Sub dan melakukan streaming pesan yang diterima ke output standar.
  • websocketd adalah alat command line kecil yang menggabungkan program antarmuka command line yang ada dan memungkinkan program tersebut diakses menggunakan WebSocket.

Dengan menggabungkan pulltop dan websocketd, Anda dapat melakukan streaming pesan yang diterima dari topik Pub/Sub ke browser menggunakan WebSockets.

Menyesuaikan throughput topik Pub/Sub

Topik Pub/Sub publik NYC Taxi Tycoon menghasilkan 2.000 hingga 2.500 update perjalanan taksi simulasi per detik—hingga 8 Mb atau lebih per detik. Kontrol aliran bawaan di Pub/Sub memperlambat kecepatan pesan pelanggan secara otomatis jika Pub/Sub mendeteksi antrean pesan yang tidak dikonfirmasi yang terus bertambah. Oleh karena itu, Anda mungkin melihat variabilitas kecepatan pesan yang tinggi di berbagai workstation, koneksi jaringan, dan kode pemrosesan frontend.

Pemrosesan pesan browser yang efektif

Mengingat volume pesan yang tinggi yang berasal dari streaming WebSocket, Anda harus berhati-hati dalam menulis kode frontend yang memproses streaming ini. Misalnya, Anda dapat membuat elemen HTML secara dinamis untuk setiap pesan. Namun, pada kecepatan pesan yang diharapkan, memperbarui halaman untuk setiap pesan dapat mengunci jendela browser. Alokasi memori yang sering terjadi akibat pembuatan elemen HTML secara dinamis juga memperpanjang durasi pengumpulan sampah memori, sehingga menurunkan kualitas pengalaman pengguna. Singkatnya, Anda tidak ingin memanggil document.createElement() untuk setiap pesan yang berjumlah sekitar 2.000 yang tiba setiap detik.

Pendekatan yang diambil oleh tutorial ini untuk mengelola streaming pesan padat ini adalah sebagai berikut:

  • Menghitung dan terus memperbarui kumpulan metrik streaming secara real time, menampilkan sebagian besar informasi tentang pesan yang diamati sebagai nilai gabungan.
  • Menggunakan dasbor berbasis browser untuk memvisualisasikan contoh kecil pesan individual pada jadwal yang telah ditentukan, hanya menampilkan peristiwa penurunan dan penjemputan secara real time.

Gambar berikut menunjukkan dasbor yang dibuat sebagai bagian dari tutorial ini.

Dasbor yang dibuat di halaman web oleh kode dalam tutorial ini

Gambar ini menggambarkan latensi pesan terakhir sebesar 24 milidetik dengan kecepatan hampir 2.100 pesan per detik. Jika jalur kode penting untuk memproses setiap pesan individual tidak selesai tepat waktu, jumlah pesan yang diamati per detik akan berkurang seiring meningkatnya latensi pesan terakhir. Pengambilan sampel perjalanan dilakukan menggunakan JavaScript setInterval API yang ditetapkan untuk melakukan siklus sekali setiap tiga detik, yang mencegah frontend membuat sejumlah besar elemen DOM selama masa pakainya. (Sebagian besar elemen tersebut praktis tidak dapat diamati dengan kecepatan lebih tinggi dari 10 per detik.)

Dasbor mulai memproses peristiwa di tengah streaming, sehingga perjalanan yang sudah berlangsung dikenali sebagai perjalanan baru oleh dasbor kecuali jika perjalanan tersebut telah dilihat sebelumnya. Kode ini menggunakan array asosiatif untuk menyimpan setiap perjalanan yang diamati, yang diindeks berdasarkan nilai ride_id, dan menghapus referensi ke perjalanan tertentu saat penumpang telah diturunkan. Perjalanan dalam status "enroute" atau "pickup" menambahkan referensi ke array tersebut kecuali (untuk kasus "enroute") perjalanan telah diamati sebelumnya.

Menginstal dan mengonfigurasi server WebSocket

Untuk memulai, Anda membuat instance Compute Engine yang akan digunakan sebagai server WebSocket. Setelah membuat instance, Anda akan menginstal alat di instance tersebut yang akan diperlukan nanti.

  1. Di Cloud Shell, tetapkan zona Compute Engine default. Contoh berikut menunjukkan us-central1-a, tetapi Anda dapat menggunakan zona yang diinginkan.

    gcloud config set compute/zone us-central1-a
    
  2. Buat instance Compute Engine bernama websocket-server di zona default:

    gcloud compute instances create websocket-server --tags wss
    
  3. Tambahkan aturan firewall yang mengizinkan traffic TCP di port 8000 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Jika Anda menggunakan project yang sudah ada, pastikan port TCP 22 terbuka untuk mengizinkan konektivitas SSH ke instance.

    Secara default, aturan firewall default-allow-ssh diaktifkan di jaringan default. Namun, jika Anda atau administrator menghapus aturan default dalam project yang sudah ada, port TCP 22 mungkin tidak terbuka. (Jika Anda membuat project baru untuk tutorial ini, aturan ini akan diaktifkan secara default, dan Anda tidak perlu melakukan apa pun.)

    Tambahkan aturan firewall yang mengizinkan traffic TCP di port 22 ke instance mana pun yang diberi tag sebagai wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Hubungkan ke instance menggunakan SSH:

    gcloud compute ssh websocket-server
    
  6. Pada perintah terminal instance, ganti akun ke root agar Anda dapat menginstal software:

    sudo -s
    
  7. Instal alat git dan unzip:

    apt-get install -y unzip git
    
  8. Instal biner websocketd di instance:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Menginstal Node.js dan kode tutorial

  1. Di terminal pada instance, instal Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Download repositori sumber tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Ubah izin pada pulltop untuk mengizinkan eksekusi:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instal dependensi pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Menguji apakah pulltop dapat membaca pesan

  1. Di instance, jalankan pulltop terhadap topik publik:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Jika pulltop berfungsi, Anda akan melihat streaming hasil seperti berikut:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Tekan Ctrl+C untuk menghentikan streaming.

Membuat alur pesan ke websocketd

Setelah memastikan bahwa pulltop dapat membaca topik Pub/Sub, Anda dapat memulai proses websocketd untuk mulai mengirim pesan ke browser.

Mengambil pesan topik ke file lokal

Untuk tutorial ini, Anda akan mengambil streaming pesan yang Anda dapatkan dari pulltop dan menuliskannya ke file lokal. Mengambil traffic pesan ke file lokal akan menambahkan persyaratan penyimpanan, tetapi juga memisahkan operasi proses websocketd dari pesan topik Pub/Sub streaming. Mengambil informasi secara lokal memungkinkan skenario saat Anda mungkin ingin menghentikan streaming Pub/Sub untuk sementara (mungkin untuk menyesuaikan parameter kontrol aliran), tetapi tidak memaksa klien WebSocket yang saat ini terhubung untuk direset. Saat streaming pesan dibuat ulang, websocketd akan otomatis melanjutkan streaming pesan ke klien.

  1. Di instance, jalankan pulltop terhadap topik publik, dan alihkan output pesan ke file taxi.json lokal. Perintah nohup menginstruksikan OS untuk terus menjalankan proses pulltop jika Anda logout atau menutup terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifikasi bahwa pesan JSON sedang ditulis ke file:

    tail /var/tmp/taxi.json
    

    Jika pesan sedang ditulis ke file taxi.json, output-nya akan mirip dengan yang berikut ini:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Ubah ke folder web aplikasi Anda:

    cd ../web
    
  4. Mulai websocketd untuk mulai melakukan streaming konten file lokal menggunakan WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Tindakan ini menjalankan perintah websocketd di latar belakang. Alat websocketd menggunakan output perintah tail dan melakukan streaming setiap elemen sebagai pesan WebSocket.

  5. Periksa konten nohup.out untuk memverifikasi bahwa server dimulai dengan benar:

    tail nohup.out
    

    Jika semuanya berfungsi dengan benar, output-nya akan mirip dengan yang berikut ini:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Memvisualisasikan pesan

Pesan perjalanan individual yang dipublikasikan ke topik Pub/Sub memiliki struktur seperti ini:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Berdasarkan nilai ini, Anda menghitung beberapa metrik untuk header dasbor. Perhitungan dilakukan sekali per peristiwa perjalanan masuk. Nilai mencakup:

  • Latensi pesan terakhir. Jumlah detik antara stempel waktu stempel waktu peristiwa perjalanan terakhir yang diamati dan waktu saat ini (berasal dari jam di sistem yang menghosting browser web).
  • Perjalanan aktif. Jumlah perjalanan yang sedang berlangsung. Jumlah ini dapat bertambah dengan cepat, dan jumlahnya akan berkurang saat nilai ride_status dropoff diamati.
  • Kecepatan pesan. Jumlah rata-rata peristiwa perjalanan yang diproses per detik.
  • Jumlah total yang diukur. Jumlah pengukur dari semua perjalanan aktif. Jumlah ini akan berkurang seiring dengan penurunan perjalanan.
  • Jumlah total penumpang. Jumlah penumpang di semua perjalanan. Jumlah ini akan berkurang seiring dengan selesainya perjalanan.
  • Jumlah rata-rata penumpang per perjalanan. Jumlah total perjalanan, dibagi dengan jumlah total penumpang.
  • Jumlah rata-rata yang diukur per penumpang. Jumlah total yang diukur dibagi dengan jumlah total penumpang.

Selain metrik dan contoh perjalanan individual, saat penumpang dijemput atau diturunkan, dasbor akan menampilkan notifikasi peringatan di atas petak contoh perjalanan.

  1. Dapatkan alamat IP eksternal instance saat ini:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Salin alamat IP.

  3. Di komputer lokal Anda, buka browser web baru dan masukkan URL:

    http://$ip-address:8000.

    Anda akan melihat halaman yang menampilkan dasbor untuk tutorial ini:

    Dasbor yang dibuat oleh kode dalam tutorial ini, dengan pesan selamat datang dan sebelum data apa pun ditampilkan.

  4. Klik ikon taksi di bagian atas untuk membuka koneksi ke streaming dan mulai memproses pesan.

    Perjalanan individual divisualisasikan dengan contoh sembilan perjalanan aktif yang dirender setiap tiga detik:

    Dasbor yang menampilkan perjalanan aktif.

    Anda dapat mengklik ikon taksi kapan saja untuk memulai atau menghentikan streaming WebSocket. Jika koneksi WebSocket terputus, ikon akan berubah menjadi merah, dan pembaruan metrik serta perjalanan individual akan dihentikan. Untuk terhubung kembali, klik ikon taksi lagi.

Performa

Screenshot berikut menunjukkan monitor performa Alat Developer Chrome saat tab browser memproses sekitar 2.100 pesan per detik.

Panel monitor performa browser yang menampilkan penggunaan CPU, ukuran heap, node DOM, dan penghitungan ulang gaya per detik. Nilainya relatif datar.

Dengan pengiriman pesan yang terjadi pada latensi sekitar 30 md, penggunaan CPU rata-rata sekitar 80%. Penggunaan memori ditampilkan pada minimum 29 MB, dengan total 57 MB yang dialokasikan, dan bertambah serta berkurang secara bebas.

Pembersihan

Menghapus aturan firewall

Jika Anda menggunakan project yang sudah ada untuk tutorial ini, Anda dapat menghapus aturan firewall yang Anda buat. Sebaiknya minimalkan port terbuka.

  1. Hapus aturan firewall yang Anda buat untuk mengizinkan TCP di port 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Jika Anda juga membuat aturan firewall untuk mengizinkan konektivitas SSH, hapus aturan firewall untuk mengizinkan TCP di port 22:

    gcloud compute firewall-rules delete wss-ssh
    

Menghapus project

Jika tidak ingin menggunakan project ini lagi, Anda dapat menghapus project tersebut.

  1. Di Google Cloud konsol, buka halaman Kelola resource.

    Buka Kelola resource

  2. Pada daftar project, pilih project yang Anda ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Langkah berikutnya