收集 Cisco Application Centric Infrastructure (ACI) 日志

支持的平台:

本文档介绍了如何将 Cisco Application Centric Infrastructure (ACI) 日志提取到 Google Security Operations。解析器首先尝试使用 Grok 模式将传入的 Cisco ACI 日志处理为 syslog 消息。如果 syslog 解析失败,系统会假定消息采用 JSON 格式,并相应地进行解析。最后,它会将提取的字段映射到统一数据模型 (UDM)。

此集成支持两种方法:

  • 方法 1:使用 Bindplane 代理的 Syslog 格式
  • 方式 2:使用 APIC REST API 以 JSON 格式使用 Google Cloud Storage

每个选项都是独立的,您可以根据自己的基础架构要求和日志格式偏好单独实现。

方法 1:使用 Bindplane 代理的 Syslog

此选项用于配置 Cisco ACI 交换矩阵,以将 syslog 消息发送到 Bindplane 代理,然后由该代理将消息转发到 Google Security Operations 进行分析。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 搭载 systemd 的 Windows 2016 或更高版本或 Linux 主机
  • 如果在代理后面运行,请确保防火墙端口根据 Bindplane 代理要求处于开放状态
  • 对 Cisco APIC 控制台的特权访问

获取 Google SecOps 注入身份验证文件

  1. 依次前往 SIEM 设置 > 收集代理
  2. 下载注入身份验证文件
  3. 将文件安全地保存在将要安装 Bindplane 的系统上。

获取 Google SecOps 客户 ID

  1. 依次前往 SIEM 设置 > 个人资料
  2. 复制并保存组织详细信息部分中的客户 ID

安装 Bindplane 代理

按照以下说明在 Windows 或 Linux 操作系统上安装 Bindplane 代理。

Windows 安装

  1. 以管理员身份打开命令提示符或 PowerShell。
  2. 运行以下命令:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 安装

  1. 打开具有 root 或 sudo 权限的终端。
  2. 运行以下命令:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

如需了解其他安装选项,请参阅 Bindplane 代理安装指南

配置 Bindplane 代理以注入 Syslog 并将其发送到 Google SecOps

访问配置文件

  1. 找到 config.yaml 文件。通常,它位于 Linux 上的 /etc/bindplane-agent/ 目录中或 Windows 上的安装目录中。
  2. 使用文本编辑器(例如 nanovi 或记事本)打开该文件。
  3. 修改 config.yaml 文件:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
          service:
    
    pipelines:
      logs/source0__chronicle_w_labels-0:
        receivers:
          - udplog
        exporters:
          - chronicle/chronicle_w_labels
    
    • 替换以下内容:
      • 根据您的基础架构需要替换端口和 IP 地址。
      • <CUSTOMER_ID> 替换为实际的客户 ID。
      • /path/to/ingestion-authentication-file.json 更新为保存身份验证文件的路径。

重启 Bindplane 代理以应用更改

  • 如需在 Linux 中重启 Bindplane 代理,请运行以下命令:

    sudo systemctl restart bindplane-agent
    
  • 如需在 Windows 中重启 Bindplane 代理,您可以使用“服务”控制台,也可以输入以下命令:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

在 Cisco ACI 上配置 Syslog 转发

配置带外管理合同

  1. 登录 Cisco APIC 控制台。
  2. 依次前往租户> 管理 > 合同 > 过滤条件
  3. 点击创建过滤器
  4. 提供以下配置详细信息:
    • 名称:输入 syslog-udp-514
    • 条目名称:输入 syslog
    • EtherType:选择 IP
    • IP 协议:选择 UDP
    • 目标端口范围(起始):输入 514
    • 目标端口范围至:输入 514
  5. 点击提交

