Apache Hadoop のログを収集する
このドキュメントでは、Bindplane を使用して Apache Hadoop ログを Google Security Operations に取り込む方法について説明します。パーサーは、まず一般的な Hadoop ログ形式に基づいて Grok パターンを使用して、未加工の Hadoop ログからフィールドを抽出します。次に、抽出されたフィールドを Unified Data Model(UDM)スキーマの対応するフィールドにマッピングし、データ型の変換を行い、追加のコンテキストでデータを拡充します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Windows 2016 以降、または
systemdを使用する Linux ホスト - プロキシの背後で実行している場合は、Bindplane エージェントの要件に従ってファイアウォール ポートが開いていることを確認します。
- Apache Hadoop クラスタ構成ファイルへの特権アクセス
Google SecOps の取り込み認証ファイルを取得する
- Google SecOps コンソールにログインします。
- [SIEM 設定] > [コレクション エージェント] に移動します。
- Ingestion Authentication File をダウンロードします。Bindplane をインストールするシステムにファイルを安全に保存します。
Google SecOps のお客様 ID を取得する
- Google SecOps コンソールにログインします。
- [SIEM 設定] > [プロファイル] に移動します。
- [組織の詳細情報] セクションから [お客様 ID] をコピーして保存します。
Bindplane エージェントをインストールする
次の手順に沿って、Windows または Linux オペレーティング システムに Bindplane エージェントをインストールします。
Windows のインストール
- 管理者として コマンド プロンプトまたは PowerShell を開きます。
次のコマンドを実行します。
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux のインストール
- root 権限または sudo 権限でターミナルを開きます。
次のコマンドを実行します。
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
その他のインストール リソース
- その他のインストール オプションについては、こちらのインストール ガイドをご覧ください。
Syslog を取り込んで Google SecOps に送信するように Bindplane エージェントを構成する
構成ファイルにアクセスします。
config.yamlファイルを見つけます。通常、Linux では/etc/bindplane-agent/ディレクトリに、Windows ではインストール ディレクトリにあります。- テキスト エディタ(
nano、vi、メモ帳など)を使用してファイルを開きます。
config.yamlファイルを次のように編集します。receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- 自社のインフラストラクチャでの必要性に応じて、ポートと IP アドレスを置き換えます。
<CUSTOMER_ID>は、実際の顧客 ID に置き換えます。/path/to/ingestion-authentication-file.jsonの値を、Google SecOps の取り込み認証ファイルを取得するで認証ファイルを保存したパスに更新します。
Bindplane エージェントを再起動して変更を適用する
Linux で Bindplane エージェントを再起動するには、次のコマンドを実行します。
sudo systemctl restart bindplane-agentWindows で Bindplane エージェントを再起動するには、Services コンソールを使用するか、次のコマンドを入力します。
net stop BindPlaneAgent && net start BindPlaneAgent
Apache Hadoop で Syslog 転送を構成する
Apache Hadoop はロギングに Log4j を使用します。Log4j のバージョンに基づいて適切な Syslog アペンダーを構成し、Hadoop デーモン(NameNode、DataNode、ResourceManager、NodeManager など)がログを syslog レシーバー(Bindplane ホスト)に直接転送するようにします。Log4j はファイルで構成されます(ウェブ UI はありません)。
オプション 1: Log4j 1.x の構成
- log4j.properties ファイル(通常は
$HADOOP_CONF_DIR/log4j.propertiesにあります)を見つけます。 ファイルに次の SyslogAppender 構成を追加します。
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOG<BINDPLANE_HOST_IP>は、Bindplane ホストの IP アドレスに置き換えます。ファイルを保存します。
Hadoop デーモンを再起動して、構成の変更を適用します。
オプション 2: Log4j 2.x の構成
- log4j2.xml ファイル(通常は
$HADOOP_CONF_DIR/log4j2.xmlにあります)を見つけます。 ファイルに次の Syslog アペンダー構成を追加します。
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration><BINDPLANE_HOST_IP>は、Bindplane ホストの IP アドレスに置き換えます。
ファイルを保存します。
Hadoop デーモンを再起動して、構成の変更を適用します。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| 許可 | security_result.action | 「false」の場合、アクションは「BLOCK」です。「true」の場合、アクションは「ALLOW」です。 |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Grok パターン「%{DATA:suser}@.*auth:%{WORD:auth_type}」を使用して、「ugi」フィールドから抽出されます。括弧と「auth:」が削除されます。 |
| call | additional.fields.key = "Call#", additional.fields.value.string_value | 直接マッピングされます。 |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | 直接マッピングされます。 |
| cliIP | principal.ip | 「json_data」フィールドが存在し、JSON として正常に解析された場合にのみマッピングされます。 |
| cmd | principal.process.command_line | 直接マッピングされます。 |
| cluster_name | target.hostname | ターゲット ホスト名として使用されます(存在する場合)。 |
| 日 | metadata.event_timestamp.seconds | 月、年、時、分、秒とともに使用して、event_timestamp を作成します。 |
| 説明 | metadata.description | 直接マッピングされます。 |
| driver | additional.fields.key = "driver", additional.fields.value.string_value | 直接マッピングされます。 |
| dst | target.ip OR target.hostname OR target.file.full_path | IP として正常に解析された場合は、ターゲット IP にマッピングされます。値が「/user」で始まる場合は、ターゲット ファイルパスにマッピングされます。それ以外の場合は、ターゲット ホスト名にマッピングされます。 |
| dstport | target.port | 直接マッピングされ、整数に変換されます。 |
| enforcer | security_result.rule_name | 直接マッピングされます。 |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | 直接マッピングされ、文字列に変換されます。 |
| fname | src.file.full_path | 直接マッピングされます。 |
| 時間 | metadata.event_timestamp.seconds | 月、日、年、分、秒とともに使用して、event_timestamp を作成します。 |
| id | additional.fields.key = "id", additional.fields.value.string_value | 直接マッピングされます。 |
| ip | principal.ip | 先頭の「/」文字を削除してから、プリンシパル IP にマッピングされます。 |
| json_data | JSON として解析されます。抽出されたフィールドは、対応する UDM フィールドにマッピングされます。 | |
| logType | additional.fields.key = "logType"、additional.fields.value.string_value | 直接マッピングされます。 |
| メッセージ | grok パターンを使用してさまざまなフィールドを抽出するために使用されます。 | |
| method | network.http.method | 直接マッピングされます。 |
| 分 | metadata.event_timestamp.seconds | 月、日、年、時、秒とともに使用して、event_timestamp を作成します。 |
| 月 | metadata.event_timestamp.seconds | 日、年、時、分、秒とともに使用され、event_timestamp を構築します。 |
| observer | observer.hostname または observer.ip | IP として正常に解析された場合は、オブザーバー IP にマッピングされます。それ以外の場合は、オブザーバーのホスト名にマッピングされます。 |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | 直接マッピングされます。 |
| ポリシー | security_result.rule_id | 直接マッピングされ、文字列に変換されます。 |
| product | metadata.product_name | 直接マッピングされます。 |
| product_event | metadata.product_event_type | 直接マッピングされます。「rename」の場合、「dst」フィールドは「target.file.full_path」にマッピングされます。 |
| proto | network.application_protocol | 「webhdfs」でない場合、直接マッピングされ、大文字に変換されます。 |
| reason | security_result.summary | 直接マッピングされます。 |
| リポジトリ | additional.fields.key = "repo", additional.fields.value.string_value | 直接マッピングされます。 |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | 直接マッピングされます。 |
| 結果 | additional.fields.key = "result", additional.fields.value.string_value | 直接マッピングされ、文字列に変換されます。 |
| 再試行 | additional.fields.key = "Retry#"、additional.fields.value.string_value | 直接マッピングされます。 |
| 秒 | metadata.event_timestamp.seconds | 月、日、年、時、分とともに使用され、event_timestamp を作成します。 |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | 直接マッピングされ、文字列に変換されます。 |
| 重要度 | security_result.severity | 値に基づいて、さまざまな重大度レベルにマッピングされます。「INFO」、「Info」、「info」->「INFORMATIONAL」、「Low」、「low」、「LOW」->「LOW」、「error」、「Error」、「WARN」、「Warn」->「MEDIUM」、「High」、「high」、「HIGH」->「HIGH」、「Critical」、「critical」、「CRITICAL」->「CRITICAL」。 |
| shost | principal.hostname | 「src」と異なる場合は、プリンシパル ホスト名として使用されます。 |
| src | principal.ip OR principal.hostname OR observer.ip | IP として正常に解析された場合は、プリンシパルとオブザーバーの IP にマッピングされます。それ以外の場合は、プリンシパル ホスト名にマッピングされます。 |
| srcport | principal.port | 直接マッピングされ、整数に変換されます。 |
| 概要 | security_result.summary | 直接マッピングされます。 |
| suser | principal.user.userid | 直接マッピングされます。 |
| タグ | additional.fields.key = "tags", additional.fields.value.string_value | 直接マッピングされます。 |
| フォローします。 | additional.fields.key = "thread", additional.fields.value.string_value | 直接マッピングされます。 |
| チップ | target.ip | 直接マッピングされます。 |
| ugi | target.hostname | 「log_data」フィールドに「·」が含まれていない場合、ターゲット ホスト名として使用されます。 |
| URL | target.url | 直接マッピングされます。 |
| vendor | metadata.vendor_name | 直接マッピングされます。 |
| version | metadata.product_version | 直接マッピングされます。 |
| 年 | metadata.event_timestamp.seconds | 月、日、時、分、秒とともに使用され、event_timestamp を構築します。 |
| なし | metadata.event_type | デフォルトは「NETWORK_CONNECTION」に設定されます。ターゲットが特定されない場合は「STATUS_UPDATE」に変更されます。 |
| なし | metadata.log_type | 「HADOOP」に設定します。 |
| なし | security_result.alert_state | 重大度が「HIGH」または「CRITICAL」の場合は、「ALERTING」に設定します。 |
| なし | is_alert | 重大度が「HIGH」または「CRITICAL」の場合は true に設定します。 |
| なし | is_significant | 重大度が「HIGH」または「CRITICAL」の場合は true に設定します。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。