Cisco Application Centric Infrastructure(ACI)のログを収集する
このドキュメントでは、Cisco Application Centric Infrastructure(ACI)ログを Google Security Operations に取り込む方法について説明します。パーサーは、まず Grok パターンを使用して、着信した Cisco ACI ログを syslog メッセージとして処理しようとします。syslog の解析に失敗すると、メッセージが JSON 形式であると想定して、それに応じて解析します。最後に、抽出されたフィールドを統合データモデル(UDM)にマッピングします。
この統合では、次の 2 つの方法がサポートされています。
- オプション 1: Bindplane エージェントを使用した Syslog 形式
- オプション 2: APIC REST API を使用して Google Cloud Storage を使用する JSON 形式
各オプションは自己完結型であり、インフラストラクチャの要件とログ形式の好みに基づいて個別に実装できます。
オプション 1: Bindplane エージェントを使用した Syslog
このオプションは、Syslog メッセージを Bindplane エージェントに送信するように Cisco ACI ファブリックを構成します。Bindplane エージェントは、分析のために Google Security Operations にメッセージを転送します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
systemdを使用する Windows 2016 以降または Linux ホスト- プロキシの背後で実行している場合は、Bindplane エージェントの要件に従ってファイアウォール ポートが開いていることを確認します。
- Cisco APIC コンソールへの特権アクセス
Google SecOps の取り込み認証ファイルを取得する
- [SIEM 設定] > [収集エージェント] に移動します。
- Ingestion Authentication File をダウンロードします。
- Bindplane をインストールするシステムにファイルを安全に保存します。
Google SecOps のお客様 ID を取得する
- [SIEM 設定] > [プロファイル] に移動します。
- [組織の詳細情報] セクションから [お客様 ID] をコピーして保存します。
Bindplane エージェントをインストールする
次の手順に沿って、Windows または Linux オペレーティング システムに Bindplane エージェントをインストールします。
Windows のインストール
- 管理者としてコマンド プロンプトまたは PowerShell を開きます。
次のコマンドを実行します。
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux のインストール
- root 権限または sudo 権限でターミナルを開きます。
次のコマンドを実行します。
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
その他のインストール オプションについては、Bindplane エージェントのインストール ガイドをご覧ください。
Syslog を取り込んで Google SecOps に送信するように Bindplane エージェントを構成する
構成ファイルにアクセスする
config.yamlファイルを見つけます。通常、Linux では/etc/bindplane-agent/ディレクトリに、Windows ではインストール ディレクトリにあります。- テキスト エディタ(
nano、vi、メモ帳など)を使用してファイルを開きます。 config.yaml ファイルを編集します。
receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'CISCO_ACI' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- 次のように置き換えます。
- 自社のインフラストラクチャでの必要性に応じて、ポートと IP アドレスを置き換えます。
<CUSTOMER_ID>は、実際の顧客 ID に置き換えます。/path/to/ingestion-authentication-file.jsonの値を、認証ファイルを保存したパスに更新します。
- 次のように置き換えます。
Bindplane エージェントを再起動して変更を適用する
Linux で Bindplane エージェントを再起動するには、次のコマンドを実行します。
sudo systemctl restart bindplane-agentWindows で Bindplane エージェントを再起動するには、サービス コンソールを使用するか、次のコマンドを入力します。
net stop BindPlaneAgent && net start BindPlaneAgent
Cisco ACI で Syslog 転送を構成する
帯域外管理契約を構成する
- Cisco APIC コンソールにログインします。
- [テナント > 管理 > 契約 > フィルタ] に移動します。
- [フィルタを作成] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
syslog-udp-514」と入力します。 - エントリ名: 「
syslog」と入力します。 - EtherType: [IP] を選択します。
- IP Protocol: [UDP] を選択します。
- 宛先ポートの範囲(開始):
514と入力します。 - 宛先ポートの範囲(終了): 「
514」と入力します。
- 名前: 「
- [送信] をクリックします。
管理契約を作成する
- [テナント> 管理 > 契約 > 標準] に移動します。
- [Create Contract] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
mgmt-syslog-contract」と入力します。 - スコープ: [コンテキスト] を選択します。
- 名前: 「
- [送信] をクリックします。
- 契約を開き、[Subjects] をクリックします。
- [Create Contract Subject] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 「
syslog-subject」と入力します。 - 両方向に適用: このオプションをオンにします。
- 名前: 「
- [送信] をクリックします。
- 件名を開き、[フィルタ] をクリックします。
- [フィルタ バインディングを作成] をクリックします。
syslog-udp-514フィルタを選択します。- [送信] をクリックします。
Syslog 宛先グループを構成する
- [管理者] > [外部データ コレクタ] > [モニタリングの宛先] > [Syslog] に移動します。
- [Syslog] を右クリックし、[Syslog モニタリング移行先グループを作成] を選択します。
- 次の構成の詳細を指定します。
- 名前: 「
Chronicle-Syslog-Group」と入力します。 - Admin State: [Enabled] を選択します。
- 形式: aci を選択します。
- 名前: 「
- [次へ] をクリックします。
- [Syslog モニタリングの宛先を作成] ダイアログで、次の操作を行います。
- 名前: 「
Chronicle-BindPlane」と入力します。 - ホスト: Bindplane エージェント サーバーの IP アドレスを入力します。
- ポート: 「
514」と入力します。 - Admin State: [Enabled] を選択します。
- 重大度: [情報] を選択します(詳細なログをキャプチャするため)。
- 名前: 「
- [送信] をクリックします。
モニタリング ポリシーを構成する
ファブリック モニタリング ポリシー
- [Fabric] > [Fabric Policies] > [Policies] > [Monitoring] > [Common Policy] に移動します。
- [Callhome/Smart Callhome/SNMP/Syslog/TACACS] を展開します。
- [Syslog] を右クリックし、[Create Syslog Source] を選択します。
- 次の構成の詳細を指定します。
- 名前: 「
Chronicle-Fabric-Syslog」と入力します。 - 監査ログ: 監査イベントを含める場合にオンにします。
- イベント: システム イベントを含める場合にオンにします。
- Faults: 障害イベントを含める場合にオンにします。
- セッションログ: セッションログを含める場合にオンにします。
- 宛先グループ:
Chronicle-Syslog-Groupを選択します。
- 名前: 「
- [送信] をクリックします。
アクセス モニタリング ポリシー
- [Fabric] > [Access Policies] > [Policies] > [Monitoring] > [Default Policy] に移動します。
- [Callhome/Smart Callhome/SNMP/Syslog] を開きます。
- [Syslog] を右クリックし、[Create Syslog Source] を選択します。
- 次の構成の詳細を指定します。
- 名前: 「
Chronicle-Access-Syslog」と入力します。 - 監査ログ: 監査イベントを含める場合にオンにします。
- イベント: システム イベントを含める場合にオンにします。
- Faults: 障害イベントを含める場合にオンにします。
- セッションログ: セッションログを含める場合にオンにします。
- 宛先グループ:
Chronicle-Syslog-Groupを選択します。
- 名前: 「
- [送信] をクリックします。
システム Syslog メッセージ ポリシーを構成する
- [Fabric] > [Fabric Policies] > [Policies] > [Monitoring] > [Common Policy] に移動します。
- [Syslog Messages Policies] を開きます。
- [default] をクリックします。
- [施設フィルタ] セクションで次の操作を行います。
- ファシリティ: [default] を選択します。
- 最小重大度: [情報] に変更します。
- [送信] をクリックします。
オプション 2: Google Cloud Storage を使用した JSON
このオプションでは、APIC REST API を使用して、Cisco ACI ファブリックから JSON 形式のイベント、障害、監査ログを収集し、Google SecOps の取り込み用に Google Cloud Storage に保存します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- Cisco APIC コンソールへの特権アクセス
Cisco ACI APIC の前提条件を収集する
APIC 認証情報を取得する
- HTTPS を使用して Cisco APIC コンソールにログインします。
[Admin] > [AAA](APIC 6.0 以降)または [Admin] > [Authentication] > [AAA](以前のリリース)。
適切な権限を持つローカル ユーザーを作成するか、既存のローカル ユーザーを使用します。
次の詳細をコピーして安全な場所に保存します。
- APIC ユーザー名: モニタリング データへの読み取りアクセス権を持つローカル ユーザー
- APIC Password: ユーザーのパスワード
- APIC URL: APIC の HTTPS URL(例:
https://apic.example.com)
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( cisco-aci-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
cisco-aci-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Cisco ACI logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウント(cisco-aci-collector-sa)に付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
cisco-aci-logsなど)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
cisco-aci-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Cisco APIC REST API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 cisco-aci-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
cisco-aci-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- [Identity and Access Management(IAM)] を選択します。
- [コンテナ、ネットワーキング、セキュリティ] までスクロールして開きます。
- [セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
cisco-aci-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETcisco-aci-logsGCS バケット名 GCS_PREFIXcisco-aci-eventsログファイルの接頭辞 STATE_KEYcisco-aci-events/state.json状態ファイルのパス APIC_URLhttps://apic.example.comAPIC HTTPS URL APIC_USERNAMEyour-apic-usernameAPIC ユーザー名 APIC_PASSWORDyour-apic-passwordAPIC パスワード PAGE_SIZE1001 ページあたりのレコード数 MAX_PAGES10実行あたりの最大ページ数
[変数とシークレット] セクションで、[リクエスト] までスクロールします。
- リクエスト タイムアウト:
300秒(5 分)を入力します。
- リクエスト タイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import logging # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=60.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events') STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json') APIC_URL = os.environ.get('APIC_URL') APIC_USERNAME = os.environ.get('APIC_USERNAME') APIC_PASSWORD = os.environ.get('APIC_PASSWORD') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]): logger.error('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window last_timestamp = state.get('last_timestamp') if not last_timestamp: last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z' logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}") # Authenticate to APIC session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD) headers = { 'Cookie': f'APIC-cookie={session_token}', 'Accept': 'application/json', 'Content-Type': 'application/json' } # Data types to collect data_types = ['faultInst', 'eventRecord', 'aaaModLR'] all_collected_data = [] for data_type in data_types: logger.info(f"Collecting {data_type} data") collected_data = collect_aci_data( APIC_URL, headers, data_type, last_timestamp, PAGE_SIZE, MAX_PAGES ) # Tag each record with its type for record in collected_data: record['_data_type'] = data_type all_collected_data.extend(collected_data) logger.info(f"Collected {len(collected_data)} {data_type} records") logger.info(f"Total records collected: {len(all_collected_data)}") # Store data in GCS if any were collected if all_collected_data: timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson" # Convert to NDJSON format (one JSON object per line) ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data) # Upload to GCS blob = bucket.blob(s3_key) blob.upload_from_string( ndjson_content, content_type='application/x-ndjson' ) logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}") # Update state file with latest timestamp from collected data latest_timestamp = get_latest_timestamp_from_records(all_collected_data) if not latest_timestamp: latest_timestamp = datetime.utcnow().isoformat() + 'Z' update_state(bucket, STATE_KEY, latest_timestamp) else: logger.info("No new log records found.") logger.info(f"Successfully processed {len(all_collected_data)} records") except Exception as e: logger.error(f'Error processing logs: {str(e)}') raise def authenticate_apic(apic_url, username, password): """Authenticate to APIC and return session token""" login_url = f"{apic_url}/api/aaaLogin.json" login_data = { "aaaUser": { "attributes": { "name": username, "pwd": password } } } response = http.request( 'POST', login_url, body=json.dumps(login_data).encode('utf-8'), headers={'Content-Type': 'application/json'}, timeout=30 ) if response.status != 200: raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) token = response_data['imdata'][0]['aaaLogin']['attributes']['token'] logger.info("Successfully authenticated to APIC") return token def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages): """Collect data from APIC REST API with pagination""" all_data = [] page = 0 while page < max_pages: # Build API URL with pagination and time filters api_url = f"{apic_url}/api/class/{data_type}.json" params = [ f'page-size={page_size}', f'page={page}', f'order-by={data_type}.created|asc' ] # Add time filter to prevent duplicates if last_timestamp: params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")') full_url = f"{api_url}?{'&'.join(params)}" logger.info(f"Fetching {data_type} page {page} from APIC") # Make API request response = http.request('GET', full_url, headers=headers, timeout=60) if response.status != 200: logger.error(f"API request failed: {response.status} {response.data[:256]!r}") break data = json.loads(response.data.decode('utf-8')) records = data.get('imdata', []) if not records: logger.info(f"No more {data_type} records found") break # Extract the actual data from APIC format extracted_records = [] for record in records: if data_type in record: extracted_records.append(record[data_type]) all_data.extend(extracted_records) page += 1 # If we got less than page_size records, we've reached the end if len(records) < page_size: break return all_data def get_last_timestamp(bucket, state_key): """Get the last run timestamp from GCS state file""" try: blob = bucket.blob(state_key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) return state.get('last_timestamp') except Exception as e: logger.warning(f"Error reading state file: {str(e)}") return None def get_latest_timestamp_from_records(records): """Get the latest timestamp from collected records to prevent missing events""" if not records: return None latest = None latest_time = None for record in records: try: # Handle both direct attributes and nested structure attrs = record.get('attributes', record) created = attrs.get('created') modTs = attrs.get('modTs') # Use created or modTs as fallback timestamp = created or modTs if timestamp: if latest_time is None or timestamp > latest_time: latest_time = timestamp latest = record except Exception as e: logger.debug(f"Error parsing timestamp from record: {e}") continue return latest_time def update_state(bucket, state_key, timestamp): """Update the state file with the current timestamp""" try: state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } blob = bucket.blob(state_key) blob.upload_from_string( json.dumps(state_data), content_type='application/json' ) logger.info(f"Updated state file with timestamp: {timestamp}") except Exception as e: logger.error(f"Error updating state file: {str(e)}") def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: logger.warning(f"Could not load state: {e}") return {}- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 cisco-aci-collector-15mリージョン Cloud Run functions と同じリージョンを選択する 周波数 */15 * * * *(15 分ごと)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( cisco-aci-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *中程度の音量(推奨) 1 時間ごと 0 * * * *標準 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
統合をテストする
- Cloud Scheduler コンソールで、ジョブ(
cisco-aci-collector-15m)を見つけます。 - [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名(
cisco-aci-collector)をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。次の内容を確認します。
Starting Cisco ACI data collection for bucket: cisco-aci-logs Successfully authenticated to APIC Collecting faultInst data Collected X faultInst records Collecting eventRecord data Collected X eventRecord records Collecting aaaModLR data Collected X aaaModLR records Total records collected: X Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson Successfully processed X records[Cloud Storage] > [バケット] に移動します。
バケット名(
cisco-aci-logs)をクリックします。プレフィックス フォルダ(
cisco-aci-events/)に移動します。現在のタイムスタンプで新しい
.ndjsonファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で APIC 認証情報を確認する
- HTTP 403: APIC アカウントに
faultInst、eventRecord、aaaModLRクラスに対する読み取り権限があることを確認します - 接続エラー: Cloud Run 関数が TCP/443 で APIC URL に到達できることを確認する
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Cisco ACI JSON logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [Log type] として [Cisco Application Centric Infrastructure] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーします。次のステップでこれを使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(
cisco-aci-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
- [保存] をクリックします。
Cisco ACI のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Cisco ACI JSON logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [Log type] として [Cisco Application Centric Infrastructure] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://cisco-aci-logs/cisco-aci-events/- 次のように置き換えます。
cisco-aci-logs: GCS バケット名。cisco-aci-events: ログが保存される接頭辞/フォルダパス。
- 次のように置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
- 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| @timestamp | read_only_udm.metadata.event_timestamp | 値は未加工ログの「@timestamp」フィールドから取得され、タイムスタンプとして解析されます。 |
| aci_tag | read_only_udm.metadata.product_log_id | 値は、未加工ログのフィールド「aci_tag」から取得されます。 |
| cisco_timestamp | - | マッピングされていません。 |
| DIP | read_only_udm.target.ip | 値は、未加工ログのフィールド「DIP」から取得されます。 |
| DPort | read_only_udm.target.port | 値は未加工ログのフィールド「DPort」から取得され、整数に変換されます。 |
| 説明 | read_only_udm.security_result.description | 値は、未加工ログのフィールド「description」から取得されます。 |
| fault_cause | read_only_udm.additional.fields.value.string_value | 値は、未加工ログのフィールド「fault_cause」から取得されます。キーは「Fault Cause」に設定されます。 |
| hostname | read_only_udm.principal.hostname | 値は、未加工ログの「hostname」フィールドから取得されます。 |
| lifecycle_state | read_only_udm.metadata.product_event_type | 値は、未加工ログのフィールド「lifecycle_state」から取得されます。 |
| log.source.address | - | マッピングされていません。 |
| logstash.collect.host | - | マッピングされていません。 |
| logstash.collect.timestamp | read_only_udm.metadata.collected_timestamp | 値は、未加工ログのフィールド「logstash.collect.timestamp」から取得され、タイムスタンプとして解析されます。 |
| logstash.ingest.host | read_only_udm.intermediary.hostname | 値は、未加工ログのフィールド「logstash.ingest.host」から取得されます。 |
| logstash.irm_environment | read_only_udm.additional.fields.value.string_value | 値は、未加工ログのフィールド「logstash.irm_environment」から取得されます。キーは「IRM_Environment」に設定されます。 |
| logstash.irm_region | read_only_udm.additional.fields.value.string_value | 値は、未加工ログのフィールド「logstash.irm_region」から取得されます。キーは「IRM_Region」に設定されます。 |
| logstash.irm_site | read_only_udm.additional.fields.value.string_value | 値は、未加工ログのフィールド「logstash.irm_site」から取得されます。キーは「IRM_Site」に設定されます。 |
| logstash.process.host | read_only_udm.intermediary.hostname | 値は、未加工ログのフィールド「logstash.process.host」から取得されます。 |
| メッセージ | - | マッピングされていません。 |
| message_class | - | マッピングされていません。 |
| message_code | - | マッピングされていません。 |
| message_content | - | マッピングされていません。 |
| message_dn | - | マッピングされていません。 |
| message_type | read_only_udm.metadata.product_event_type | 値は、角かっこを削除した後に、未加工ログのフィールド「message_type」から取得されます。 |
| node_link | read_only_udm.principal.process.file.full_path | 値は、未加工ログのフィールド「node_link」から取得されます。 |
| PktLen | read_only_udm.network.received_bytes | 値は未加工ログのフィールド「PktLen」から取得され、符号なし整数に変換されます。 |
| プログラム | - | マッピングされていません。 |
| .proto | read_only_udm.network.ip_protocol | 値は未加工ログのフィールド「Proto」から取得され、整数に変換されて、対応する IP プロトコル名(6 -> TCP)。 |
| SIP | read_only_udm.principal.ip | 値は、未加工ログのフィールド「SIP」から取得されます。 |
| SPort | read_only_udm.principal.port | 値は未加工ログのフィールド「SPort」から取得され、整数に変換されます。 |
| syslog_facility | - | マッピングされていません。 |
| syslog_facility_code | - | マッピングされていません。 |
| syslog_host | read_only_udm.principal.ip、read_only_udm.observer.ip | 値は、未加工ログのフィールド「syslog_host」から取得されます。 |
| syslog_prog | - | マッピングされていません。 |
| syslog_severity | read_only_udm.security_result.severity_details | 値は、未加工ログのフィールド「syslog_severity」から取得されます。 |
| syslog_severity_code | read_only_udm.security_result.severity | 値は未加工ログのフィールド「syslog_severity_code」から取得され、対応する重大度レベルにマッピングされます。5、6、7 -> INFORMATIONAL、3、4 -> MEDIUM、0、1、2 -> HIGH。 |
| syslog5424_pri | - | マッピングされていません。 |
| Vlan-Id | read_only_udm.principal.resource.id | 値は、未加工ログのフィールド「Vlan-Id」から取得されます。 |
| - | read_only_udm.metadata.event_type | ロジック: 「SIP」または「hostname」が存在し、「Proto」が存在する場合は、「NETWORK_CONNECTION」に設定します。それ以外の場合で、「SIP」、「hostname」、「syslog_host」のいずれかが存在する場合は、「STATUS_UPDATE」に設定します。それ以外の場合は、「GENERIC_EVENT」に設定されます。 |
| - | read_only_udm.metadata.log_type | ロジック: 「CISCO_ACI」に設定されます。 |
| - | read_only_udm.metadata.vendor_name | ロジック: 「Cisco」に設定します。 |
| - | read_only_udm.metadata.product_name | ロジック: 「ACI」に設定します。 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。