Recolha registos da gestão de privilégios de pontos finais (EPM) da BeyondTrust

Compatível com:

Este documento explica como carregar registos do BeyondTrust Endpoint Privilege Management (EPM) para o Google Security Operations através de duas abordagens diferentes: recolha baseada no EC2 e recolha baseada no AWS Lambda através do Amazon S3. O analisador concentra-se na transformação de dados de registo JSON não processados do BeyondTrust Endpoint num formato estruturado em conformidade com o UDM do Chronicle. Primeiro, inicializa os valores predefinidos para vários campos e, em seguida, analisa a carga útil JSON, mapeando posteriormente campos específicos do registo não processado para os campos UDM correspondentes no objeto event.idm.read_only_udm.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Acesso privilegiado ao inquilino ou à API do BeyondTrust Endpoint Privilege Management
  • Acesso privilegiado à AWS (S3, IAM, Lambda/EC2, EventBridge)

Escolha o método de integração

Pode escolher entre dois métodos de integração:

  • Opção 1: recolha baseada no EC2: usa uma instância do EC2 com scripts agendados para a recolha de registos
  • Opção 2: recolha baseada no AWS Lambda: usa funções Lambda sem servidor com agendamento do EventBridge

Opção 1: recolha baseada no EC2

Configure o AWS IAM para a carregamento do Google SecOps

  1. Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
  2. Selecione o utilizador criado.
  3. Selecione o separador Credenciais de segurança.
  4. Clique em Criar chave de acesso na secção Chaves de acesso.
  5. Selecione Serviço de terceiros como Exemplo de utilização.
  6. Clicar em Seguinte.
  7. Opcional: adicione uma etiqueta de descrição.
  8. Clique em Criar chave de acesso.
  9. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
  10. Clique em Concluído.
  11. Selecione o separador Autorizações.
  12. Clique em Adicionar autorizações na secção Políticas de autorizações.
  13. Selecione Adicionar autorizações.
  14. Selecione Anexar políticas diretamente.
  15. Pesquise e selecione a política AmazonS3FullAccess.
  16. Clicar em Seguinte.
  17. Clique em Adicionar autorizações.

Configure o BeyondTrust EPM para acesso à API

  1. Inicie sessão na consola Web do BeyondTrust Privilege Management como administrador.
  2. Aceda a Configuração > Definições > Definições da API.
  3. Clique em Criar uma conta da API.
  4. Indique os seguintes detalhes de configuração:
    • Nome: introduza Google SecOps Collector.
    • Acesso à API: ative a opção Auditoria (leitura) e outros âmbitos, conforme necessário.
  5. Copie e guarde o ID de cliente e o segredo do cliente.
  6. Copie o URL base da API. Normalmente, é https://<your-tenant>-services.pm.beyondtrustcloud.com (vai usá-lo como BPT_API_URL).

Crie um contentor do AWS S3

  1. Inicie sessão na AWS Management Console.
  2. Aceda a AWS Console > Services > S3 > Create bucket.
  3. Indique os seguintes detalhes de configuração:
    • Nome do segmento: my-beyondtrust-logs.
    • Região: [a sua escolha] > Criar.

Crie uma função do IAM para o EC2

  1. Inicie sessão na AWS Management Console.
  2. Aceda a AWS Console > Services > IAM > Roles > Create role.
  3. Indique os seguintes detalhes de configuração:
    • Entidade fidedigna: serviço AWS > EC2 > Seguinte.
    • Anexe a autorização: AmazonS3FullAccess (ou uma política com âmbito para o seu contentor) > Seguinte.
    • Nome da função: EC2-S3-BPT-Writer > Criar função.

