Apache Hadoop のログを収集する

以下でサポートされています。

このドキュメントでは、Bindplane を使用して Apache Hadoop ログを Google Security Operations に取り込む方法について説明します。パーサーは、まず一般的な Hadoop ログ形式に基づいて Grok パターンを使用して、未加工の Hadoop ログからフィールドを抽出します。次に、抽出されたフィールドを Unified Data Model(UDM)スキーマの対応するフィールドにマッピングし、データ型の変換を行い、追加のコンテキストでデータを拡充します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • Windows 2016 以降、または systemd を使用する Linux ホスト
  • プロキシの背後で実行している場合は、Bindplane エージェントの要件に従ってファイアウォール ポートが開いていることを確認します。
  • Apache Hadoop クラスタ構成ファイルへの特権アクセス

Google SecOps の取り込み認証ファイルを取得する

  1. Google SecOps コンソールにログインします。
  2. [SIEM 設定] > [コレクション エージェント] に移動します。
  3. Ingestion Authentication File をダウンロードします。Bindplane をインストールするシステムにファイルを安全に保存します。

Google SecOps のお客様 ID を取得する

  1. Google SecOps コンソールにログインします。
  2. [SIEM 設定] > [プロファイル] に移動します。
  3. [組織の詳細情報] セクションから [お客様 ID] をコピーして保存します。

Bindplane エージェントをインストールする

次の手順に沿って、Windows または Linux オペレーティング システムに Bindplane エージェントをインストールします。

Windows のインストール

  1. 管理者として コマンド プロンプトまたは PowerShell を開きます。
  2. 次のコマンドを実行します。

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux のインストール

  1. root 権限または sudo 権限でターミナルを開きます。
  2. 次のコマンドを実行します。

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

その他のインストール リソース

Syslog を取り込んで Google SecOps に送信するように Bindplane エージェントを構成する

  1. 構成ファイルにアクセスします。

    1. config.yaml ファイルを見つけます。通常、Linux では /etc/bindplane-agent/ ディレクトリに、Windows ではインストール ディレクトリにあります。
    2. テキスト エディタ(nanovi、メモ帳など)を使用してファイルを開きます。
  2. config.yaml ファイルを次のように編集します。

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • 自社のインフラストラクチャでの必要性に応じて、ポートと IP アドレスを置き換えます。
    • <CUSTOMER_ID> は、実際の顧客 ID に置き換えます。
    • /path/to/ingestion-authentication-file.json の値を、Google SecOps の取り込み認証ファイルを取得するで認証ファイルを保存したパスに更新します。

Bindplane エージェントを再起動して変更を適用する

  • Linux で Bindplane エージェントを再起動するには、次のコマンドを実行します。

    sudo systemctl restart bindplane-agent
    
  • Windows で Bindplane エージェントを再起動するには、Services コンソールを使用するか、次のコマンドを入力します。

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Apache Hadoop で Syslog 転送を構成する

Apache Hadoop はロギングに Log4j を使用します。Log4j のバージョンに基づいて適切な Syslog アペンダーを構成し、Hadoop デーモン(NameNode、DataNode、ResourceManager、NodeManager など)がログを syslog レシーバー(Bindplane ホスト)に直接転送するようにします。Log4j はファイルで構成されます(ウェブ UI はありません)。

オプション 1: Log4j 1.x の構成

  1. log4j.properties ファイル(通常は $HADOOP_CONF_DIR/log4j.properties にあります)を見つけます。
  2. ファイルに次の SyslogAppender 構成を追加します。

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. <BINDPLANE_HOST_IP> は、Bindplane ホストの IP アドレスに置き換えます。

  4. ファイルを保存します。

  5. Hadoop デーモンを再起動して、構成の変更を適用します。

オプション 2: Log4j 2.x の構成

  1. log4j2.xml ファイル(通常は $HADOOP_CONF_DIR/log4j2.xml にあります)を見つけます。
  2. ファイルに次の Syslog アペンダー構成を追加します。

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • <BINDPLANE_HOST_IP> は、Bindplane ホストの IP アドレスに置き換えます。
  3. ファイルを保存します。

  4. Hadoop デーモンを再起動して、構成の変更を適用します。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
