Coletar registros da MFA do HYPR

Compatível com:

Este documento explica como ingerir registros de MFA do HYPR no Google Security Operations usando webhooks ou o Google Cloud Storage V2.

A HYPR MFA é uma solução de autenticação multifator sem senha que oferece autenticação resistente a phishing usando chaves de acesso FIDO2, biometria e login iniciado em dispositivos móveis. O HYPR substitui as senhas tradicionais por criptografia segura de chave pública para eliminar ataques baseados em credenciais e simplificar a autenticação do usuário em estações de trabalho, aplicativos da Web e serviços de nuvem.

Antes de começar

Verifique se você atende os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Acesso administrativo ao HYPR Control Center.
  • Entre em contato com o suporte da HYPR para ativar os hooks de eventos personalizados no aplicativo RP que você quer monitorar.

Diferenças no método de coleta

O HYPR MFA oferece suporte a dois métodos para enviar registros ao Google Security Operations:

  • Webhook (recomendado): o HYPR envia eventos em tempo real para o Google Security Operations usando hooks de eventos personalizados. Esse método oferece entrega imediata de eventos e não exige infraestrutura adicional.
  • Google Cloud Storage: os eventos do HYPR são coletados por API e armazenados no GCS, sendo ingeridos pelo Google Security Operations. Esse método oferece processamento em lote e retenção de dados históricos.

Escolha o método mais adequado às suas necessidades:

Recurso Webhook Google Cloud Storage
Latência Em tempo real (segundos) Lote (minutos a horas)
Infraestrutura Nenhum é necessário Projeto do GCP com função do Cloud Run
Dados históricos Limitado ao fluxo de eventos Retenção total no GCS
Complexidade da configuração Simples Moderado
Custo Mínimo Custos de computação e armazenamento do GCP

Opção 1: configurar a integração do webhook

Criar um feed de webhook no Google SecOps

Criar o feed

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Na próxima página, clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, HYPR MFA Events).
  5. Selecione Webhook como o Tipo de origem.
  6. Selecione HYPR MFA como o Tipo de registro.
  7. Clique em Próxima.
  8. Especifique valores para os seguintes parâmetros de entrada:
    • Delimitador de divisão (opcional): deixe em branco. Cada solicitação de webhook contém um único evento JSON.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
  9. Clique em Próxima.
  10. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Gerar e salvar a chave secreta

Depois de criar o feed, gere uma chave secreta para autenticação:

  1. Na página de detalhes do feed, clique em Gerar chave secreta.
  2. Uma caixa de diálogo mostra a chave secreta.
  3. Copie e salve a chave secreta com segurança.

Receber o URL do endpoint do feed

  1. Acesse a guia Detalhes do feed.
  2. Na seção Informações do endpoint, copie o URL do endpoint do feed.
  3. O formato do URL é:

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    ou

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. Salve esse URL para as próximas etapas.

  5. Clique em Concluído.

Criar chave de API do Google Cloud

O Chronicle exige uma chave de API para autenticação. Crie uma chave de API restrita no console do Google Cloud.

Criar a chave de API

  1. Acesse a página "Credenciais" do Console do Google Cloud.
  2. Selecione seu projeto (o projeto associado à sua instância do Chronicle).
  3. Clique em Criar credenciais > Chave de API.
  4. Uma chave de API é criada e exibida em uma caixa de diálogo.
  5. Clique em Editar chave de API para restringir a chave.

Restringir a chave de API

  1. Na página de configurações da chave de API:
    • Nome: insira um nome descritivo, por exemplo, Chronicle Webhook API Key.
  2. Em Restrições de API:
    1. Selecione Restringir chave.
    2. No menu suspenso Selecionar APIs, pesquise e selecione API Google SecOps (ou API Chronicle).
  3. Clique em Salvar.
  4. Copie o valor da chave de API do campo Chave de API na parte de cima da página.
  5. Salve a chave de API com segurança.

Configurar o hook de evento personalizado da MFA do HYPR

Criar o URL do webhook com cabeçalhos