创建管理合同

  1. 依次前往租户> 管理 > 合同 > 标准
  2. 点击创建合同
  3. 提供以下配置详细信息:
    • 名称:输入 mgmt-syslog-contract
    • 范围:选择上下文
  4. 点击提交
  5. 展开合同,然后点击主题
  6. 点击创建合同主题
  7. 提供以下配置详细信息:
    • 名称:输入 syslog-subject
    • 同时应用两个方向:选中此选项。
  8. 点击提交
  9. 展开相应主题,然后点击过滤条件
  10. 点击创建过滤器绑定
  11. 选择 syslog-udp-514 过滤条件。
  12. 点击提交

配置 Syslog 目标群组

  1. 依次前往管理 > 外部数据收集器 > 监控目标 > Syslog
  2. 右键点击 Syslog,然后选择创建 Syslog 监控目标组
  3. 提供以下配置详细信息:
    • 名称:输入 Chronicle-Syslog-Group
    • Admin State(管理员状态):选择 Enabled(已启用)。
    • 格式:选择 aci
  4. 点击下一步
  5. 创建 Syslog 监控目标对话框中执行以下操作:
    • 名称:输入 Chronicle-BindPlane
    • 主机:输入 Bindplane 代理服务器的 IP 地址。
    • 端口:输入 514
    • Admin State(管理员状态):选择 Enabled(已启用)。
    • 严重程度:选择信息(以捕获详细日志)。
  6. 点击提交

配置监控政策

Fabric 监控政策
  1. 依次前往 Fabric > Fabric Policies > Policies > Monitoring > Common Policy
  2. 展开 Callhome/Smart Callhome/SNMP/Syslog/TACACS
  3. 右键点击 Syslog,然后选择 Create Syslog Source
  4. 提供以下配置详细信息:
    • 名称:输入 Chronicle-Fabric-Syslog
    • 审核日志:选中此复选框可包含审核事件。
    • 事件:选中此复选框可包含系统事件。
    • 故障:选中此复选框可包含故障事件。
    • 会话日志:选中此复选框可包含会话日志。
    • 目标群组:选择 Chronicle-Syslog-Group
  5. 点击提交
访问监控政策
  1. 依次前往 Fabric > Access Policies > Policies > Monitoring > Default Policy
  2. 展开 Callhome/Smart Callhome/SNMP/Syslog
  3. 右键点击 Syslog,然后选择 Create Syslog Source
  4. 提供以下配置详细信息:
    • 名称:输入 Chronicle-Access-Syslog
    • 审核日志:选中此复选框可包含审核事件。
    • 事件:选中此复选框可包含系统事件。
    • 故障:选中此复选框可包含故障事件。
    • 会话日志:选中此复选框可包含会话日志。
    • 目标群组:选择 Chronicle-Syslog-Group
  5. 点击提交

配置系统 Syslog 消息政策

  1. 依次前往 Fabric > Fabric Policies > Policies > Monitoring > Common Policy
  2. 展开 Syslog 消息政策
  3. 点击默认
  4. 设施过滤条件部分中:
    • 设施:选择默认
    • 最低严重程度:更改为信息
  5. 点击提交

方案 2:使用 Google Cloud Storage 的 JSON

此选项使用 APIC REST API 从 Cisco ACI 交换矩阵收集 JSON 格式的事件、故障和审核日志,并将它们存储在 Google Cloud Storage 中,以供 Google SecOps 提取。

准备工作

确保您满足以下前提条件:

  • Google SecOps 实例
  • 启用了 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Run 服务、Pub/Sub 主题和 Cloud Scheduler 作业的权限
  • 对 Cisco APIC 控制台的特权访问权限

收集 Cisco ACI APIC 前提条件