許可 security_result.action 「false」の場合、アクションは「BLOCK」です。「true」の場合、アクションは「ALLOW」です。
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Grok パターン「%{DATA:suser}@.*auth:%{WORD:auth_type}」を使用して、「ugi」フィールドから抽出されます。括弧と「auth:」が削除されます。
call additional.fields.key = "Call#", additional.fields.value.string_value 直接マッピングされます。
call_context additional.fields.key = "callerContext", additional.fields.value.string_value 直接マッピングされます。
cliIP principal.ip 「json_data」フィールドが存在し、JSON として正常に解析された場合にのみマッピングされます。
cmd principal.process.command_line 直接マッピングされます。
cluster_name target.hostname ターゲット ホスト名として使用されます(存在する場合)。
metadata.event_timestamp.seconds 月、年、時、分、秒とともに使用して、event_timestamp を作成します。
説明 metadata.description 直接マッピングされます。
driver additional.fields.key = "driver", additional.fields.value.string_value 直接マッピングされます。
dst target.ip OR target.hostname OR target.file.full_path IP として正常に解析された場合は、ターゲット IP にマッピングされます。値が「/user」で始まる場合は、ターゲット ファイルパスにマッピングされます。それ以外の場合は、ターゲット ホスト名にマッピングされます。
dstport target.port 直接マッピングされ、整数に変換されます。
enforcer security_result.rule_name 直接マッピングされます。
event_count additional.fields.key = "event_count", additional.fields.value.string_value 直接マッピングされ、文字列に変換されます。
fname src.file.full_path 直接マッピングされます。
時間 metadata.event_timestamp.seconds 月、日、年、分、秒とともに使用して、event_timestamp を作成します。
id additional.fields.key = "id", additional.fields.value.string_value 直接マッピングされます。
ip principal.ip 先頭の「/」文字を削除してから、プリンシパル IP にマッピングされます。
json_data JSON として解析されます。抽出されたフィールドは、対応する UDM フィールドにマッピングされます。
logType additional.fields.key = "logType"、additional.fields.value.string_value 直接マッピングされます。
メッセージ grok パターンを使用してさまざまなフィールドを抽出するために使用されます。
method network.http.method 直接マッピングされます。
metadata.event_timestamp.seconds 月、日、年、時、秒とともに使用して、event_timestamp を作成します。
metadata.event_timestamp.seconds 日、年、時、分、秒とともに使用され、event_timestamp を構築します。
observer observer.hostname または observer.ip IP として正常に解析された場合は、オブザーバー IP にマッピングされます。それ以外の場合は、オブザーバーのホスト名にマッピングされます。
perm additional.fields.key = "perm", additional.fields.value.string_value 直接マッピングされます。
ポリシー security_result.rule_id 直接マッピングされ、文字列に変換されます。
product metadata.product_name 直接マッピングされます。
product_event metadata.product_event_type 直接マッピングされます。「rename」の場合、「dst」フィールドは「target.file.full_path」にマッピングされます。
proto network.application_protocol 「webhdfs」でない場合、直接マッピングされ、大文字に変換されます。
reason security_result.summary 直接マッピングされます。
リポジトリ additional.fields.key = "repo", additional.fields.value.string_value 直接マッピングされます。
resType additional.fields.key = "resType", additional.fields.value.string_value 直接マッピングされます。
結果 additional.fields.key = "result", additional.fields.value.string_value 直接マッピングされ、文字列に変換されます。
再試行 additional.fields.key = "Retry#"、additional.fields.value.string_value 直接マッピングされます。
metadata.event_timestamp.seconds 月、日、年、時、分とともに使用され、event_timestamp を作成します。
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value 直接マッピングされ、文字列に変換されます。
重要度 security_result.severity 値に基づいて、さまざまな重大度レベルにマッピングされます。「INFO」、「Info」、「info」->「INFORMATIONAL」、「Low」、「low」、「LOW」->「LOW」、「error」、「Error」、「WARN」、「Warn」->「MEDIUM」、「High」、「high」、「HIGH」->「HIGH」、「Critical」、「critical」、「CRITICAL」->「CRITICAL」。
shost principal.hostname 「src」と異なる場合は、プリンシパル ホスト名として使用されます。
src principal.ip OR principal.hostname OR observer.ip IP として正常に解析された場合は、プリンシパルとオブザーバーの IP にマッピングされます。それ以外の場合は、プリンシパル ホスト名にマッピングされます。
srcport principal.port 直接マッピングされ、整数に変換されます。
概要 security_result.summary 直接マッピングされます。
suser principal.user.userid 直接マッピングされます。
タグ additional.fields.key = "tags", additional.fields.value.string_value 直接マッピングされます。
フォローします。 additional.fields.key = "thread", additional.fields.value.string_value 直接マッピングされます。
チップ target.ip 直接マッピングされます。
ugi target.hostname 「log_data」フィールドに「·」が含まれていない場合、ターゲット ホスト名として使用されます。
URL target.url 直接マッピングされます。
vendor metadata.vendor_name 直接マッピングされます。
version metadata.product_version 直接マッピングされます。
metadata.event_timestamp.seconds 月、日、時、分、秒とともに使用され、event_timestamp を構築します。
なし metadata.event_type デフォルトは「NETWORK_CONNECTION」に設定されます。ターゲットが特定されない場合は「STATUS_UPDATE」に変更されます。
なし metadata.log_type 「HADOOP」に設定します。
なし security_result.alert_state 重大度が「HIGH」または「CRITICAL」の場合は、「ALERTING」に設定します。
なし is_alert 重大度が「HIGH」または「CRITICAL」の場合は true に設定します。
なし is_significant 重大度が「HIGH」または「CRITICAL」の場合は true に設定します。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。