O HYPR aceita cabeçalhos personalizados para autenticação. Use o método de autenticação de cabeçalhos para aumentar a segurança.

  • URL do endpoint (sem parâmetros):

    <ENDPOINT_URL>
    
  • Cabeçalhos:

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • Substitua:
      • <ENDPOINT_URL>: o URL do endpoint do feed da etapa anterior.
      • <API_KEY>: a chave de API do Google Cloud que você criou.
      • <SECRET_KEY>: a chave secreta da criação do feed do Chronicle.

Preparar a configuração JSON do hook de evento personalizado

  • Os hooks de eventos personalizados do HYPR são configurados usando JSON. Prepare a seguinte configuração JSON, substituindo os valores de marcador:

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
    • Substitua:

      • <ENDPOINT_URL>: o URL do endpoint do feed do Chronicle.
      • <API_KEY>: a chave de API do Google Cloud.
      • <SECRET_KEY>: a chave secreta do Chronicle.
    • Parâmetros de configuração:

    • name: um nome descritivo para o hook de evento (por exemplo, Chronicle SIEM Integration).

    • eventType: defina como ALL para enviar todos os eventos do HYPR ou especifique tags de eventos específicas, como AUTHENTICATION, REGISTRATION ou ACCESS_TOKEN.

    • invocationEndpoint: o URL do endpoint do feed do Chronicle.

    • httpMethod: defina como POST.

    • authType: definido como API_KEY para autenticação com chave de API.

    • apiKeyName: o nome do cabeçalho da chave de API (x-goog-chronicle-auth).

    • apiKeyValue: o valor da chave de API do Google Cloud.

    • headerParameters: cabeçalhos adicionais, incluindo Content-Type: application/json e a chave secreta do Chronicle no cabeçalho x-chronicle-auth.

Criar o hook de evento personalizado no HYPR Control Center

  1. Faça login no HYPR Control Center como administrador.
  2. No menu de navegação à esquerda, clique em Integrations.
  3. Na página Integrações, clique em Adicionar nova integração.
  4. O HYPR Control Center mostra as integrações disponíveis.
  5. Clique no bloco em Hooks de eventos para Eventos personalizados.
  6. Clique em Adicionar novo hook de evento.
  7. Na caixa de diálogo Adicionar novo hook de evento, cole o conteúdo JSON preparado no campo de texto.
  8. Clique em Adicionar hook de evento.
  9. O HYPR Control Center volta para a página Hooks de eventos.

O hook de evento personalizado agora está configurado e vai começar a enviar eventos para o Google SecOps.

Verificar se o webhook está funcionando

Verificar o status do hook de evento da Central de controle do HYPR

  1. Faça login na Central de controle do HYPR.
  2. Acesse Integrações.
  3. Clique na integração Eventos personalizados.
  4. Na tabela Hooks de eventos, verifique se o hook de evento está listado.
  5. Clique no nome do hook de evento para ver os detalhes.
  6. Verifique se a configuração corresponde às suas configurações.

Verificar o status do feed do Chronicle

  1. Acesse Configurações do SIEM > Feeds no Chronicle.
  2. Localize seu feed de webhook.
  3. Verifique a coluna Status (deve ser Ativo).
  4. Verifique a contagem de Eventos recebidos (ela precisa estar aumentando).
  5. Verifique o carimbo de data/hora Última execução bem-sucedida em (deve ser recente).

Verificar registros no Chronicle

  1. Acesse Pesquisar > Pesquisa do UDM.
  2. Use a seguinte consulta:

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. Ajuste o período para a última hora.

  4. Verifique se os eventos aparecem nos resultados.

Referência de métodos de autenticação

Os hooks de eventos personalizados do HYPR são compatíveis com vários métodos de autenticação. O método recomendado para o Chronicle é a autenticação de chave de API com cabeçalhos personalizados.

  • Configuração:

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • Vantagens:

    • Chave de API e chave secreta enviadas em cabeçalhos (mais seguro do que parâmetros de URL).
    • Compatível com vários cabeçalhos de autenticação.
    • Cabeçalhos não registrados nos registros de acesso do servidor da Web.

