Coletar registros do Cisco Application Centric Infrastructure (ACI)

Compatível com:

Este documento explica como ingerir registros da infraestrutura centrada em aplicativos (ACI, na sigla em inglês) da Cisco no Google Security Operations. Primeiro, o analisador tenta processar os registros do Cisco ACI como mensagens syslog usando padrões Grok. Se a análise do syslog falhar, a mensagem será considerada no formato JSON e analisada de acordo. Por fim, ele mapeia os campos extraídos para o modelo de dados unificado (UDM).

Essa integração é compatível com dois métodos:

  • Opção 1: formato Syslog usando o agente do Bindplane
  • Opção 2: formato JSON usando o Google Cloud Storage com a API REST do APIC

Cada opção é independente e pode ser implementada separadamente com base nos requisitos de infraestrutura e nas preferências de formato de registro.

Opção 1: Syslog usando o agente do Bindplane

Essa opção configura a estrutura do Cisco ACI para enviar mensagens syslog a um agente do Bindplane, que as encaminha para o Google Security Operations para análise.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps
  • Um host Windows 2016 ou mais recente ou Linux com systemd
  • Se você estiver executando por trás de um proxy, verifique se as portas do firewall estão abertas de acordo com os requisitos do agente do Bindplane.
  • Acesso privilegiado ao console do Cisco APIC

Receber o arquivo de autenticação de ingestão do Google SecOps

  1. Acesse Configurações do SIEM > Agentes de coleta.
  2. Baixe o arquivo de autenticação de ingestão.
  3. Salve o arquivo de forma segura no sistema em que o Bindplane será instalado.

Receber o ID de cliente do Google SecOps

  1. Acesse Configurações do SIEM > Perfil.
  2. Copie e salve o ID do cliente na seção Detalhes da organização.

Instalar o agente do Bindplane

Instale o agente do Bindplane no seu sistema operacional Windows ou Linux de acordo com as instruções a seguir.

Instalação do Windows

  1. Abra o prompt de comando ou o PowerShell como administrador.
  2. Execute este comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Instalação do Linux

  1. Abra um terminal com privilégios de root ou sudo.
  2. Execute este comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Para mais opções de instalação, consulte o guia de instalação do agente do Bindplane.

Configurar o agente do Bindplane para ingerir o Syslog e enviar ao Google SecOps

Acessar o arquivo de configuração

  1. Localize o arquivo config.yaml. Normalmente, ele fica no diretório /etc/bindplane-agent/ no Linux ou no diretório de instalação no Windows.
  2. Abra o arquivo usando um editor de texto (por exemplo, nano, vi ou Bloco de Notas).
  3. Edite o arquivo config.yaml:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
          service:
    
    pipelines:
      logs/source0__chronicle_w_labels-0:
        receivers:
          - udplog
        exporters:
          - chronicle/chronicle_w_labels
    
    • Substitua:
      • Substitua a porta e o endereço IP conforme necessário na sua infraestrutura.
      • Substitua <CUSTOMER_ID> pelo ID do cliente real.
      • Atualize /path/to/ingestion-authentication-file.json para o caminho em que o arquivo de autenticação foi salvo.

Reinicie o agente do Bindplane para aplicar as mudanças

  • Para reiniciar o agente do Bindplane em Linux, execute o seguinte comando:

    sudo systemctl restart bindplane-agent
    
  • Para reiniciar o agente do Bindplane no Windows, use o console de serviços ou insira o seguinte comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurar o encaminhamento de syslog no Cisco ACI

Configurar o contrato de gerenciamento fora da banda

  1. Faça login no console do Cisco APIC.
  2. Acesse Locatários > gerenciamento > Contratos > Filtros.
  3. Clique em Criar filtro.
  4. Informe os seguintes detalhes de configuração:
    • Nome: insira syslog-udp-514.
    • Nome da entrada: insira syslog.
    • EtherType: selecione IP.
    • Protocolo IP: selecione UDP.
    • Intervalo de porta de destino de: insira 514.
    • Intervalo de porta de destino até: insira 514.
  5. Clique em Enviar.