Inicie e configure a VM do coletor do EC2

  1. Inicie sessão na AWS Management Console.
  2. Aceda a Serviços.
  3. Na barra de pesquisa, escreva EC2 e selecione-o.
  4. No painel de controlo do EC2, clique em Instances.
  5. Clique em Iniciar instâncias.
  6. Indique os seguintes detalhes de configuração:
    • Nome: introduza BPT-Log-Collector.
    • AMI: selecione Ubuntu Server 22.04 LTS.
    • Tipo de instância: t3.micro (ou superior) e, de seguida, clique em Seguinte.
    • Rede: certifique-se de que a definição Rede está definida para a sua VPC predefinida.
    • Função do IAM: selecione a função do IAM EC2-S3-BPT-Writer no menu.
    • Atribuir automaticamente IP público: ative (ou certifique-se de que consegue aceder através da VPN) > Seguinte.
    • Adicionar armazenamento: deixe a configuração de armazenamento predefinida (8 GiB) e, de seguida, clique em Seguinte.
    • Selecione Criar um novo grupo de segurança.
    • Regra de entrada: clique em Adicionar regra.
    • Tipo: selecione SSH.
    • Porta: 22.
    • Origem: o seu IP
    • Clique em Rever e iniciar.
    • Selecione ou crie um par de chaves.
    • Clique em Transferir par de chaves.
    • Guarde o ficheiro PEM transferido. Precisa deste ficheiro para estabelecer ligação à sua instância através de SSH.
  7. Estabeleça ligação à sua máquina virtual (VM) através do SSH.

Instale os pré-requisitos do coletor

  1. Execute o seguinte comando:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Atualize o sistema e instale as dependências:

    # Update OS
    sudo apt update && sudo apt upgrade -y
    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git
    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate
    # Install libraries
    pip install requests boto3
    
  3. Crie um diretório e um ficheiro de estado:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Inicialize-o (por exemplo, para há 1 hora):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

Implemente o script do coletor do BeyondTrust EPM

  1. Crie uma pasta de projeto:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Exporte as variáveis de ambiente necessárias (por exemplo, em ~/.bashrc):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
    
  3. Crie collector_bpt.py e introduza o seguinte código:

    #!/usr/bin/env python3
    import os, sys, json, boto3, requests
    from datetime import datetime, timezone, timedelta
    
    # ── UTILS ──────────────────────────────────────────────────────────────
    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val
    
    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1))
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)
    
    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")  # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")  # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    
    # ── END CONFIG ─────────────────────────────────────────────────────────
    ensure_state_file(STATE_FILE)
    
    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    
    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
    
    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
    
        return None
    
    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        from datetime import datetime, timezone
    
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts/1000 if ts > 1e12 else ts, tz=timezone.utc)
    
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    
        raise ValueError(f"Unsupported timestamp: {ts!r}")
    
    def fetch_events(token, start_date_iso):
        """
        Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start = [], start_date_iso
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
    
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            # Use the correct endpoint and parameters
            params = {
                "StartDate": current_start_date,
                "RecordSize": RECORD_SIZE
            }
    
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers, 
                params={
                    "StartDate": current_start_date,
                    "RecordSize": min(RECORD_SIZE, 1000)
                },
                timeout=300
            )
            resp.raise_for_status()
    
            data = resp.json()
            events = data.get("events", [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RECORD_SIZE, we're done
            if len(events) < RECORD_SIZE:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to ISO format if needed and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET, 
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )
    
    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        print(f"Fetching events from {START} to {END}")
    
        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)
    
            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)
    
            count = len(filtered_events)
    
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
    
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")
    
            # 3) persist state
            write_last_run(end_dt)
    
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    
  4. Torne-o executável:

    chmod +x collector_bpt.py
    

Agende diariamente com o Cron

  1. Execute o seguinte comando:

    crontab -e
    
  2. Adicione a tarefa diária à meia-noite UTC:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Opção 2: recolha baseada no AWS Lambda

Recolha os pré-requisitos do BeyondTrust EPM

  1. Inicie sessão na consola Web do BeyondTrust Privilege Management como administrador.
  2. Aceda a Configuração do sistema > API REST > Tokens.
  3. Clique em Adicionar token.
  4. Indique os seguintes detalhes de configuração:
    • Nome: introduza Google SecOps Collector.
    • Âmbitos: selecione Audit:Read e outros âmbitos, conforme necessário.
  5. Clique em Guardar e copie o valor do token.
  6. Copie e guarde numa localização segura os seguintes detalhes:
    • URL base da API: o URL da API BeyondTrust EPM (por exemplo, https://yourtenant-services.pm.beyondtrustcloud.com).
    • ID de cliente: a partir da configuração da aplicação OAuth.
    • Segredo do cliente: a partir da configuração da aplicação OAuth.

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, beyondtrust-epm-logs-bucket).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Copie e cole a seguinte política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
    ]
    }
    
    • Substitua beyondtrust-epm-logs-bucket se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política recém-criada e a política gerida AWSLambdaBasicExecutionRole (para registo do CloudWatch).

  6. Dê o nome BeyondTrustEPMLogExportRole à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:
Definição Valor
Nome BeyondTrustEPMLogExport
Runtime Python 3.13
Arquitetura x86_64
Função de execução BeyondTrustEPMLogExportRole
  1. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta, timezone
    import os
    from typing import Dict, List, Optional
    
    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')
    
        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)
    
            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp, RECORD_SIZE, MAX_ITERATIONS)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(api_url: str, client_id: str, client_secret: str, scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the correct scope: urn:management:api and /oauth/connect/token endpoint
        """
    
        token_url = f"{api_url}/oauth/connect/token"
    
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str], record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the correct BeyondTrust EPM API endpoint:
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            # Use the correct EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            response = http.request('GET', query_url, headers=headers, fields=params, timeout=urllib3.Timeout(300.0))
    
            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])
    
            if not events:
                break
    
            all_events.extend(events)
            iterations += 1
    
            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break
    
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
    
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
    
            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts
    
        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  2. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.

    Chave Valor de exemplo
    S3_BUCKET beyondtrust-epm-logs-bucket
    S3_PREFIX beyondtrust-epm-logs/
    STATE_KEY beyondtrust-epm-logs/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE urn:management:api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  4. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  5. Selecione o separador Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Alvo: a sua função Lambda BeyondTrustEPMLogExport.
    • Nome: BeyondTrustEPMLogExport-1h.
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Aceda a AWS Console > IAM > Users > Add users.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: introduza secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
    ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure feeds (ambas as opções)

