Cisco Application Centric Infrastructure(ACI)のログを収集する

以下でサポートされています。

このドキュメントでは、Cisco Application Centric Infrastructure(ACI)ログを Google Security Operations に取り込む方法について説明します。パーサーは、まず Grok パターンを使用して、着信した Cisco ACI ログを syslog メッセージとして処理しようとします。syslog の解析に失敗すると、メッセージが JSON 形式であると想定して、それに応じて解析します。最後に、抽出されたフィールドを統合データモデル(UDM)にマッピングします。

この統合では、次の 2 つの方法がサポートされています。

  • オプション 1: Bindplane エージェントを使用した Syslog 形式
  • オプション 2: APIC REST API を使用して Google Cloud Storage を使用する JSON 形式

各オプションは自己完結型であり、インフラストラクチャの要件とログ形式の好みに基づいて個別に実装できます。

オプション 1: Bindplane エージェントを使用した Syslog

このオプションは、Syslog メッセージを Bindplane エージェントに送信するように Cisco ACI ファブリックを構成します。Bindplane エージェントは、分析のために Google Security Operations にメッセージを転送します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • systemd を使用する Windows 2016 以降または Linux ホスト
  • プロキシの背後で実行している場合は、Bindplane エージェントの要件に従ってファイアウォール ポートが開いていることを確認します。
  • Cisco APIC コンソールへの特権アクセス

Google SecOps の取り込み認証ファイルを取得する

  1. [SIEM 設定] > [収集エージェント] に移動します。
  2. Ingestion Authentication File をダウンロードします。
  3. Bindplane をインストールするシステムにファイルを安全に保存します。

Google SecOps のお客様 ID を取得する

  1. [SIEM 設定] > [プロファイル] に移動します。
  2. [組織の詳細情報] セクションから [お客様 ID] をコピーして保存します。

Bindplane エージェントをインストールする

次の手順に沿って、Windows または Linux オペレーティング システムに Bindplane エージェントをインストールします。

Windows のインストール

  1. 管理者としてコマンド プロンプトまたは PowerShell を開きます。
  2. 次のコマンドを実行します。

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux のインストール

  1. root 権限または sudo 権限でターミナルを開きます。
  2. 次のコマンドを実行します。

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

その他のインストール オプションについては、Bindplane エージェントのインストール ガイドをご覧ください。

Syslog を取り込んで Google SecOps に送信するように Bindplane エージェントを構成する

構成ファイルにアクセスする

  1. config.yaml ファイルを見つけます。通常、Linux では /etc/bindplane-agent/ ディレクトリに、Windows ではインストール ディレクトリにあります。
  2. テキスト エディタ(nanovi、メモ帳など)を使用してファイルを開きます。
  3. config.yaml ファイルを編集します。

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
          service:
    
    pipelines:
      logs/source0__chronicle_w_labels-0:
        receivers:
          - udplog
        exporters:
          - chronicle/chronicle_w_labels
    
    • 次のように置き換えます。
      • 自社のインフラストラクチャでの必要性に応じて、ポートと IP アドレスを置き換えます。
      • <CUSTOMER_ID> は、実際の顧客 ID に置き換えます。
      • /path/to/ingestion-authentication-file.json の値を、認証ファイルを保存したパスに更新します。

Bindplane エージェントを再起動して変更を適用する

  • Linux で Bindplane エージェントを再起動するには、次のコマンドを実行します。

    sudo systemctl restart bindplane-agent
    
  • Windows で Bindplane エージェントを再起動するには、サービス コンソールを使用するか、次のコマンドを入力します。

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Cisco ACI で Syslog 転送を構成する

帯域外管理契約を構成する

  1. Cisco APIC コンソールにログインします。
  2. [テナント > 管理 > 契約 > フィルタ] に移動します。
  3. [フィルタを作成] をクリックします。
  4. 次の構成の詳細を指定します。
    • 名前: 「syslog-udp-514」と入力します。
    • エントリ名: 「syslog」と入力します。
    • EtherType: [IP] を選択します。
    • IP Protocol: [UDP] を選択します。
    • 宛先ポートの範囲(開始): 514 と入力します。
    • 宛先ポートの範囲(終了): 「514」と入力します。
  5. [送信] をクリックします。

