メッセージをモニタリングする

このガイドでは、Manufacturing Data Engine(MDE)に送信されたメッセージのモニタリング方法、処理パイプラインでのフロー、構成や潜在的なシステムの問題によって発生する可能性のある問題の診断方法について説明します。

MDE を通過するメッセージをモニタリングするには、operations-log BigQuery テーブルを使用する必要があります。MDE パイプラインのステップでメッセージの処理に失敗すると、ステップと失敗の理由を示す operations-log テーブルにメッセージが送信されます。

失敗したメッセージはすべてこのテーブルに書き込まれますが、成功したメッセージも operations-log テーブルに書き込むように MDE を構成することもできます。これは問題のデバッグに役立ちますが、システムで大量の追加トラフィックが生成され、本番環境でパフォーマンスが低下する可能性があるため、オンのままにしないでください。

REST

次の REST API リクエストを実行して、operations-log テーブルを構成します。

POST /configuration/v1/environment

{
  "operationsLogLevel": "ALL"
}

ここで

  • ALL: 処理パイプラインの各ステップで、すべてのメッセージが operations-log テーブルに送信されます。
  • ERROR: 処理ステップのいずれかに失敗したメッセージのみが operations-log テーブルに送信されます。

処理手順

処理パイプラインを通過する際にメッセージが拒否される理由を診断するには、さまざまな処理ステップとその内容を理解しておくと便利です。詳細については、MDE アーキテクチャをご覧ください。

  1. message-mapper: 元の JSON を読み取り、対応する message-class と照合して、Whistle を使用して処理し、1 つ以上のレコードを出力します。
  2. configuration-manager: システムにタグが存在しない場合は新しいタグを作成し、適切なタイプ(メタデータ バケット、シンク、変換)で定義されたすべてのプロパティをレコードに追加します。
  3. metadata-manager: このレコードのすべてのメタデータ参照を解決し、新しいメタデータ参照を受信した場合はシステム メタデータ インスタンスを更新し、マテリアライズされたメタデータをレコードに追加します(そのように構成されている場合)。
  4. bigquery-sink: レコードを適切な型構造にマッピングし、対応する PubSub トピックに送信して BigQuery に書き込みます。
  5. pubsub-sink: レコードを pubsub Proto または JSON 構造にマッピングし、対応するトピックに送信します。
  6. GCSWriter: input-messages トピックから受信した未加工データと、メタデータ マネージャーを通過した処理済みデータの両方を書き込みます。
  7. BigtableWriter: Bigtable にデータを書き込みます。
  8. GCSReader: Cloud Storage からファイルを読み取り、メッセージを input-messages に送信します。

構成されたシンクにメッセージが表示されない問題を診断する

MDE にメッセージを送信したのに、構成されたシンクにメッセージがない場合は、まず、タイプでシンクが正しく構成されていること(タイプ セクションを参照)と、BigQuery で正しいテーブルをクエリしていることを確認します。テーブルは型にちなんで命名されていることに注意してください。

正しく構成されている場合は、operations-log テーブルを使用して問題を診断する必要があります。次の例に示すように、event_timestamp で並べ替えるか、メッセージが送信されたときにフィルタリングして、一般的なクエリから始めることができます。

SELECT
  *
FROM
  `mde_system.operations-log`
WHERE
  DATE(event_timestamp) = CURRENT_DATE()
ORDER BY
  event_timestamp desc
LIMIT 100;

source_message_id を使用して、特定のメッセージでフィルタすることもできます。この ID は、メッセージのパブリッシュ時に Pub/Sub によって割り当てられます。gcloud CLI を使用してコマンドラインからメッセージをパブリッシュすると、パブリッシュされたメッセージの messageId が返されます。

SELECT
  *
FROM
  `mde_system.operations-log`
WHERE
  DATE(event_timestamp) <= CURRENT_DATE()
  AND source_message_id = 'PubSubMessageId';

メッセージが見つからない場合や、メッセージが多すぎる場合は、元のメッセージの属性に基づいてフィルタできます。メッセージは常に payload フィールドにログに記録され、最後のステップで残された状態が JSON フィールドに保存されるため、TO_JSON_STRING を使用しやすくなり、% を使用して目的のテキストを含むメッセージを検索しやすくなります。

SELECT
  *
FROM
  `mde_system.operations-log`
WHERE
  DATE(event_timestamp) = CURRENT_DATE()
  AND TO_JSON_STRING(payload) LIKE "%TEXT-TO-FIND%"
ORDER BY
  event_timestamp DESC;

operations-log テーブルでメッセージを見つけたら、error-message 列でメッセージが拒否された理由を確認します。最も一般的なエラーは次のとおりです。

  • 受信メッセージを構成マネージャーの登録済みメッセージ クラスと照合できませんでした。
  • メッセージ クラス <MESSAGE_CLASS_NAME> のパーサーが見つかりません。
  • メッセージを逆シリアル化できなかったため、メッセージの処理をスキップします(メッセージが有効な JSON ではありません)。
  • パーサー <PARSER_NAME> で有効なメッセージを構築できませんでした。メッセージにタイムスタンプ フィールドがありません。
  • パーサー <PARSER_NAME> で有効なメッセージを構築できませんでした。メッセージに tagName フィールドがないか、空です。
  • パーサー <PARSER_NAME> で有効なメッセージを構築できませんでした。メッセージのタイムスタンプがサポートされている範囲外です。

メッセージの失敗の原因となっているエラーを見つけて修正すると、メッセージはそれぞれのシンクに届くようになります。