Criar contrato de gestão

  1. Acesse Locatários > gerenciamento > Contratos > Padrão.
  2. Clique em Criar contrato.
  3. Informe os seguintes detalhes de configuração:
    • Nome: insira mgmt-syslog-contract.
    • Escopo: selecione Contexto.
  4. Clique em Enviar.
  5. Abra o contrato e clique em Assuntos.
  6. Clique em Criar assunto do contrato.
  7. Informe os seguintes detalhes de configuração:
    • Nome: insira syslog-subject.
    • Aplicar ambas as direções: marque essa opção.
  8. Clique em Enviar.
  9. Expanda o assunto e clique em Filtros.
  10. Clique em Criar vinculação de filtro.
  11. Selecione o filtro syslog-udp-514.
  12. Clique em Enviar.

Configurar grupo de destino do Syslog

  1. Acesse Administrador > Coletores de dados externos > Destinos de monitoramento > Syslog.
  2. Clique com o botão direito do mouse em Syslog e selecione Criar grupo de destino de monitoramento do Syslog.
  3. Informe os seguintes detalhes de configuração:
    • Nome: insira Chronicle-Syslog-Group.
    • Estado do administrador: selecione Ativado.
    • Formato: selecione aci.
  4. Clique em Próxima.
  5. Na caixa de diálogo Criar destino de monitoramento do Syslog:
    • Nome: insira Chronicle-BindPlane.
    • Host: insira o endereço IP do servidor do agente do Bindplane.
    • Porta: insira 514.
    • Estado do administrador: selecione Ativado.
    • Gravidade: selecione Informações para capturar registros detalhados.
  6. Clique em Enviar.

Configurar políticas de monitoramento

Política de monitoramento do Fabric
  1. Acesse Fábrica > Políticas da fábrica > Políticas > Monitoramento > Política comum.
  2. Expanda Callhome/Smart Callhome/SNMP/Syslog/TACACS.
  3. Clique com o botão direito do mouse em Syslog e selecione Criar fonte Syslog.
  4. Informe os seguintes detalhes de configuração:
    • Nome: insira Chronicle-Fabric-Syslog.
    • Registros de auditoria: marque para incluir eventos de auditoria.
    • Eventos: marque para incluir eventos do sistema.
    • Falhas: marque para incluir eventos de falha.
    • Registros de sessão: marque para incluir registros de sessão.
    • Grupo de destino: selecione Chronicle-Syslog-Group.
  5. Clique em Enviar.
Política de monitoramento de acesso
  1. Acesse Fabric > Políticas de acesso > Políticas > Monitoramento > Política padrão.
  2. Expanda Callhome/Smart Callhome/SNMP/Syslog.
  3. Clique com o botão direito do mouse em Syslog e selecione Criar fonte Syslog.
  4. Informe os seguintes detalhes de configuração:
    • Nome: insira Chronicle-Access-Syslog.
    • Registros de auditoria: marque para incluir eventos de auditoria.
    • Eventos: marque para incluir eventos do sistema.
    • Falhas: marque para incluir eventos de falha.
    • Registros de sessão: marque para incluir registros de sessão.
    • Grupo de destino: selecione Chronicle-Syslog-Group.
  5. Clique em Enviar.

Configurar a política de mensagens do Syslog do sistema

  1. Acesse Fábrica > Políticas da fábrica > Políticas > Monitoramento > Política comum.
  2. Expanda Políticas de mensagens Syslog.
  3. Selecione padrão.
  4. Na seção Filtro de unidade:
    • Unidade: selecione padrão.
    • Gravidade mínima: mude para informações.
  5. Clique em Enviar.

Opção 2: JSON usando o Google Cloud Storage

Essa opção usa a API REST do APIC para coletar eventos, falhas e registros de auditoria formatados em JSON da estrutura do Cisco ACI e os armazena no Google Cloud Storage para ingestão do Google SecOps.