管理契約を作成する

  1. [テナント> 管理 > 契約 > 標準] に移動します。
  2. [Create Contract] をクリックします。
  3. 次の構成の詳細を指定します。
    • 名前: 「mgmt-syslog-contract」と入力します。
    • スコープ: [コンテキスト] を選択します。
  4. [送信] をクリックします。
  5. 契約を開き、[Subjects] をクリックします。
  6. [Create Contract Subject] をクリックします。
  7. 次の構成の詳細を指定します。
    • 名前: 「syslog-subject」と入力します。
    • 両方向に適用: このオプションをオンにします。
  8. [送信] をクリックします。
  9. 件名を開き、[フィルタ] をクリックします。
  10. [フィルタ バインディングを作成] をクリックします。
  11. syslog-udp-514 フィルタを選択します。
  12. [送信] をクリックします。

Syslog 宛先グループを構成する

  1. [管理者] > [外部データ コレクタ] > [モニタリングの宛先] > [Syslog] に移動します。
  2. [Syslog] を右クリックし、[Syslog モニタリング移行先グループを作成] を選択します。
  3. 次の構成の詳細を指定します。
    • 名前: 「Chronicle-Syslog-Group」と入力します。
    • Admin State: [Enabled] を選択します。
    • 形式: aci を選択します。
  4. [次へ] をクリックします。
  5. [Syslog モニタリングの宛先を作成] ダイアログで、次の操作を行います。
    • 名前: 「Chronicle-BindPlane」と入力します。
    • ホスト: Bindplane エージェント サーバーの IP アドレスを入力します。
    • ポート: 「514」と入力します。
    • Admin State: [Enabled] を選択します。
    • 重大度: [情報] を選択します(詳細なログをキャプチャするため)。
  6. [送信] をクリックします。

モニタリング ポリシーを構成する

ファブリック モニタリング ポリシー
  1. [Fabric] > [Fabric Policies] > [Policies] > [Monitoring] > [Common Policy] に移動します。
  2. [Callhome/Smart Callhome/SNMP/Syslog/TACACS] を展開します。
  3. [Syslog] を右クリックし、[Create Syslog Source] を選択します。
  4. 次の構成の詳細を指定します。
    • 名前: 「Chronicle-Fabric-Syslog」と入力します。
    • 監査ログ: 監査イベントを含める場合にオンにします。
    • イベント: システム イベントを含める場合にオンにします。
    • Faults: 障害イベントを含める場合にオンにします。
    • セッションログ: セッションログを含める場合にオンにします。
    • 宛先グループ: Chronicle-Syslog-Group を選択します。
  5. [送信] をクリックします。
アクセス モニタリング ポリシー
  1. [Fabric] > [Access Policies] > [Policies] > [Monitoring] > [Default Policy] に移動します。
  2. [Callhome/Smart Callhome/SNMP/Syslog] を開きます。
  3. [Syslog] を右クリックし、[Create Syslog Source] を選択します。
  4. 次の構成の詳細を指定します。
    • 名前: 「Chronicle-Access-Syslog」と入力します。
    • 監査ログ: 監査イベントを含める場合にオンにします。
    • イベント: システム イベントを含める場合にオンにします。
    • Faults: 障害イベントを含める場合にオンにします。
    • セッションログ: セッションログを含める場合にオンにします。
    • 宛先グループ: Chronicle-Syslog-Group を選択します。
  5. [送信] をクリックします。

システム Syslog メッセージ ポリシーを構成する

  1. [Fabric] > [Fabric Policies] > [Policies] > [Monitoring] > [Common Policy] に移動します。
  2. [Syslog Messages Policies] を開きます。
  3. [default] をクリックします。
  4. [施設フィルタ] セクションで次の操作を行います。
    • ファシリティ: [default] を選択します。
    • 最小重大度: [情報] に変更します。
  5. [送信] をクリックします。

