パーサー作成時のヒントとトラブルシューティング
このドキュメントでは、パーサーコードを作成する際に発生する可能性のある問題について説明します。
パーサーコードを作成するときに、解析手順が想定どおりに機能しない場合にエラーが発生します。エラーが発生する可能性がある状況は次のとおりです。
Grokパターンが失敗するrenameオペレーションまたはreplaceオペレーションが失敗する- パーサーコードの構文エラー
パーサーコードでの一般的な手法
次のセクションでは、問題のトラブルシューティングに役立つベスト プラクティス、ヒント、解決策について説明します。
変数名にドットやハイフンを使用しない
変数名にハイフンやドットを使用すると、多くの場合、UDM フィールドに値を格納するために merge オペレーションを実行した際に、予期しない動作が発生する可能性があります。解析の問題が断続的に発生することもあります。
たとえば、次の変数名は使用しないでください。
my.variable.resultmy-variable-result
代わりに、次の変数名を使用します。my_variable_result
変数名として特別な意味を持つ用語を使用しない
event や timestamp など、パーサーコードで特別な意味を持つ単語があります。
文字列 event は、単一の UDM レコードを表すためによく使用され、@output ステートメントで使用されます。ログメッセージに event というフィールドが含まれている場合、または event という中間変数を定義し、パーサーコードで @output ステートメントに event という単語を使用すると、名前の競合に関するエラー メッセージが表示されます。
中間変数の名前を変更するか、UDM フィールド名と @output ステートメントで接頭辞として event1 という用語を使用してください。
timestamp という単語は、元の未加工ログの作成されたタイムスタンプを表します。この中間変数に設定された値は、metadata.event_timestamp UDM フィールドに保存されます。用語 @timestamp は、未加工ログが解析されて UDM レコードが作成された日時を表します。
次の例では、未加工ログが解析された日時に metadata.event_timestamp UDM フィールドを設定します。
# Save the log parse date and time to the timestamp variable
mutate {
rename => {
"@timestamp" => "timestamp"
}
}
次の例では、元の未加工ログから抽出され、when 中間変数に保存された日時に metadata.event_timestamp UDM フィールドを設定します。
# Save the event timestamp to timestamp variable
mutate {
rename => {
"when" => "timestamp"
}
}
次の用語を変数として使用しないでください。
- collectiontimestamp
- createtimestamp
- イベント
- filename
- message
- namespace
- 出力
- onerrorcount
- timestamp
- タイムゾーン
各データ値を個別の UDM フィールドに保存する
複数のフィールドを区切り文字で連結して 1 つの UDM フィールドに格納しないでください。次に例を示します。
"principal.user.first_name" => "first:%{first_name},last:%{last_name}"
代わりに、各値を個別の UDM フィールドに保存します。
"principal.user.first_name" => "%{first_name}"
"principal.user.last_name" => "%{last_name}"
コードではタブではなくスペースを使用する
パーサーコードでタブを使用しないでください。スペースのみを使用し、2 つのスペースを一度にインデントします。
1 つのオペレーションで複数のマージ アクションを実行しない
1 つのオペレーションで複数のフィールドをマージすると、結果に一貫性がなくなる可能性があります。代わりに、merge ステートメントを個別のオペレーションに配置します。
たとえば、次の例を置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
"security_result.category_details" => "super_category_details"
}
}
上記のコードブロックを次のコードブロックに置き換えます。
mutate {
merge => {
"security_result.category_details" => "category_details"
}
}
mutate {
merge => {
"security_result.category_details" => "super_category_details"
}
}
if 条件式と if else 条件式の選択
テスト対象の条件値に 1 つの一致のみが設定可能である場合は、if else 条件文を使用します。この方法の方が若干効率的です。ただし、テスト対象の値が複数回一致する可能性がある場合は、複数の異なる if ステートメントを使用し、最も一般的なケースから最も具体的なケースの順にステートメントを並べます。
パーサーの変更をテストするログファイルの代表的なセットを選択する
ベスト プラクティスは、さまざまな形式の未加工ログサンプルを使用してパーサーコードをテストすることです。これにより、パーサーで処理する必要がある一意のログやエッジケースを見つけることができます。
パーサーコードに説明的なコメントを追加する
ステートメントの内容ではなく、ステートメントが重要な理由を説明するコメントをパーサーコードに追加します。このコメントにより、パーサーを維持しているユーザーが、フローを追跡できます。次に例を示します。
# only assign a Namespace if the source address is RFC 1918 or Loopback IP address
if [jsonPayload][id][orig_h] =~ /^(127(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(10(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{3\}$)|(192\.168(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)|(172\.(?:1[6-9]|2\d|3[0-1])(?:\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\{2\}$)/ {
mutate {
replace => {
"event1.idm.read_only_udm.principal.namespace" => "%{resource.labels.project_id}"
}
}
}
中間変数を早めに初期化する
元の未加工ログから値を抽出する前に、テスト値の保存に使用される中間変数を初期化します。
これにより、中間変数が存在しないことを示すエラーが返されなくなります。
次のステートメントは、product 変数の値を metadata.product_name UDM フィールドに割り当てます。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
product 変数が存在しない場合は、次のエラーが表示されます。
"generic::invalid_argument: pipeline failed: filter mutate (4) failed: replace failure: field \"event1.idm.read_only_udm.metadata.product_name\": source field \"product\": field not set"
on_error ステートメントを追加してエラーをキャッチできます。次に例を示します。
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
on_error => "_error_does_not_exist"
}
上の例のステートメントは、解析エラーを _error_does_not_exist というブール値の中間変数に正常にキャッチします。このことによっても、product 変数を if などの条件文で使用できるようにはなりません。次に例を示します。
if [product] != "" {
mutate{
replace => {
"event1.idm.read_only_udm.metadata.product_name" => "%{product}"
}
}
on_error => "_error_does_not_exist"
}
上の例では、if 条件句が on_error ステートメントをサポートしていないため、次のエラーが返されます。
"generic::invalid_argument: pipeline failed: filter conditional (4) failed: failed to evaluate expression: generic::invalid_argument: "product" not found in state data"
この問題を解決するには、抽出フィルタ(json、csv、xml、kv、grok)を実行する前に中間変数を初期化する別のステートメント ブロックを追加します。次に例を示します。
filter {
# Initialize intermediate variables for any field you will use for a conditional check
mutate {
replace => {
"timestamp" => ""
"does_not_exist" => ""
}
}
# load the logs fields from the message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
}
更新されたパーサーコードのスニペットは、条件文を使用してフィールドが存在するかどうかを確認することで、複数のシナリオを処理します。また、on_error ステートメントは、発生する可能性のあるエラーを処理します。
SHA-256 を base64 に変換する
次の例では、SHA-256 値を抽出し、base64 でエンコードし、 エンコードされたデータを 16 進文字列に変換してから、特定のフィールドを 抽出して処理した値に置き換えます。
if [Sha256] != ""
{
base64
{
encoding => "RawStandard"
source => "Sha256"
target => "base64_sha256"
on_error => "base64_message_error"
}
mutate
{
convert =>
{
"base64_sha256" => "bytestohex"
}
on_error => "already_a_string"
}
mutate
{
replace =>
{
"event.idm.read_only_udm.network.tls.client.certificate.sha256" => "%{base64_sha256}"
"event.idm.read_only_udm.target.resource.name" => "%{Sha256}"
}
}
}
パーサー ステートメントのエラーを処理する
受信ログが予期しないログ形式である場合や、形式が正しくないデータが含まれていることは珍しくありません。
これらのエラーを処理するようにパーサーを構築できます。ベスト プラクティスは、抽出フィルタに on_error ハンドラを追加し、パーサーロジックの次のセグメントに進む前に中間変数をテストすることです。
次の例では、json 抽出フィルタを on_error
ステートメントで使用して、_not_json ブール値変数を設定します。_not_json が true に設定されている場合、受信ログエントリが有効な JSON 形式ではなく、ログエントリが正常に解析されなかったことを意味します。_not_json 変数が false の場合、受信ログエントリは有効な JSON 形式でした。
# load the incoming log from the default message field
json {
source => "message"
array_function => "split_columns"
on_error => "_not_json"
}
フィールドが正しい形式であるかどうかをテストすることもできます。次の例では、_not_json が true に設定されているかどうかを確認します。これは、ログが想定される形式ではないことを示します。
# Test that the received log matches the expected format
if [_not_json] {
drop { tag => "TAG_MALFORMED_MESSAGE" }
} else {
# timestamp is always expected
if [timestamp] != "" {
# ...additional parser logic goes here …
} else {
# if the timestamp field does not exist, it's not a log source
drop { tag => "TAG_UNSUPPORTED" }
}
}
これにより、指定されたログタイプの形式が正しくないログが取り込まれた場合でも、解析が失敗することはありません。
drop フィルタを tag 変数とともに使用して、条件が BigQuery の
取り込み指標テーブルにキャプチャされるようにします。
TAG_UNSUPPORTEDTAG_MALFORMED_ENCODINGTAG_MALFORMED_MESSAGETAG_NO_SECURITY_VALUE
drop フィルタは、パーサーが未加工ログの処理、フィールドの正規化、UDM レコードの作成を行わないようにします。元の未加工ログは Google Security Operations に取り込まれ、Google SecOps の未加工ログ検索を使用して検索できます。
tag 変数に渡された値は、取り込み指標テーブルの drop_reason_code' フィールドに保存されます。次のようなアドホック クエリをテーブルに対して実行できます。
SELECT
log_type,
drop_reason_code,
COUNT(drop_reason_code) AS count
FROM `datalake.ingestion_metrics`
GROUP BY 1,2
ORDER BY 1 ASC
検証エラーのトラブルシューティング
パーサーの作成時に、検証に関連するエラーが発生することがあります。たとえば、必要なフィールドが UDM レコードに設定されていない場合などです。エラーは次のようなものになります。
Error: generic::unknown: invalid event 0: LOG_PARSING_GENERATED_INVALID_EVENT: "generic::invalid_argument: udm validation failed: target field is not set"
パーサーコードは正常に実行されますが、生成された UDM レコードには、metadata.event_type に設定された値で定義されている必要な UDM フィールドがすべて含まれていません。このエラーの原因となる可能性のある追加の例を次に示します。
metadata.event_typeがUSER_LOGINで、target.user valueUDM フィールドが設定されていない場合。metadata.event_typeがNETWORK_CONNECTIONで、target.hostnameUDM フィールドが設定されていない場合。
metadata.event_type UDM フィールドと必須
フィールドの詳細については、UDM 使用ガイドをご覧ください。
このタイプのエラーのトラブルシューティングを行う方法の一つは、最初に UDM フィールドに静的な値を設定することです。 必要な UDM フィールドをすべて定義したら元の元ログを調べ、パースして UDM レコードに保存する値を確認します。元の未加工ログに特定のフィールドが含まれていない場合は、デフォルト値の設定が必要な場合があります。
この方法を示す USER_LOGIN イベントタイプに固有のテンプレートの例を次に示します。
次の点に注意してください。
- テンプレートは中間変数を初期化し、それぞれを静的文字列に設定します。
- [Field Assignment] セクションのコードは、中間変数の値を UDM フィールドに設定します。
中間変数と UDM フィールドを追加して、このコードを拡張できます。 入力する必要がある UDM フィールドをすべて特定したら、次の操作を行います。
[入力構成] セクションで、元の未加工ログからフィールドを抽出し、値を中間変数に設定するコードを追加します。
[Date Extract] セクションで、元の未加工ログからイベント タイムスタンプを抽出し、変換して中間変数に設定するコードを追加します。
必要に応じて、各中間変数に設定された初期化値を空の文字列に置き換えます。
filter {
mutate {
replace => {
# UDM > Metadata
"metadata_event_timestamp" => ""
"metadata_vendor_name" => "Example"
"metadata_product_name" => "Example SSO"
"metadata_product_version" => "1.0"
"metadata_product_event_type" => "login"
"metadata_product_log_id" => "12345678"
"metadata_description" => "A user logged in."
"metadata_event_type" => "USER_LOGIN"
# UDM > Principal
"principal_ip" => "192.168.2.10"
# UDM > Target
"target_application" => "Example Connect"
"target_user_user_display_name" => "Mary Smith"
"target_user_userid" => "mary@example.com"
# UDM > Extensions
"auth_type" => "SSO"
"auth_mechanism" => "USERNAME_PASSWORD"
# UDM > Security Results
"securityResult_action" => "ALLOW"
"security_result.severity" => "LOW"
}
}
# ------------ Input Configuration --------------
# Extract values from the message using one of the extraction filters: json, kv, grok
# ------------ Date Extract --------------
# If the date {} function is not used, the default is the normalization process time
# ------------ Field Assignment --------------
# UDM Metadata
mutate {
replace => {
"event1.idm.read_only_udm.metadata.vendor_name" => "%{metadata_vendor_name}"
"event1.idm.read_only_udm.metadata.product_name" => "%{metadata_product_name}"
"event1.idm.read_only_udm.metadata.product_version" => "%{metadata_product_version}"
"event1.idm.read_only_udm.metadata.product_event_type" => "%{metadata_product_event_type}"
"event1.idm.read_only_udm.metadata.product_log_id" => "%{metadata_product_log_id}"
"event1.idm.read_only_udm.metadata.description" => "%{metadata_description}"
"event1.idm.read_only_udm.metadata.event_type" => "%{metadata_event_type}"
}
}
# Set the UDM > auth fields
mutate {
replace => {
"event1.idm.read_only_udm.extensions.auth.type" => "%{auth_type}"
}
merge => {
"event1.idm.read_only_udm.extensions.auth.mechanism" => "auth_mechanism"
}
}
# Set the UDM > principal fields
mutate {
merge => {
"event1.idm.read_only_udm.principal.ip" => "principal_ip"
}
}
# Set the UDM > target fields
mutate {
replace => {
"event1.idm.read_only_udm.target.user.userid" => "%{target_user_userid}"
"event1.idm.read_only_udm.target.user.user_display_name" => "%{target_user_user_display_name}"
"event1.idm.read_only_udm.target.application" => "%{target_application}"
}
}
# Set the UDM > security_results fields
mutate {
merge => {
"security_result.action" => "securityResult_action"
}
}
# Set the security result
mutate {
merge => {
"event1.idm.read_only_udm.security_result" => "security_result"
}
}
# ------------ Output the event --------------
mutate {
merge => {
"@output" => "event1"
}
}
}
Grok 関数を使用して構造化されていないテキストを解析する
Grok 関数を使用して構造化されていないテキストから値を抽出する場合は、事前定義された Grok パターンと正規表現ステートメントを使用できます。Grok パターンを使用すると、コードが読みやすくなります。正規表現に省略文字(\w、\s など)が含まれていない場合は、ステートメントをコピーしてパーサーコードに直接貼り付けることができます。
Grok パターンはステートメントの追加の抽象化レイヤであるため、エラーが発生した場合のトラブルシューティングが複雑になる可能性があります。次に、事前定義された Grok パターンと正規表現の両方を含む Grok 関数の例を示します。
grok {
match => {
"message" => [
"%{NUMBER:when}\\s+\\d+\\s%{SYSLOGHOST:srcip} %{WORD:action}\\/%{NUMBER:returnCode} %{NUMBER:size} %{WORD:method} (?P<url>\\S+) (?P<username>.*?) %{WORD}\\/(?P<tgtip>\\S+).*"
]
}
}
Grok パターンを使用しない抽出ステートメントの方がパフォーマンスが向上する可能性があります。たとえば、次の例では、一致する処理ステップが半分以下で済みます。ログソースのボリュームが大きくなる可能性がある場合は、この点を考慮することが重要です。
RE2 正規表現と PCRE 正規表現の違いを理解する
Google SecOps パーサーは、正規表現エンジンとして RE2 を使用します。PCRE 構文に慣れている場合は、違いに気づくかもしれません。次に例を示します。
PCRE ステートメントは (?<_custom_field>\w+)\s です。
パーサーコードの RE2 ステートメントは (?P<_custom_field>\\w+)\\s です。
エスケープ文字を必ずエスケープしてください
Google SecOps は、受信した未加工ログデータを JSON エンコード形式で保存します。これは、正規表現の省略文字のように見える文字列がリテラル文字列として解釈されるようにするためです。たとえば、\t はタブ文字ではなくリテラル文字列として解釈されます。
次の例は、元の未加工ログと JSON エンコードされた形式のログを示しています。entry という用語を囲む各バックスラッシュ文字の前にエスケープ文字が追加されていることに注意してください。
元の未加工ログは次のとおりです。
field=\entry\
以下に示すようにログが JSON エンコード形式に変換されます。
field=\\entry\\
パーサーコードで正規表現を使用する場合は、値のみを抽出するために追加のエスケープ文字を追加する必要があります。元の未加工ログのバックスラッシュと一致するには、抽出ステートメントで 4 つのバックスラッシュを使用します。
パーサーコードの正規表現は次のとおりです。
^field=\\\\(?P<_value>.*)\\\\$
生成される結果は次のとおりです。_value という名前付きグループには、entry という用語が保存されます。
"_value": "entry"
標準の正規表現ステートメントをパーサーコードに移動する場合は、抽出ステートメントで正規表現の省略文字をエスケープします。
たとえば、\s を \\s に変更します。
抽出ステートメントで二重エスケープする場合は、正規表現の特殊文字を変更しないままにします。たとえば、\\ は \\ として変更されない状態で保持されます。
標準の正規表現は次のとおりです。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
次の正規表現は、パーサーコード内で機能するように変更されています。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
次の表に、標準の正規表現をパーサーコードに含める前に、追加のエスケープ文字を含める必要がある場合をまとめます。
| 正規表現 | パーサーコード用に変更された正規表現 | 変更の説明 |
|---|---|---|
\s |
\\s |
省略文字はエスケープする必要があります。 |
\. |
\\. |
予約文字はエスケープする必要があります。 |
\\" |
\\\" |
予約文字はエスケープする必要があります。 |
\] |
\\] |
予約文字はエスケープする必要があります。 |
\| |
\\| |
予約文字はエスケープする必要があります。 |
[^\\]+ |
[^\\\\]+ |
文字クラス グループ内の特殊文字はエスケープする必要があります。 |
\\\\ |
\\\\ |
文字クラス グループ外の特殊文字または省略文字には、追加のエスケープは必要ありません。 |
正規表現に名前付きキャプチャ グループを必ず含める
正規表現("^.*$" など)は有効な RE2 構文です。ただし、パーサーコードでは次のエラーが発生します。
"ParseLogEntry failed: pipeline failed: filter grok (0) failed: failed to parse data with all match
patterns"
有効なキャプチャ グループを式に追加する必要があります。 Grok パターンを使用する場合、デフォルトでは名前付きキャプチャ グループが含まれます。 正規表現のオーバーライドを使用する場合は、名前付きグループを含めるようにしてください。
パーサーコードの正規表現の例を次に示します。
"^(?P<_catchall>.*$)"
結果を次に示します。_catchall という名前付きグループに割り当てられたテキストが表示されます。
"_catchall": "User \"BOB\" logged on to workstation \"DESKTOP-01\"."
正規表現の作成を開始する際に、キャッチオールの名前付きグループを使用する
抽出ステートメントを作成するときは、必要なものよりも多くをキャッチする式から始めます。次に、式を 1 つのフィールドずつ展開します。
次の例では、メッセージ全体に一致する名前付きグループ(_catchall)を使用することから開始しています。 次に、テキストの追加部分を照合して、式を段階的に作成します。各ステップで、_catchall という名前の名前付きグループは含まれている元のテキストが少ない状態です。 _catchall という名前付きグループが不要になるまで、メッセージを照合するステップを 1 つずつ繰り返します。
| ステップ | パーサーコード内の正規表現 | _catchall という名前付きキャプチャ グループの出力 |
|---|---|---|
| 1 | "^(?P<_catchall>.*$)" |
User \"BOB\" logged on to workstation \"DESKTOP-01\". |
| 2 | ^User\s\\\"(?P<_catchall>.*$) |
BOB\" logged on to workstation \"DESKTOP-01\". |
| 3 | ^User\s\\\"(?P<_user>.*?)\\\"\s(?P<_catchall>.*$) |
logged on to workstation \"DESKTOP-01\". |
| 式がテキスト文字列全体と一致するまで続けます。 | ||
正規表現の省略文字をエスケープする
パーサーコードで式を使用する場合は、正規表現の簡略文字をエスケープしてください。 次に、テキスト文字列の例と、最初の単語 This を抽出する標準の正規表現を示します。
This is a sample log.
次の標準の正規表現は、最初の単語 This を抽出します。
ただし、この式をパーサーコードで実行すると、結果から文字 s が欠落します。
| 標準の正規表現 | _firstWord という名前付きキャプチャ グループの出力 |
|---|---|
"^(?P<_firstWord>[^\s]+)\s.*$" |
"_firstWord": "Thi", |
これは、パーサーコードの正規表現では、省略文字に追加のエスケープ文字が必要になるためです。前の例では、\s を \\s に変更する必要があります。
| 改定後のパーサーコードの正規表現 | _firstWord という名前付きキャプチャ グループの出力 |
|---|---|
"^(?P<_firstWord>[^\\s]+)\\s.*$" |
"_firstWord": "This", |
これは、\s、\r、\t などの省略文字にのみ適用されます。「``」などの他の文字については、それ以上エスケープする必要はありません。
数字で始まるメールアドレスを抽出する Grok パターン
顧客固有のパーサーと コード スニペット パーサー拡張機能では、数字で始まるメールアドレスを抽出するために EMAILADDRESS Grok パターンを使用しないでください。回避策として、DATA Grok パターンを使用し、マッピング段階で正規表現の検証を適用できます。
完全なコード例
このセクションでは、前のルールをエンドツーエンドの例として説明します。次に、構造化されていないテキスト文字列と、文字列を解析するために記述された標準の正規表現を示します。最後に、パーサーコードで機能する変更された正規表現が含まれています。
元のテキスト文字列は次のとおりです。
User "BOB" logged on to workstation "DESKTOP-01".
以下に示すのは、テキスト文字列を解析する標準の RE2 正規表現です。
^.*?\\\"(?P<_user>[^\\]+)\\\"\s(?:(logged\son|logged\soff))\s.*?\\\"(?P<_device>[^\\]+)\\\"\.$
この式は次のフィールドを抽出します。
| 一致グループ | 文字位置 | テキスト文字列 |
|---|---|---|
| 完全一致 | 0-53 | User \"BOB\" logged on to workstation \"DESKTOP-01\". |
| グループ「_user」 | 7-10 | BOB |
| グループ 2 | 13-22 | logged on |
| Group `_device` | 40-50 | DESKTOP-01 |
これは変更された式です。標準の RE2 正規表現は、パーサーコードで機能するように変更されました。
^.*?\\\"(?P<_user>[^\\\\]+)\\\"\\s(?:(logged\\son|logged\\soff))\\s.*?\\\"(?P<_device>[^\\\\]+)\\\"\\.$
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。