このチュートリアルでは、Google Cloudの使用時にフロントエンド アプリ(この場合はウェブページ)で大量の受信データを処理する方法について説明します。また、大量のメッセージ ストリームを処理する際の課題についても説明します。このチュートリアルのサンプルアプリでは、WebSockets を使用して Pub/Sub トピックに公開された大量のメッセージ ストリームを可視化し、フロントエンドの効率性を維持しながら、メッセージをタイムリーに処理します。
このチュートリアルは、HTTP を介したブラウザ間通信と、HTML、CSS、JavaScript を使用したフロントエンド アプリの作成に精通しているデベロッパーを対象としています。また、Google Cloudの使用経験があり、Linux コマンドライン ツールに精通していることを前提としています。
目標
- 仮想マシン(VM)インスタンスを作成し、Pub/Sub サブスクリプションのペイロードをブラウザ クライアントにストリーミングするためのコンポーネントを構成します。
- Pub/Sub トピックをサブスクライブし、個々のメッセージをログに出力するように VM のプロセスを構成します。
- ウェブサーバーをインストールして静的コンテンツを提供し、シェルコマンド出力を WebSocket クライアントにストリーミングします。
- HTML、CSS、JavaScript を使用して、WebSocket ストリームの集計と個々のメッセージ サンプルをブラウザで可視化します。
費用
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
- このチュートリアルに記載されているコマンドを実行するために、Cloud Shell を開きます。
このチュートリアルでは、Cloud Shell からすべてのターミナル コマンドを実行します。
- Compute Engine API と Pub/Sub API を有効にします。
gcloud services enable compute pubsub
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
はじめに
イベント ドリブン モデルを採用するアプリが増えています。フロントエンド アプリでは、これらのアーキテクチャの基盤となるメッセージング サービスに簡単に接続できることが重要になります。
ウェブブラウザ クライアントにデータをストリーミングする方法はいくつかありますが、よく使われているのが WebSocket です。このチュートリアルでは、Pub/Sub トピックに公開されるメッセージ ストリームをサブスクライブするプロセスをインストールし、WebSocket 経由で接続しているクライアントにウェブサーバーからメッセージを転送します。
このチュートリアルでは、NYC Taxi Tycoon Google Dataflow CodeLab で使用されている一般公開の Pub/Sub トピックを使用します。このトピックは、Taxi & Limousine Commission の移動記録データセットからニューヨーク市の配車履歴データを取得して利用状況のシミュレーションを行い、その結果をリアルタイムで提供します。
アーキテクチャ
次の図に、このチュートリアルで構築するアーキテクチャを示します。
この図では、メッセージ パブリッシャーが Compute Engine リソースを含むプロジェクトの外部にあり、このパブリッシャーが Pub/Sub トピックにメッセージを送信します。メッセージは Compute Engine インスタンスから WebSocket 経由でブラウザに送信され、HTML5 と JavaScript で作成されたダッシュボードに表示されます。
このチュートリアルでは、Pub/Sub と WebSocket を連携させるため、次のツールを使用しています。
pulltopは、このチュートリアルでインストールする Node.js プログラムです。このツールは Pub/Sub トピックをサブスクライブし、受信したメッセージを標準出力にストリーミングします。websocketdは、既存のコマンドライン インターフェース プログラムをラップし、WebSocket を使用してアクセスできるようにする小さなコマンドライン ツールです。
pulltop と websocketd を組み合わせることで、Pub/Sub トピックから受信したメッセージを WebSocket 経由でブラウザにストリーミングすることが可能になります。
Pub/Sub トピックのスループットの調整
NYC Taxi Tycoon の一般公開の Pub/Sub トピックでは、1 秒間に 2,000〜2,500 件の配車データが更新され、1 秒あたり最大で 8 MB のデータが生成されます。キュー内で未確認メッセージが増加していることを Pub/Sub が検知すると、Pub/Sub に組み込まれたフロー制御により、サブスクライバーのメッセージ レートが自動的に低下します。このため、ワークステーション、ネットワーク接続、フロントエンド処理コードによって、メッセージレートの変動が大きくなることがあります。
効率的なブラウザ メッセージの処理
WebSocket ストリーム経由で大量のメッセージを受信する場合は、このストリームを処理するフロントエンド コードを作成する必要があります。たとえば、メッセージごとに HTML 要素を動的に生成するようにします。ただし、想定されたメッセージ レートで各メッセージのページを更新すると、ブラウザ ウィンドウがロックされる可能性があります。また、HTML 要素を動的に生成すると、メモリの割り当てが頻繁に発生してガベージ コレクションの期間が長くなり、ユーザー エクスペリエンスが低下します。毎秒 2, 000 件届くメッセージごとに document.createElement() を呼び出す必要はありません。
このチュートリアルでは、このような大量のメッセージ ストリームを管理するため、次のことを行います。
- 一連のストリーム指標をリアルタイムで計算し、継続的に更新する。これにより、モニタリングされたメッセージに関する大部分の情報を集計値として表示できます。
- ブラウザベースのダッシュボードを使用して、事前に定義されたスケジュールで個々のメッセージのサンプルを視覚化し、降車イベントと乗車イベントのみをリアルタイムで表示する。
次の図に、このチュートリアルで作成したダッシュボードを示します。