Antes de começar

Verifique se você atende os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Projeto do GCP com a API Cloud Storage ativada
  • Permissões para criar e gerenciar buckets do GCS
  • Permissões para gerenciar políticas do IAM em buckets do GCS
  • Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
  • Acesso privilegiado ao console do Cisco APIC

Coletar pré-requisitos do Cisco ACI APIC

Receber credenciais do APIC

  1. Faça login no console do Cisco APIC usando HTTPS.
  2. Acesse Admin > AAA (no APIC 6.0 ou mais recente) ou Admin > Authentication > AAA (em versões mais antigas).

  3. Crie ou use um usuário local com os privilégios adequados.

  4. Copie e salve em um local seguro os seguintes detalhes:

    • Nome de usuário do APIC: usuário local com acesso de leitura aos dados de monitoramento
    • Senha do APIC: senha do usuário
    • URL do APIC: o URL HTTPS do seu APIC (por exemplo, https://apic.example.com)

Criar um bucket do Google Cloud Storage

  1. Acesse o console doGoogle Cloud .
  2. Selecione seu projeto ou crie um novo.
  3. No menu de navegação, acesse Cloud Storage > Buckets.
  4. Clique em Criar bucket.
  5. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, cisco-aci-logs.
    Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional)
    Local Selecione o local (por exemplo, us-central1).
    Classe de armazenamento Padrão (recomendado para registros acessados com frequência)
    Controle de acesso Uniforme (recomendado)
    Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção
  6. Clique em Criar.

Criar uma conta de serviço para a função do Cloud Run

A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.

Criar conta de serviço

  1. No Console do GCP, acesse IAM e administrador > Contas de serviço.
  2. Clique em Criar conta de serviço.
  3. Informe os seguintes detalhes de configuração:
    • Nome da conta de serviço: insira cisco-aci-collector-sa.
    • Descrição da conta de serviço: insira Service account for Cloud Run function to collect Cisco ACI logs.
  4. Clique em Criar e continuar.
  5. Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
    1. Clique em Selecionar papel.
    2. Pesquise e selecione Administrador de objetos do Storage.
    3. Clique em + Adicionar outro papel.
    4. Pesquise e selecione Invocador do Cloud Run.
    5. Clique em + Adicionar outro papel.
    6. Pesquise e selecione Invocador do Cloud Functions.
  6. Clique em Continuar.
  7. Clique em Concluído.

Esses papéis são necessários para:

  • Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
  • Invocador do Cloud Run: permite que o Pub/Sub invoque a função
  • Invocador do Cloud Functions: permite a invocação de funções

Conceder permissões do IAM no bucket do GCS