オプション 2: Google Cloud Storage を使用した JSON

このオプションでは、APIC REST API を使用して、Cisco ACI ファブリックから JSON 形式のイベント、障害、監査ログを収集し、Google SecOps の取り込み用に Google Cloud Storage に保存します。

始める前に

次の前提条件を満たしていることを確認します。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • Cisco APIC コンソールへの特権アクセス

Cisco ACI APIC の前提条件を収集する

APIC 認証情報を取得する

  1. HTTPS を使用して Cisco APIC コンソールにログインします。
  2. [Admin] > [AAA](APIC 6.0 以降)または [Admin] > [Authentication] > [AAA](以前のリリース)。

  3. 適切な権限を持つローカル ユーザーを作成するか、既存のローカル ユーザーを使用します。

  4. 次の詳細をコピーして安全な場所に保存します。

    • APIC ユーザー名: モニタリング データへの読み取りアクセス権を持つローカル ユーザー
    • APIC Password: ユーザーのパスワード
    • APIC URL: APIC の HTTPS URL(例: https://apic.example.com

Google Cloud Storage バケットを作成する

  1. Google Cloud コンソールに移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(cisco-aci-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「cisco-aci-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect Cisco ACI logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウント(cisco-aci-collector-sa)に付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(cisco-aci-logs など)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「cisco-aci-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Cisco APIC REST API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 cisco-aci-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(cisco-aci-trigger)を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    • [認証が必要] を選択します。
    • [Identity and Access Management(IAM)] を選択します。
  1. [コンテナ、ネットワーキング、セキュリティ] までスクロールして開きます。
  2. [セキュリティ] タブに移動します。
    • サービス アカウント: サービス アカウントを選択します(cisco-aci-collector-sa)。
  3. [コンテナ] タブに移動します。

    • [変数とシークレット] をクリックします。
    • 環境変数ごとに [+ 変数を追加] をクリックします。

      変数名 値の例 説明
      GCS_BUCKET cisco-aci-logs GCS バケット名
      GCS_PREFIX cisco-aci-events ログファイルの接頭辞
      STATE_KEY cisco-aci-events/state.json 状態ファイルのパス
      APIC_URL https://apic.example.com APIC HTTPS URL
      APIC_USERNAME your-apic-username APIC ユーザー名
      APIC_PASSWORD your-apic-password APIC パスワード
      PAGE_SIZE 100 1 ページあたりのレコード数
      MAX_PAGES 10 実行あたりの最大ページ数
  4. [変数とシークレット] セクションで、[リクエスト] までスクロールします。

    • リクエスト タイムアウト: 300 秒(5 分)を入力します。
  5. [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
  6. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  7. [作成] をクリックします。

  8. サービスが作成されるまで待ちます(1 ~ 2 分)。

  9. サービスが作成されると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [エントリ ポイント] フィールドに「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=60.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json')
    APIC_URL = os.environ.get('APIC_URL')
    APIC_USERNAME = os.environ.get('APIC_USERNAME')
    APIC_PASSWORD = os.environ.get('APIC_PASSWORD')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]):
            logger.error('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            last_timestamp = state.get('last_timestamp')
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}")
    
            # Authenticate to APIC
            session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(
                    APIC_URL,
                    headers,
                    data_type,
                    last_timestamp,
                    PAGE_SIZE,
                    MAX_PAGES
                )
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in GCS if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to GCS
                blob = bucket.blob(s3_key)
                blob.upload_from_string(
                    ndjson_content,
                    content_type='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}")
    
                # Update state file with latest timestamp from collected data
                latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
                if not latest_timestamp:
                    latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
                update_state(bucket, STATE_KEY, latest_timestamp)
            else:
                logger.info("No new log records found.")
    
            logger.info(f"Successfully processed {len(all_collected_data)} records")
    
        except Exception as e:
            logger.error(f'Error processing logs: {str(e)}')
            raise
    
    def authenticate_apic(apic_url, username, password):
        """Authenticate to APIC and return session token"""
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """Collect data from APIC REST API with pagination"""
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter to prevent duplicates
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """Get the last run timestamp from GCS state file"""
        try:
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                return state.get('last_timestamp')
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
    
        return None
    
    def get_latest_timestamp_from_records(records):
        """Get the latest timestamp from collected records to prevent missing events"""
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')
    
                # Use created or modTs as fallback
                timestamp = created or modTs
    
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """Update the state file with the current timestamp"""
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
            blob = bucket.blob(state_key)
            blob.upload_from_string(
                json.dumps(state_data),
                content_type='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            logger.warning(f"Could not load state: {e}")
    
        return {}
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 cisco-aci-collector-15m
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 */15 * * * *(15 分ごと)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック Pub/Sub トピック(cisco-aci-trigger)を選択する
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

  • ログの量とレイテンシの要件に基づいて頻度を選択します。

    頻度 CRON 式 ユースケース
    5 分毎 */5 * * * * 大容量、低レイテンシ
    15 分ごと */15 * * * * 中程度の音量(推奨)
    1 時間ごと 0 * * * * 標準
    6 時間ごと 0 */6 * * * 少量、バッチ処理
    毎日 0 0 * * * 履歴データの収集

統合をテストする

  1. Cloud Scheduler コンソールで、ジョブ(cisco-aci-collector-15m)を見つけます。
  2. [強制実行] をクリックして、ジョブを手動でトリガーします。
  3. 数秒待ちます。
  4. Cloud Run > サービスに移動します。
  5. 関数名(cisco-aci-collector)をクリックします。
  6. [Logs] タブをクリックします。
  7. 関数が正常に実行されたことを確認します。次の内容を確認します。

    Starting Cisco ACI data collection for bucket: cisco-aci-logs
    Successfully authenticated to APIC
    Collecting faultInst data
    Collected X faultInst records
    Collecting eventRecord data
    Collected X eventRecord records
    Collecting aaaModLR data
    Collected X aaaModLR records
    Total records collected: X
    Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. [Cloud Storage] > [バケット] に移動します。

  9. バケット名(cisco-aci-logs)をクリックします。

  10. プレフィックス フォルダ(cisco-aci-events/)に移動します。

  11. 現在のタイムスタンプで新しい .ndjson ファイルが作成されたことを確認します。

ログにエラーが表示された場合:

  • HTTP 401: 環境変数で APIC 認証情報を確認する
  • HTTP 403: APIC アカウントに faultInsteventRecordaaaModLR クラスに対する読み取り権限があることを確認します
  • 接続エラー: Cloud Run 関数が TCP/443 で APIC URL に到達できることを確認する
  • 環境変数が不足している: 必要な変数がすべて設定されていることを確認します

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Cisco ACI JSON logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [Log type] として [Cisco Application Centric Infrastructure] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーします。次のステップでこれを使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(cisco-aci-logs)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

Cisco ACI のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Cisco ACI JSON logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [Log type] として [Cisco Application Centric Infrastructure] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://cisco-aci-logs/cisco-aci-events/
      
      • 次のように置き換えます。
        • cisco-aci-logs: GCS バケット名。
        • cisco-aci-events: ログが保存される接頭辞/フォルダパス。
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル。

  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング 論理
@timestamp read_only_udm.metadata.event_timestamp 値は未加工ログの「@timestamp」フィールドから取得され、タイムスタンプとして解析されます。
aci_tag read_only_udm.metadata.product_log_id 値は、未加工ログのフィールド「aci_tag」から取得されます。
cisco_timestamp - マッピングされていません。
DIP read_only_udm.target.ip 値は、未加工ログのフィールド「DIP」から取得されます。
DPort read_only_udm.target.port 値は未加工ログのフィールド「DPort」から取得され、整数に変換されます。
説明 read_only_udm.security_result.description 値は、未加工ログのフィールド「description」から取得されます。
fault_cause read_only_udm.additional.fields.value.string_value 値は、未加工ログのフィールド「fault_cause」から取得されます。キーは「Fault Cause」に設定されます。
hostname read_only_udm.principal.hostname 値は、未加工ログの「hostname」フィールドから取得されます。
lifecycle_state read_only_udm.metadata.product_event_type 値は、未加工ログのフィールド「lifecycle_state」から取得されます。
log.source.address - マッピングされていません。
logstash.collect.host - マッピングされていません。
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp 値は、未加工ログのフィールド「logstash.collect.timestamp」から取得され、タイムスタンプとして解析されます。
logstash.ingest.host read_only_udm.intermediary.hostname 値は、未加工ログのフィールド「logstash.ingest.host」から取得されます。
logstash.irm_environment read_only_udm.additional.fields.value.string_value 値は、未加工ログのフィールド「logstash.irm_environment」から取得されます。キーは「IRM_Environment」に設定されます。
logstash.irm_region read_only_udm.additional.fields.value.string_value 値は、未加工ログのフィールド「logstash.irm_region」から取得されます。キーは「IRM_Region」に設定されます。
logstash.irm_site read_only_udm.additional.fields.value.string_value 値は、未加工ログのフィールド「logstash.irm_site」から取得されます。キーは「IRM_Site」に設定されます。
logstash.process.host read_only_udm.intermediary.hostname 値は、未加工ログのフィールド「logstash.process.host」から取得されます。
メッセージ - マッピングされていません。
message_class - マッピングされていません。
message_code - マッピングされていません。
message_content - マッピングされていません。
message_dn - マッピングされていません。
message_type read_only_udm.metadata.product_event_type 値は、角かっこを削除した後に、未加工ログのフィールド「message_type」から取得されます。
node_link read_only_udm.principal.process.file.full_path 値は、未加工ログのフィールド「node_link」から取得されます。
PktLen read_only_udm.network.received_bytes 値は未加工ログのフィールド「PktLen」から取得され、符号なし整数に変換されます。
プログラム - マッピングされていません。
.proto read_only_udm.network.ip_protocol 値は未加工ログのフィールド「Proto」から取得され、整数に変換されて、対応する IP プロトコル名(6 -> TCP)。
SIP read_only_udm.principal.ip 値は、未加工ログのフィールド「SIP」から取得されます。
SPort read_only_udm.principal.port 値は未加工ログのフィールド「SPort」から取得され、整数に変換されます。
syslog_facility - マッピングされていません。
syslog_facility_code - マッピングされていません。
syslog_host read_only_udm.principal.ip、read_only_udm.observer.ip 値は、未加工ログのフィールド「syslog_host」から取得されます。
syslog_prog - マッピングされていません。
syslog_severity read_only_udm.security_result.severity_details 値は、未加工ログのフィールド「syslog_severity」から取得されます。
syslog_severity_code read_only_udm.security_result.severity 値は未加工ログのフィールド「syslog_severity_code」から取得され、対応する重大度レベルにマッピングされます。5、6、7 -> INFORMATIONAL、3、4 -> MEDIUM、0、1、2 -> HIGH。
syslog5424_pri - マッピングされていません。
Vlan-Id read_only_udm.principal.resource.id 値は、未加工ログのフィールド「Vlan-Id」から取得されます。
- read_only_udm.metadata.event_type ロジック: 「SIP」または「hostname」が存在し、「Proto」が存在する場合は、「NETWORK_CONNECTION」に設定します。それ以外の場合で、「SIP」、「hostname」、「syslog_host」のいずれかが存在する場合は、「STATUS_UPDATE」に設定します。それ以外の場合は、「GENERIC_EVENT」に設定されます。
- read_only_udm.metadata.log_type ロジック: 「CISCO_ACI」に設定されます。
- read_only_udm.metadata.vendor_name ロジック: 「Cisco」に設定します。
- read_only_udm.metadata.product_name ロジック: 「ACI」に設定します。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。