Autenticação básica

  • Configuração:

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Caso de uso:quando o sistema de destino exige autenticação HTTP básica.

Credenciais do cliente OAuth 2.0

  • Configuração:

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Caso de uso:quando o sistema de destino exige autenticação OAuth 2.0.

Tipos de eventos e filtragem

Os eventos HYPR são agrupados usando o parâmetro eventTags. É possível configurar o hook de evento personalizado para enviar todos os eventos ou filtrar por tipos específicos.

Tags de evento

  • AUTHENTICATION: eventos de autenticação do usuário (login, desbloqueio).
  • REGISTRATION: eventos de registro de dispositivos (pareamento de dispositivos móveis, chaves de segurança).
  • ACCESS_TOKEN: eventos de geração e uso de token de acesso.
  • AUDIT: eventos de registro de auditoria (ações administrativas, mudanças de configuração).

Configurar a filtragem de eventos

Para enviar apenas tipos de eventos específicos, modifique o parâmetro eventType na configuração JSON:

  • Enviar todos os eventos:

    {
      "eventType": "ALL"
    }
    
  • Enviar apenas eventos de autenticação:

    {
      "eventType": "AUTHENTICATION"
    }
    
  • Enviar apenas eventos de registro:

    {
      "eventType": "REGISTRATION"
    }
    

Opção 2: configurar a integração do Google Cloud Storage

Outros pré-requisitos para a integração do GCS

Além dos pré-requisitos listados na seção "Antes de começar", você precisa do seguinte:

  • Um projeto do GCP com a API Cloud Storage ativada
  • Permissões para criar e gerenciar buckets do GCS
  • Permissões para gerenciar políticas do IAM em buckets do GCS
  • Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
  • Credenciais da API HYPR (entre em contato com o suporte da HYPR para acessar a API)

Criar um bucket do Google Cloud Storage

  1. Acesse o Console do Google Cloud.
  2. Selecione seu projeto ou crie um novo.
  3. No menu de navegação, acesse Cloud Storage > Buckets.
  4. Clique em Criar bucket.
  5. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, hypr-mfa-logs.
    Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional)
    Local Selecione o local (por exemplo, us-central1).
    Classe de armazenamento Padrão (recomendado para registros acessados com frequência)
    Controle de acesso Uniforme (recomendado)
    Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção
  6. Clique em Criar.

Coletar credenciais da API HYPR