Conceda à conta de serviço (cisco-aci-collector-sa) permissões de gravação no bucket do GCS:

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (por exemplo, cisco-aci-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: insira o e-mail da conta de serviço (por exemplo, cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Atribuir papéis: selecione Administrador de objetos do Storage.
  6. Clique em Salvar.

Criar tópico Pub/Sub

Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.

  1. No Console do GCP, acesse Pub/Sub > Tópicos.
  2. Selecione Criar tópico.
  3. Informe os seguintes detalhes de configuração:
    • ID do tópico: insira cisco-aci-trigger.
    • Não altere as outras configurações.
  4. Clique em Criar.

Criar uma função do Cloud Run para coletar registros

A função do Cloud Run será acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API REST do Cisco APIC e gravá-los no GCS.

  1. No console do GCP, acesse o Cloud Run.
  2. Clique em Criar serviço.
  3. Selecione Função (use um editor in-line para criar uma função).
  4. Na seção Configurar, forneça os seguintes detalhes de configuração:

    Configuração Valor
    Nome do serviço cisco-aci-collector
    Região Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).
    Ambiente de execução Selecione Python 3.12 ou uma versão mais recente.
  5. Na seção Acionador (opcional):

    1. Clique em + Adicionar gatilho.
    2. Selecione Cloud Pub/Sub.
    3. Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (cisco-aci-trigger).
    4. Clique em Salvar.
  6. Na seção Autenticação:

    • Selecione Exigir autenticação.
    • Selecione Identity and Access Management (IAM).
  1. Role a tela para baixo e abra Contêineres, rede, segurança.
  2. Acesse a guia Segurança:
    • Conta de serviço: selecione a conta de serviço (cisco-aci-collector-sa).
  3. Acesse a guia Contêineres:

    • Clique em Variáveis e secrets.
    • Clique em + Adicionar variável para cada variável de ambiente:

      Nome da variável Valor de exemplo Descrição
      GCS_BUCKET cisco-aci-logs Nome do bucket do GCS
      GCS_PREFIX cisco-aci-events Prefixo para arquivos de registro
      STATE_KEY cisco-aci-events/state.json Caminho do arquivo de estado
      APIC_URL https://apic.example.com URL HTTPS do APIC
      APIC_USERNAME your-apic-username Nome de usuário do APIC
      APIC_PASSWORD your-apic-password Senha do APIC
      PAGE_SIZE 100 Registros por página
      MAX_PAGES 10 Número máximo de páginas por execução
  4. Na seção Variáveis e secrets, role a tela até Solicitações:

    • Tempo limite da solicitação: insira 300 segundos (5 minutos).
  5. Acesse a guia Configurações:

    • Na seção Recursos:
      • Memória: selecione 512 MiB ou mais.
      • CPU: selecione 1.
  6. Na seção Escalonamento de revisão:

    • Número mínimo de instâncias: insira 0.
    • Número máximo de instâncias: insira 100 ou ajuste com base na carga esperada.
  7. Clique em Criar.

  8. Aguarde a criação do serviço (1 a 2 minutos).

  9. Depois que o serviço for criado, o editor de código inline será aberto automaticamente.

Adicionar código da função

  1. Insira main no campo Ponto de entrada.
  2. No editor de código em linha, crie dois arquivos:

    • Primeiro arquivo: main.py::
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=60.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json')
    APIC_URL = os.environ.get('APIC_URL')
    APIC_USERNAME = os.environ.get('APIC_USERNAME')
    APIC_PASSWORD = os.environ.get('APIC_PASSWORD')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]):
            logger.error('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            last_timestamp = state.get('last_timestamp')
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}")
    
            # Authenticate to APIC
            session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(
                    APIC_URL,
                    headers,
                    data_type,
                    last_timestamp,
                    PAGE_SIZE,
                    MAX_PAGES
                )
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in GCS if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to GCS
                blob = bucket.blob(s3_key)
                blob.upload_from_string(
                    ndjson_content,
                    content_type='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}")
    
                # Update state file with latest timestamp from collected data
                latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
                if not latest_timestamp:
                    latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
                update_state(bucket, STATE_KEY, latest_timestamp)
            else:
                logger.info("No new log records found.")
    
            logger.info(f"Successfully processed {len(all_collected_data)} records")
    
        except Exception as e:
            logger.error(f'Error processing logs: {str(e)}')
            raise
    
    def authenticate_apic(apic_url, username, password):
        """Authenticate to APIC and return session token"""
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """Collect data from APIC REST API with pagination"""
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter to prevent duplicates
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """Get the last run timestamp from GCS state file"""
        try:
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                return state.get('last_timestamp')
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
    
        return None
    
    def get_latest_timestamp_from_records(records):
        """Get the latest timestamp from collected records to prevent missing events"""
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')
    
                # Use created or modTs as fallback
                timestamp = created or modTs
    
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """Update the state file with the current timestamp"""
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
            blob = bucket.blob(state_key)
            blob.upload_from_string(
                json.dumps(state_data),
                content_type='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            logger.warning(f"Could not load state: {e}")
    
        return {}
    
    • Segundo arquivo: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Clique em Implantar para salvar e implantar a função.

  4. Aguarde a conclusão da implantação (2 a 3 minutos).

Criar o job do Cloud Scheduler

O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.

  1. No Console do GCP, acesse o Cloud Scheduler.
  2. Clique em Criar job.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome cisco-aci-collector-15m
    Região Selecione a mesma região da função do Cloud Run
    Frequência */15 * * * * (a cada 15 minutos)
    Fuso horário Selecione o fuso horário (UTC recomendado)
    Tipo de destino Pub/Sub
    Tópico Selecione o tópico do Pub/Sub (cisco-aci-trigger).
    Corpo da mensagem {} (objeto JSON vazio)
  4. Clique em Criar.

Opções de frequência de programação

  • Escolha a frequência com base no volume de registros e nos requisitos de latência:

    Frequência Expressão Cron Caso de uso
    A cada 5 minutos */5 * * * * Alto volume e baixa latência
    A cada 15 minutos */15 * * * * Volume médio (recomendado)
    A cada hora 0 * * * * Padrão
    A cada 6 horas 0 */6 * * * Baixo volume, processamento em lote
    Diário 0 0 * * * Coleta de dados históricos

Testar a integração

  1. No console do Cloud Scheduler, encontre seu job (cisco-aci-collector-15m).
  2. Clique em Executar à força para acionar o job manualmente.
  3. Aguarde alguns segundos.
  4. Acesse Cloud Run > Serviços.
  5. Clique no nome da função (cisco-aci-collector).
  6. Clique na guia Registros.
  7. Verifique se a função foi executada com sucesso. Procure o seguinte:

    Starting Cisco ACI data collection for bucket: cisco-aci-logs
    Successfully authenticated to APIC
    Collecting faultInst data
    Collected X faultInst records
    Collecting eventRecord data
    Collected X eventRecord records
    Collecting aaaModLR data
    Collected X aaaModLR records
    Total records collected: X
    Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Acesse Cloud Storage > Buckets.

  9. Clique no nome do bucket (cisco-aci-logs).

  10. Navegue até a pasta de prefixo (cisco-aci-events/).

  11. Verifique se um novo arquivo .ndjson foi criado com o carimbo de data/hora atual.

Se você encontrar erros nos registros:

  • HTTP 401: verifique as credenciais do APIC nas variáveis de ambiente
  • HTTP 403: verifique se a conta do APIC tem permissões de leitura para as classes faultInst, eventRecord e aaaModLR.
  • Erros de conexão: verifique se a função do Cloud Run pode acessar o URL da APIC em TCP/443.
  • Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.

Recuperar a conta de serviço do Google SecOps

O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.

Receber o e-mail da conta de serviço

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, Cisco ACI JSON logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione Infraestrutura centrada em aplicativos da Cisco como o Tipo de registro.
  7. Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copie esse endereço de e-mail. Você vai usá-la na próxima etapa.

Conceder permissões do IAM à conta de serviço do Google SecOps

A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (cisco-aci-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
    • Atribuir papéis: selecione Leitor de objetos do Storage.
  6. Clique em Salvar.

Configurar um feed no Google SecOps para ingerir registros do Cisco ACI

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, Cisco ACI JSON logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione Infraestrutura centrada em aplicativos da Cisco como o Tipo de registro.
  7. Clique em Próxima.
  8. Especifique valores para os seguintes parâmetros de entrada:

    • URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:

      gs://cisco-aci-logs/cisco-aci-events/
      
      • Substitua:
        • cisco-aci-logs: o nome do bucket do GCS.
        • cisco-aci-events: prefixo/caminho da pasta onde os registros são armazenados.
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:

      • Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
      • Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
      • Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.

    • Namespace do recurso: o namespace do recurso.

    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.

  9. Clique em Próxima.

  10. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
@timestamp read_only_udm.metadata.event_timestamp O valor é extraído do campo de registro bruto "@timestamp" e analisado como um carimbo de data/hora.
aci_tag read_only_udm.metadata.product_log_id O valor é extraído do campo de registro bruto "aci_tag".
cisco_timestamp - Não mapeado.
DIP read_only_udm.target.ip O valor é extraído do campo de registro bruto "DIP".
DPort read_only_udm.target.port O valor é extraído do campo de registro bruto "DPort" e convertido em número inteiro.
descrição read_only_udm.security_result.description O valor é extraído do campo de registro bruto "description".
fault_cause read_only_udm.additional.fields.value.string_value O valor é extraído do campo de registro bruto "fault_cause". A chave é definida como "Causa da falha".
nome do host read_only_udm.principal.hostname O valor é extraído do campo de registro bruto "hostname".
lifecycle_state read_only_udm.metadata.product_event_type O valor é extraído do campo de registro bruto "lifecycle_state".
log.source.address - Não mapeado.
logstash.collect.host - Não mapeado.
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp O valor é extraído do campo de registro bruto "logstash.collect.timestamp" e analisado como um carimbo de data/hora.
logstash.ingest.host read_only_udm.intermediary.hostname O valor é extraído do campo de registro bruto "logstash.ingest.host".
logstash.irm_environment read_only_udm.additional.fields.value.string_value O valor é extraído do campo de registro bruto "logstash.irm_environment". A chave é definida como "IRM_Environment".
logstash.irm_region read_only_udm.additional.fields.value.string_value O valor é extraído do campo de registro bruto "logstash.irm_region". A chave é definida como "IRM_Region".
logstash.irm_site read_only_udm.additional.fields.value.string_value O valor é extraído do campo de registro bruto "logstash.irm_site". A chave é definida como "IRM_Site".
logstash.process.host read_only_udm.intermediary.hostname O valor é extraído do campo de registro bruto "logstash.process.host".
mensagem - Não mapeado.
message_class - Não mapeado.
message_code - Não mapeado.
message_content - Não mapeado.
message_dn - Não mapeado.
message_type read_only_udm.metadata.product_event_type O valor é extraído do campo de registro bruto "message_type" após a remoção dos colchetes.
node_link read_only_udm.principal.process.file.full_path O valor é extraído do campo de registro bruto "node_link".
PktLen read_only_udm.network.received_bytes O valor é extraído do campo de registro bruto "PktLen" e convertido em um número inteiro sem sinal.
programa - Não mapeado.
Proto read_only_udm.network.ip_protocol O valor é extraído do campo de registro bruto "Proto", convertido em número inteiro e mapeado para o nome do protocolo IP correspondente (por exemplo, 6 -> TCP).
SIP read_only_udm.principal.ip O valor é extraído do campo de registro bruto "SIP".
SPort read_only_udm.principal.port O valor é extraído do campo de registro bruto "SPort" e convertido em número inteiro.
syslog_facility - Não mapeado.
syslog_facility_code - Não mapeado.
syslog_host read_only_udm.principal.ip, read_only_udm.observer.ip O valor é extraído do campo de registro bruto "syslog_host".
syslog_prog - Não mapeado.
syslog_severity read_only_udm.security_result.severity_details O valor é extraído do campo de registro bruto "syslog_severity".
syslog_severity_code read_only_udm.security_result.severity O valor é extraído do campo de registro bruto "syslog_severity_code" e mapeado para o nível de gravidade correspondente: 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH.
syslog5424_pri - Não mapeado.
Vlan-Id read_only_udm.principal.resource.id O valor é extraído do campo de registro bruto "Vlan-Id".
- read_only_udm.metadata.event_type Lógica: se "SIP" ou "hostname" estiver presente e "Proto" estiver presente, defina como "NETWORK_CONNECTION". Caso contrário, se "SIP", "hostname" ou "syslog_host" estiverem presentes, defina como "STATUS_UPDATE". Caso contrário, defina como "GENERIC_EVENT".
- read_only_udm.metadata.log_type Lógica: definida como "CISCO_ACI".
- read_only_udm.metadata.vendor_name Lógica: definido como "Cisco".
- read_only_udm.metadata.product_name Lógica: definida como "ACI".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.