Para configurar um feed, siga estes passos:

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, BeyondTrust EPM logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione BeyondTrust Endpoint Privilege Management como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: o URI do contentor
      • s3://your-log-bucket-name/. Substitua your-log-bucket-name pelo nome real do contentor.
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
agent.id principal.asset.attribute.labels.value Mapeado com a etiqueta com a chave agent_id
agent.version principal.asset.attribute.labels.value Mapeado com a etiqueta com a chave agent_version
ecs.version principal.asset.attribute.labels.value Mapeado com a etiqueta com a chave ecs_version
event_data.reason metadata.description Descrição do evento a partir do registo não processado
event_datas.ActionId metadata.product_log_id Identificador de registo específico do produto
file.path principal.file.full_path Caminho completo do ficheiro a partir do evento
headers.content_length additional.fields.value.string_value Mapeado com a etiqueta com a chave content_length
headers.content_type additional.fields.value.string_value Mapeado com a etiqueta com a chave content_type
headers.http_host additional.fields.value.string_value Mapeado com a etiqueta com a chave http_host
headers.http_version network.application_protocol_version Versão do protocolo HTTP
headers.request_method network.http.method Método de pedido HTTP
host.hostname principal.hostname Nome do anfitrião principal
host.hostname principal.asset.hostname Nome do anfitrião do recurso principal
host.ip principal.asset.ip Endereço IP do recurso principal
host.ip principal.ip Endereço IP principal
host.mac principal.mac Endereço MAC principal
host.os.platform principal.platform Definido como MAC se for igual ao macOS
host.os.version principal.platform_version Versão do sistema operativo
labels.related_item_id metadata.product_log_id Identificador de artigo relacionado
process.command_line principal.process.command_line Linha de comandos do processo
process.name additional.fields.value.string_value Mapeado com a etiqueta com a chave process_name
process.parent.name additional.fields.value.string_value Mapeado com a etiqueta com a chave process_parent_name
process.parent.pid principal.process.parent_process.pid PID do processo principal convertido em string
process.pid principal.process.pid PID do processo convertido em string
user.id principal.user.userid Identificador do utilizador
user.name principal.user.user_display_name Nome a apresentar do utilizador
N/A metadata.event_timestamp Data/hora do evento definida como data/hora da entrada do registo
N/A metadata.event_type GENERIC_EVENT se não existir um principal, caso contrário, STATUS_UPDATE
N/A network.application_protocol Definido como HTTP se o campo http_version contiver HTTP

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.