Entre em contato com o suporte da HYPR para receber credenciais de API e acessar os dados de eventos da HYPR. Você precisará dos seguintes itens:

  • URL de base da API: o URL da sua instância do HYPR (por exemplo, https://your-tenant.hypr.com)
  • Token da API: token de autenticação para acesso à API.
  • ID do app da RP: o ID do aplicativo da parte confiável a ser monitorado.

Criar uma conta de serviço para a função do Cloud Run

A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.

Criar conta de serviço

  1. No Console do GCP, acesse IAM e administrador > Contas de serviço.
  2. Clique em Criar conta de serviço.
  3. Informe os seguintes detalhes de configuração:
    • Nome da conta de serviço: insira hypr-logs-collector-sa.
    • Descrição da conta de serviço: insira Service account for Cloud Run function to collect HYPR MFA logs.
  4. Clique em Criar e continuar.
  5. Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
    1. Clique em Selecionar papel.
    2. Pesquise e selecione Administrador de objetos do Storage.
    3. Clique em + Adicionar outro papel.
    4. Pesquise e selecione Invocador do Cloud Run.
    5. Clique em + Adicionar outro papel.
    6. Pesquise e selecione Invocador do Cloud Functions.
  6. Clique em Continuar.
  7. Clique em Concluído.

Esses papéis são necessários para:

  • Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
  • Invocador do Cloud Run: permite que o Pub/Sub invoque a função
  • Invocador do Cloud Functions: permite a invocação de funções

Conceder permissões do IAM no bucket do GCS

Conceda à conta de serviço (hypr-logs-collector-sa) permissões de gravação no bucket do GCS:

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (por exemplo, hypr-mfa-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: insira o e-mail da conta de serviço (por exemplo, hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Atribuir papéis: selecione Administrador de objetos do Storage.
  6. Clique em Salvar.

Criar tópico Pub/Sub

Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.

  1. No Console do GCP, acesse Pub/Sub > Tópicos.
  2. Selecione Criar tópico.
  3. Informe os seguintes detalhes de configuração:
    • ID do tópico: insira hypr-logs-trigger.
    • Não altere as outras configurações.
  4. Clique em Criar.

Criar uma função do Cloud Run para coletar registros

A função do Cloud Run será acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API HYPR e gravá-los no GCS.

  1. No console do GCP, acesse o Cloud Run.
  2. Clique em Criar serviço.
  3. Selecione Função (use um editor in-line para criar uma função).
  4. Na seção Configurar, forneça os seguintes detalhes de configuração:

    Configuração Valor
    Nome do serviço hypr-logs-collector
    Região Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).
    Ambiente de execução Selecione Python 3.12 ou uma versão mais recente.
  5. Na seção Acionador (opcional):

    1. Clique em + Adicionar gatilho.
    2. Selecione Cloud Pub/Sub.
    3. Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (hypr-logs-trigger).
    4. Clique em Salvar.
  6. Na seção Autenticação:

    1. Selecione Exigir autenticação.
    2. Confira o Identity and Access Management (IAM).
  7. Role a tela para baixo e abra Contêineres, rede, segurança.

  8. Acesse a guia Segurança:

    • Conta de serviço: selecione a conta de serviço (hypr-logs-collector-sa).
  9. Acesse a guia Contêineres:

    1. Clique em Variáveis e secrets.
    2. Clique em + Adicionar variável para cada variável de ambiente:
    Nome da variável Valor de exemplo Descrição
    GCS_BUCKET hypr-mfa-logs Nome do bucket do GCS
    GCS_PREFIX hypr-events Prefixo para arquivos de registro
    STATE_KEY hypr-events/state.json Caminho do arquivo de estado
    HYPR_API_URL https://your-tenant.hypr.com URL base da API HYPR
    HYPR_API_TOKEN your-api-token Token de autenticação da API HYPR.
    HYPR_RP_APP_ID your-rp-app-id ID do aplicativo HYPR RP
    MAX_RECORDS 1000 Máximo de registros por execução
    PAGE_SIZE 100 Registros por página
    LOOKBACK_HOURS 24 Período de lookback inicial
  10. Na seção Variáveis e secrets, role a tela para baixo até Solicitações:

    • Tempo limite da solicitação: insira 600 segundos (10 minutos).
  11. Acesse a guia Configurações:

    • Na seção Recursos:
      • Memória: selecione 512 MiB ou mais.
      • CPU: selecione 1.
  12. Na seção Escalonamento de revisão:

    • Número mínimo de instâncias: insira 0.
    • Número máximo de instâncias: insira 100 ou ajuste com base na carga esperada.
  13. Clique em Criar.

  14. Aguarde a criação do serviço (1 a 2 minutos).

  15. Depois que o serviço for criado, o editor de código inline será aberto automaticamente.

Adicionar código da função

  1. Insira main no campo Ponto de entrada.
  2. No editor de código em linha, crie dois arquivos:

    • Primeiro arquivo: main.py::
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • Segundo arquivo: requirements.txt::
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Clique em Implantar para salvar e implantar a função.

  4. Aguarde a conclusão da implantação (2 a 3 minutos).

Criar o job do Cloud Scheduler

O Cloud Scheduler vai publicar mensagens no tópico do Pub/Sub (hypr-logs-trigger) em intervalos regulares, acionando a função do Cloud Run.

  1. No Console do GCP, acesse o Cloud Scheduler.
  2. Clique em Criar job.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome hypr-logs-collector-hourly
    Região Selecione a mesma região da função do Cloud Run
    Frequência 0 * * * * (a cada hora, na hora)
    Fuso horário Selecione o fuso horário (UTC recomendado)
    Tipo de destino Pub/Sub
    Tópico Selecione o tópico do Pub/Sub (hypr-logs-trigger).
    Corpo da mensagem {} (objeto JSON vazio)
  4. Clique em Criar.

Opções de frequência de programação

Escolha a frequência com base no volume de registros e nos requisitos de latência:

Frequência Expressão Cron Caso de uso
A cada 5 minutos */5 * * * * Alto volume e baixa latência
A cada 15 minutos */15 * * * * Volume médio
A cada hora 0 * * * * Padrão (recomendado)
A cada 6 horas 0 */6 * * * Baixo volume, processamento em lote
Diariamente 0 0 * * * Coleta de dados históricos

Testar a integração

  1. No console do Cloud Scheduler, encontre seu job (hypr-logs-collector-hourly).
  2. Clique em Executar à força para acionar o job manualmente.
  3. Aguarde alguns segundos.
  4. Acesse Cloud Run > Serviços.
  5. Clique no nome da função (hypr-logs-collector).
  6. Clique na guia Registros.
  7. Verifique se a função foi executada com sucesso. Procure:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Acesse Cloud Storage > Buckets.

  9. Clique no nome do bucket (por exemplo, hypr-mfa-logs).

  10. Navegue até a pasta de prefixo (por exemplo, hypr-events/).

  11. Verifique se um novo arquivo .ndjson foi criado com o carimbo de data/hora atual.

Se você encontrar erros nos registros:

  • HTTP 401: verifique as credenciais da API nas variáveis de ambiente
  • HTTP 403: verifique se o token da API HYPR tem as permissões necessárias e se o ID do app RP está correto.
  • HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
  • Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.

Recuperar a conta de serviço do Google SecOps

O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.

Configurar um feed no Google SecOps para ingerir registros da MFA do HYPR

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, HYPR MFA Logs from GCS).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione HYPR MFA como o Tipo de registro.

  7. Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço será exibido, por exemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copie esse endereço de e-mail para usar na próxima etapa.

  9. Clique em Próxima.

  10. Especifique valores para os seguintes parâmetros de entrada:

    • URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:

      gs://hypr-mfa-logs/hypr-events/
      
      • Substitua:
        • hypr-mfa-logs: o nome do bucket do GCS.
        • hypr-events: prefixo/caminho da pasta opcional onde os registros são armazenados (deixe em branco para a raiz).
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:

      • Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
      • Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
      • Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.

    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.

    • Namespace do recurso: o namespace do recurso.

    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.

  11. Clique em Próxima.

  12. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Conceder permissões do IAM à conta de serviço do Google SecOps

A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (por exemplo, hypr-mfa-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
    • Atribuir papéis: selecione Leitor de objetos do Storage.
  6. Clique em Salvar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
extensions.auth.type Tipo de autenticação (por exemplo, SSO, MFA)
metadata.event_type Tipo de evento (por exemplo, USER_LOGIN, NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type Tipo de evento específico do produto
ID metadata.product_log_id ID de registro específico do produto
USERAGENT network.http.parsed_user_agent User agent HTTP analisado
USERAGENT network.http.user_agent String do user agent HTTP
SESSIONID network.session_id ID da sessão
DEVICEMODEL principal.asset.hardware.model Modelo de hardware do recurso
COMPANION,MACHINEDOMAIN principal.asset.hostname Nome do host do recurso
REMOTEIP principal.asset.ip Endereço IP do recurso
DEVICEID principal.asset_id Identificador exclusivo do recurso
COMPANION,MACHINEDOMAIN principal.hostname Nome do host associado ao principal
REMOTEIP principal.ip Endereço IP associado à principal
DEVICEOS principal.platform Plataforma (por exemplo, WINDOWS, LINUX)
DEVICEOSVERSION principal.platform_version Versão da plataforma
ISSUCCESSFUL security_result.action Ação realizada pelo sistema de segurança (por exemplo, PERMITIR, BLOQUEAR)
MENSAGEM security_result.description Descrição do resultado de segurança
MACHINEUSERNAME target.user.user_display_name Nome de exibição do usuário
FIDOUSER target.user.userid ID do usuário
metadata.product_name Nome do produto
metadata.vendor_name Nome do fornecedor/empresa

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.