获取 APIC 凭据

  1. 使用 HTTPS 登录 Cisco APIC 控制台。
  2. 前往管理 > AAA(在 APIC 6.0 及更高版本中)或管理 > 身份验证 > AAA(在旧版本中)。

  3. 创建或使用具有相应权限的现有本地用户。

  4. 复制以下详细信息并将其保存在安全的位置:

    • APIC 用户名:对监控数据具有读取权限的本地用户
    • APIC 密码:用户密码
    • APIC 网址:APIC 的 HTTPS 网址(例如 https://apic.example.com

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 cisco-aci-logs
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

为 Cloud Run 函数创建服务账号

Cloud Run 函数需要一个服务账号,该账号具有向 GCS 存储分区写入内容以及被 Pub/Sub 调用的权限。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 cisco-aci-collector-sa
    • 服务账号说明:输入 Service account for Cloud Run function to collect Cisco ACI logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分中,添加以下角色:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号 (cisco-aci-collector-sa) 授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称(例如 cisco-aci-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建发布/订阅主题

创建一个 Pub/Sub 主题,供 Cloud Scheduler 发布消息,并供 Cloud Run 函数订阅。

  1. GCP 控制台中,前往 Pub/Sub > 主题
  2. 点击创建主题
  3. 提供以下配置详细信息:
    • 主题 ID:输入 cisco-aci-trigger
    • 将其他设置保留为默认值。
  4. 点击创建

创建 Cloud Run 函数以收集日志

Cloud Run 函数将由来自 Cloud Scheduler 的 Pub/Sub 消息触发,以从 Cisco APIC REST API 中提取日志并将其写入 GCS。

  1. GCP 控制台中,前往 Cloud Run
  2. 点击创建服务
  3. 选择函数(使用内嵌编辑器创建函数)。
  4. 配置部分中,提供以下配置详细信息:

    设置
    Service 名称 cisco-aci-collector
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
    运行时 选择 Python 3.12 或更高版本
  5. 触发器(可选)部分中:

    1. 点击 + 添加触发器
    2. 选择 Cloud Pub/Sub
    3. 选择 Cloud Pub/Sub 主题中,选择 Pub/Sub 主题 (cisco-aci-trigger)。
    4. 点击保存
  6. 身份验证部分中:

    • 选择需要进行身份验证
    • 选择 Identity and Access Management (IAM)
  1. 滚动到容器、网络、安全性并展开该部分。
  2. 前往安全性标签页:
    • 服务账号:选择服务账号 (cisco-aci-collector-sa)。
  3. 前往容器标签页:

    • 点击变量和密钥
    • 为每个环境变量点击 + 添加变量

      变量名称 示例值 说明
      GCS_BUCKET cisco-aci-logs GCS 存储分区名称
      GCS_PREFIX cisco-aci-events 日志文件的前缀
      STATE_KEY cisco-aci-events/state.json 状态文件路径
      APIC_URL https://apic.example.com APIC HTTPS 网址
      APIC_USERNAME your-apic-username APIC 用户名
      APIC_PASSWORD your-apic-password APIC 密码
      PAGE_SIZE 100 每页记录数
      MAX_PAGES 10 每次运行的页数上限
  4. 变量和 Secret 部分中,滚动到请求

    • 请求超时:输入 300 秒(5 分钟)。
  5. 前往设置标签页:

    • 资源部分中:
      • 内存:选择 512 MiB 或更高值。
      • CPU:选择 1
  6. 修订版本扩缩部分中:

    • 实例数下限:输入 0
    • 实例数上限:输入 100(或根据预期负载进行调整)。
  7. 点击创建

  8. 等待服务创建完成(1-2 分钟)。

  9. 创建服务后,系统会自动打开内嵌代码编辑器

添加函数代码

  1. 入口点字段中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=60.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json')
    APIC_URL = os.environ.get('APIC_URL')
    APIC_USERNAME = os.environ.get('APIC_USERNAME')
    APIC_PASSWORD = os.environ.get('APIC_PASSWORD')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]):
            logger.error('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            last_timestamp = state.get('last_timestamp')
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}")
    
            # Authenticate to APIC
            session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(
                    APIC_URL,
                    headers,
                    data_type,
                    last_timestamp,
                    PAGE_SIZE,
                    MAX_PAGES
                )
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in GCS if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to GCS
                blob = bucket.blob(s3_key)
                blob.upload_from_string(
                    ndjson_content,
                    content_type='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}")
    
                # Update state file with latest timestamp from collected data
                latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
                if not latest_timestamp:
                    latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
                update_state(bucket, STATE_KEY, latest_timestamp)
            else:
                logger.info("No new log records found.")
    
            logger.info(f"Successfully processed {len(all_collected_data)} records")
    
        except Exception as e:
            logger.error(f'Error processing logs: {str(e)}')
            raise
    
    def authenticate_apic(apic_url, username, password):
        """Authenticate to APIC and return session token"""
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """Collect data from APIC REST API with pagination"""
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter to prevent duplicates
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """Get the last run timestamp from GCS state file"""
        try:
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                return state.get('last_timestamp')
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
    
        return None
    
    def get_latest_timestamp_from_records(records):
        """Get the latest timestamp from collected records to prevent missing events"""
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')
    
                # Use created or modTs as fallback
                timestamp = created or modTs
    
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """Update the state file with the current timestamp"""
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
            blob = bucket.blob(state_key)
            blob.upload_from_string(
                json.dumps(state_data),
                content_type='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            logger.warning(f"Could not load state: {e}")
    
        return {}
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 点击部署以保存并部署该函数。

  4. 等待部署完成(2-3 分钟)。

创建 Cloud Scheduler 作业

Cloud Scheduler 会定期向 Pub/Sub 主题发布消息,从而触发 Cloud Run 函数。

  1. GCP Console 中,前往 Cloud Scheduler
  2. 点击创建作业
  3. 提供以下配置详细信息:

    设置
    名称 cisco-aci-collector-15m
    区域 选择与 Cloud Run 函数相同的区域
    频率 */15 * * * *(每 15 分钟)
    时区 选择时区(建议选择世界协调时间 [UTC])
    目标类型 Pub/Sub
    主题 选择 Pub/Sub 主题 (cisco-aci-trigger)
    消息正文 {}(空 JSON 对象)
  4. 点击创建

时间表频率选项

  • 根据日志量和延迟时间要求选择频次:

    频率 Cron 表达式 使用场景
    每隔 5 分钟 */5 * * * * 高容量、低延迟
    每隔 15 分钟 */15 * * * * 中等搜索量(推荐)
    每小时 0 * * * * 标准
    每 6 小时 0 */6 * * * 量小、批处理
    每天 0 0 * * * 历史数据收集

测试集成

  1. Cloud Scheduler 控制台中,找到您的作业 (cisco-aci-collector-15m)。
  2. 点击强制运行以手动触发作业。
  3. 等待几秒钟。
  4. 前往 Cloud Run > 服务
  5. 点击函数名称 (cisco-aci-collector)。
  6. 点击日志标签页。
  7. 验证函数是否已成功执行。查找以下项:

    Starting Cisco ACI data collection for bucket: cisco-aci-logs
    Successfully authenticated to APIC
    Collecting faultInst data
    Collected X faultInst records
    Collecting eventRecord data
    Collected X eventRecord records
    Collecting aaaModLR data
    Collected X aaaModLR records
    Total records collected: X
    Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. 前往 Cloud Storage > 存储分区

  9. 点击您的存储分区名称 (cisco-aci-logs)。

  10. 前往前缀文件夹 (cisco-aci-events/)。

  11. 验证是否已创建具有当前时间戳的新 .ndjson 文件。

如果您在日志中看到错误,请执行以下操作:

  • HTTP 401:检查环境变量中的 APIC 凭据
  • HTTP 403:验证 APIC 账号是否对 faultInsteventRecordaaaModLR 类具有读取权限
  • 连接错误:验证 Cloud Run 函数是否可以通过 TCP/443 访问 APIC 网址
  • 缺少环境变量:检查是否已设置所有必需的变量

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Cisco ACI JSON logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Cisco Application Centric Infrastructure 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址。您将在下一步骤中用到它。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称 (cisco-aci-logs)。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以注入 Cisco ACI 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Cisco ACI JSON logs)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Cisco Application Centric Infrastructure 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://cisco-aci-logs/cisco-aci-events/
      
      • 替换:
        • cisco-aci-logs:您的 GCS 存储分区名称。
        • cisco-aci-events:存储日志的前缀/文件夹路径。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资产命名空间资产命名空间

    • 注入标签:应用于此 Feed 中事件的标签。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
@timestamp read_only_udm.metadata.event_timestamp 值取自原始日志字段“@timestamp”,并解析为时间戳。
aci_tag read_only_udm.metadata.product_log_id 值取自原始日志字段“aci_tag”。
cisco_timestamp - 未映射。
DIP read_only_udm.target.ip 值取自原始日志字段“DIP”。
DPort read_only_udm.target.port 值取自原始日志字段“DPort”,并转换为整数。
说明 read_only_udm.security_result.description 值取自原始日志字段“description”。
fault_cause read_only_udm.additional.fields.value.string_value 值取自原始日志字段“fault_cause”。该键设置为“故障原因”。
主机名 read_only_udm.principal.hostname 值取自原始日志字段“hostname”。
lifecycle_state read_only_udm.metadata.product_event_type 值取自原始日志字段“lifecycle_state”。
log.source.address - 未映射。
logstash.collect.host - 未映射。
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp 值取自原始日志字段“logstash.collect.timestamp”,并解析为时间戳。
logstash.ingest.host read_only_udm.intermediary.hostname 值取自原始日志字段“logstash.ingest.host”。
logstash.irm_environment read_only_udm.additional.fields.value.string_value 值取自原始日志字段“logstash.irm_environment”。该键设置为“IRM_Environment”。
logstash.irm_region read_only_udm.additional.fields.value.string_value 值取自原始日志字段“logstash.irm_region”。该键设置为“IRM_Region”。
logstash.irm_site read_only_udm.additional.fields.value.string_value 值取自原始日志字段“logstash.irm_site”。该键设置为“IRM_Site”。
logstash.process.host read_only_udm.intermediary.hostname 值取自原始日志字段“logstash.process.host”。
私信 - 未映射。
message_class - 未映射。
message_code - 未映射。
message_content - 未映射。
message_dn - 未映射。
message_type read_only_udm.metadata.product_event_type 该值取自原始日志字段“message_type”,并移除了方括号。
node_link read_only_udm.principal.process.file.full_path 值取自原始日志字段“node_link”。
PktLen read_only_udm.network.received_bytes 值取自原始日志字段“PktLen”,并转换为无符号整数。
程序 - 未映射。
Proto read_only_udm.network.ip_protocol 值取自原始日志字段“Proto”,转换为整数,并映射到相应的 IP 协议名称(例如,6 -> TCP)。
SIP read_only_udm.principal.ip 值取自原始日志字段“SIP”。
SPort read_only_udm.principal.port 值取自原始日志字段“SPort”,并转换为整数。
syslog_facility - 未映射。
syslog_facility_code - 未映射。
syslog_host read_only_udm.principal.ip、read_only_udm.observer.ip 值取自原始日志字段“syslog_host”。
syslog_prog - 未映射。
syslog_severity read_only_udm.security_result.severity_details 值取自原始日志字段“syslog_severity”。
syslog_severity_code read_only_udm.security_result.severity 该值取自原始日志字段“syslog_severity_code”,并映射到相应的严重程度级别:5、6、7 -> INFORMATIONAL;3、4 -> MEDIUM;0、1、2 -> HIGH。
syslog5424_pri - 未映射。
Vlan-Id read_only_udm.principal.resource.id 值取自原始日志字段“Vlan-Id”。
- read_only_udm.metadata.event_type 逻辑:如果存在“SIP”或“hostname”,并且存在“Proto”,则设置为“NETWORK_CONNECTION”。否则,如果存在“SIP”“hostname”或“syslog_host”,则设置为“STATUS_UPDATE”。否则,设置为“GENERIC_EVENT”。
- read_only_udm.metadata.log_type 逻辑:设置为“CISCO_ACI”。
- read_only_udm.metadata.vendor_name 逻辑:设置为“Cisco”。
- read_only_udm.metadata.product_name 逻辑:设置为“ACI”。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。