この図は、最後のメッセージのレイテンシが 24 ミリ秒で、メッセージ レートが 1 秒あたり約 2,100 件であることを示しています。個々のメッセージを処理するクリティカル コードパスが時間内に完了しないと、最後のメッセージのレイテンシが増加するため、1 秒あたりのモニタリング メッセージの数が減少します。配車のサンプリングは 3 秒に 1 回のサイクルで実行される JavaScript setInterval API で行われます。これにより、フロントエンドで大量の DOM 要素が生成されなくなります(その大半は毎秒 10 を超えるレートでは確認できません)。
ダッシュボードは、ストリームの途中でイベントの処理を開始するため、進行中の配車は以前に表示されていない限りダッシュボードで新しいデータとして認識されます。このコードは、ride_id 値をインデックスとした連想配列で配車データを格納します。乗客が降車すると、その配車への参照を削除します。状態が enroute または pickup の配車は、その配車が確認済みでない限り、その配列への参照を追加します。
WebSocket サーバーをインストールして構成する
まず、WebSocket サーバーとして使用する Compute Engine インスタンスを作成します。インスタンスを作成したら、後で必要なツールをインストールします。
Cloud Shell で、デフォルトの Compute Engine ゾーンを設定します。次の例では
us-central1-aを設定していますが、任意のゾーンを使用できます。gcloud config set compute/zone us-central1-aデフォルト ゾーンに
websocket-serverという名前の Compute Engine インスタンスを作成します。gcloud compute instances create websocket-server --tags wssファイアウォール ルールを追加し、ポート
8000でwssタグ付きのインスタンスに TCP トラフィックを許可します。gcloud compute firewall-rules create websocket \ --direction=IN \ --allow=tcp:8000 \ --target-tags=wss既存のプロジェクトを使用している場合は、インスタンスへの SSH 接続が許可されるように、TCP ポート
22が開いていることを確認してください。デフォルトでは、
default-allow-sshファイアウォール ルールはデフォルト ネットワークで有効になっています。ただし、自身または管理者が既存のプロジェクトでデフォルト ルールを削除した場合、TCP ポート22が開かない可能性があります(このチュートリアル用に新しいプロジェクトを作成した場合、このルールはデフォルトで有効になり、操作は不要です)。ファイアウォール ルールを追加し、ポート
22でwssタグ付きのインスタンスに TCP トラフィックを許可します。gcloud compute firewall-rules create wss-ssh \ --direction=IN \ --allow=tcp:22 \ --target-tags=wssSSH を使用してインスタンスに接続します。
gcloud compute ssh websocket-serverインスタンスのターミナル コマンドで、アカウントを
rootに切り替えて、ソフトウェアをインストールできるようにします。sudo -sgitツールとunzipツールをインストールします。apt-get install -y unzip gitインスタンスに
websocketdバイナリをインストールします。cd /var/tmp/ wget \ https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip unzip websocketd-0.3.0-linux_386.zip mv websocketd /usr/bin
Node.js とチュートリアル コードをインストールする
インスタンスのターミナルで Node.js をインストールします。
curl -sL https://deb.nodesource.com/setup_10.x | bash - apt-get install -y nodejsチュートリアルのソース リポジトリをダウンロードします。
exit cd ~ git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.gitpulltopの権限を変更して実行を許可します。cd solutions-pubsub-websockets chmod 755 pulltop/pulltop.jspulltop依存関係をインストールします。cd pulltop npm install sudo npm link
pulltop がメッセージを読み取れることをテストする
インスタンスで、公開トピックに対して
pulltopを実行します。pulltop projects/pubsub-public-data/topics/taxirides-realtimepulltopが機能している場合は、次のような結果が表示されます。{"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude" :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6 593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat us":"enroute","passenger_count":1}[
Ctrl+C] を押してストリームを停止します。
websocketd とのメッセージ フローを確立する
pulltop が Pub/Sub トピックを読み取れることを確認したら、websocketd プロセスを開始してブラウザへのメッセージ送信を開始できます。
トピック メッセージをローカル ファイルにキャプチャする
このチュートリアルでは、pulltop から取得したメッセージ ストリームをキャプチャし、ローカル ファイルに書き込みます。メッセージ トラフィックをローカル ファイルにキャプチャする場合、ストレージ要件が増えますが、Pub/Sub トピック メッセージのストリーミング と websocketd プロセスのオペレーションは分離されます。ローカルで情報をキャプチャする場合は、現在接続している WebSocket クライアントを強制的にリセットするのではなく、Pub/Sub ストリーミングを一時的に停止できます。メッセージ ストリームが再度確立されると、websocketd はクライアントへのメッセージ ストリーミングを自動的に再開します。
インスタンスで、公開トピックに
pulltopを実行し、メッセージの出力をローカルのtaxi.jsonファイルにリダイレクトします。ログアウトするかターミナルを閉じると、nohupコマンドはpulltopプロセスを続行するように OS に指示します。nohup pulltop \ projects/pubsub-public-data/topics/taxirides-realtime > \ /var/tmp/taxi.json &JSON メッセージがファイルに書き込まれていることを確認します。
tail /var/tmp/taxi.jsonメッセージが
taxi.jsonファイルに書き込まれている場合、出力は次のようになります。{"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude" :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6 593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta tus":"enroute","passenger_count":1}アプリのウェブフォルダに移動します。
cd ../webwebsocketdを起動して、WebSocket を使用してローカル ファイルのコンテンツ ストリーミングを開始します。nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &これにより、
websocketdコマンドがバックグラウンドで実行されます。websocketdツールは、tailコマンドの出力を使用し、WebSocket メッセージとして各要素をストリーミングします。nohup.outの内容を参照して、サーバーが正しく起動したことを確認します。tail nohup.outすべてが正常に機能している場合、出力は次のようになります。
Mon, 25 Mar 2019 14:03:53 -0400 | INFO | server | | Serving using application : /usr/bin/tail -f /var/tmp/taxi.json Mon, 25 Mar 2019 14:03:53 -0400 | INFO | server | | Serving static content from : .
メッセージの可視化
Pub/Sub トピックに公開された個々の配車メッセージは、次のような構造になっています。
{
"ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
"point_idx": 248,
"latitude": 40.74644000000001,
"longitude": -73.97144,
"timestamp": "2019-03-24T00:46:08.49094-04:00",
"meter_reading": 8.40615,
"meter_increment": 0.033895764,
"ride_status": "enroute",
"passenger_count": 1
}
これらの値に基づいて、ダッシュボードのヘッダーの指標を計算します。この計算は、インバウンドの配車イベントごとに 1 回実行されます。値は次のとおりです。
- 最後のメッセージのレイテンシ。最後に確認された配車イベントのタイムスタンプから現在の時刻(ウェブブラウザをホストしているシステムの時刻)までの秒数。
- アクティブな配車。現在進行中の配車です。この値は急増する場合があります。また、
ride_statusの値がdropoffになると減少します。 - メッセージ レート。1 秒あたりに処理された配車イベントの平均数。
- メーター料金の合計額。アクティブな配車のメーター料金の合計額。この値は、乗客が降車すると減少します。
- 乗客の総数。乗客の数。この数は、乗車が完了すると減少します。
- 乗車 1 台あたりの平均乗車人数。乗客数の合計数を配車数の合計数で割った値。
- 乗客 1 人あたりの平均メーター料金。メーター料金の合計金額を乗客の総数で割った値。
乗客が乗車または降車すると、指標と個々の配車サンプルのほかに、配車サンプルのグリッドの上にアラート通知が表示されます。
現在のインスタンスの外部 IP アドレスを取得します。
curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echoその IP アドレスをコピーします。
ローカルマシンで新しいウェブブラウザを開き、URL を入力します。
http://$ip-address:8000このチュートリアルのダッシュボードにページが表示されます。

上部にあるタクシーのアイコンをクリックしてストリームへの接続を開き、メッセージの処理を開始します。
個々の配車が視覚化され、9 つの配車サンプルが 3 秒ごとにレンダリングされます。

タクシーのアイコンをクリックすると、WebSocket ストリームを開始または停止できます。WebSocket 接続が切断されると、アイコンが赤に変わり、指標と個々の配車情報の更新が停止します。再接続するには、タクシーのアイコンをもう一度クリックします。
パフォーマンス
次のスクリーンショットは、Chrome デベロッパー ツールのパフォーマンス モニターです。ブラウザのタブが 1 秒あたり約 2,100 件のメッセージを処理していることを表しています。

約 30 ミリ秒のレイテンシでメッセージがディスパッチされると、CPU 使用率は平均で約 80% になります。メモリ使用率は最小の 29 MB です。合計で 57 MB が割り当てられていますが、増減は自由に行われます。
クリーンアップ
ファイアウォール ルールを削除する
このチュートリアルで既存のプロジェクトを使用した場合、作成したファイアウォール ルールを削除できます。開いているポートを最小限に抑えることをおすすめします。
ポート
8000で TCP を許可するように作成したファイアウォール ルールを削除します。gcloud compute firewall-rules delete websocketSSH 接続を許可するファイアウォール ルールも作成した場合は、ポート
22で TCP を許可するファイアウォール ルールを削除します。gcloud compute firewall-rules delete wss-ssh
プロジェクトの削除
このプロジェクトを再度使用したくない場合は、プロジェクトを削除できます。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
次のステップ
- Pub/Sub と WebSocket プロトコルについて学習する。
- Google Maps Platform API キーを
cabdash.jsに追加して、乗車と降車の位置情報を取得する。 - Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センターをご覧ください。