Recolha registos do SailPoint IAM

Suportado em:

Este documento explica como carregar registos do SailPoint IAM para o Google Security Operations através do Amazon S3.

Antes de começar

Certifique-se de que cumpre os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Acesso privilegiado ao inquilino ou à API SailPoint Identity Security Cloud
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Recolha os pré-requisitos do IAM do SailPoint (IDs, chaves da API, IDs da organização e tokens)

  1. Inicie sessão na consola de administração do SailPoint Identity Security Cloud como administrador.
  2. Aceda a Global > Definições de segurança > Gestão de APIs.
  3. Clique em Criar cliente da API.
  4. Escolha Credenciais do cliente como o tipo de autorização.
  5. Forneça os seguintes detalhes de configuração:
    • Nome: introduza um nome descritivo (por exemplo, Chronicle Export API).
    • Descrição: introduza uma descrição para o cliente API.
    • Âmbitos: selecione sp:scopes:all (ou âmbitos de leitura adequados para eventos de auditoria).
  6. Clique em Criar e copie as credenciais da API geradas de forma segura.
  7. Registe o URL base do inquilino do SailPoint (por exemplo, https://tenant.api.identitynow.com).
  8. Copie e guarde numa localização segura os seguintes detalhes:
    • IDN_CLIENT_ID
    • IDN_CLIENT_SECRET
    • IDN_BASE

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o Nome e a Região do contentor para referência futura (por exemplo, sailpoint-iam-logs).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Copie e cole a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json"
        }
      ]
    }
    
    • Substitua sailpoint-iam-logs se tiver introduzido um nome de contentor diferente:
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome SailPointIamToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:

    Definição Valor
    Nome sailpoint_iam_to_s3
    Runtime Python 3.13
    Arquitetura x86_64
    Função de execução SailPointIamToS3Role
  4. Depois de criar a função, abra o separador Código, elimine o fragmento de código e introduza o seguinte código (sailpoint_iam_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSONL payloads to S3
    # - Uses /v3/search API with pagination for audit events.
    # - Preserves vendor-native JSON format for identity events.
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    # - Outputs JSONL format (one event per line) for optimal Chronicle ingestion.
    
    import os, json, time, uuid, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "sailpoint/iam/")
    STATE_KEY   = os.environ.get("STATE_KEY", "sailpoint/iam/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    IDN_BASE    = os.environ["IDN_BASE"]  # e.g. https://tenant.api.identitynow.com
    CLIENT_ID   = os.environ["IDN_CLIENT_ID"]
    CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"]
    SCOPE       = os.environ.get("IDN_SCOPE", "sp:scopes:all")
    PAGE_SIZE   = int(os.environ.get("PAGE_SIZE", "250"))
    MAX_PAGES   = int(os.environ.get("MAX_PAGES", "20"))
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_oauth_token() -> str:
        """Get OAuth2 access token using Client Credentials flow"""
        token_url = f"{IDN_BASE.rstrip('/')}/oauth/token"
    
        data = urllib.parse.urlencode({
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'scope': SCOPE
        }).encode('utf-8')
    
        req = Request(token_url, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        req.add_header("User-Agent", USER_AGENT)
    
        with urlopen(req, timeout=HTTP_TIMEOUT) as r:
            response = json.loads(r.read())
            return response["access_token"]
    
    def _search_events(access_token: str, created_from: str, search_after: list = None) -> list:
        """Search for audit events using SailPoint's /v3/search API
    
        IMPORTANT: SailPoint requires colons in ISO8601 timestamps to be escaped with backslashes.
        Example: 2024-01-15T10:30:00Z must be sent as 2024-01-15T10\:30\:00Z
        Reference: https://developer.sailpoint.com/discuss/t/datetime-searches/6609
        """
        search_url = f"{IDN_BASE.rstrip('/')}/v3/search"
    
        # Escape colons in timestamp for SailPoint search query
        # SailPoint requires: created:>=2024-01-15T10\:30\:00Z (colons must be escaped)
        escaped_timestamp = created_from.replace(":", "\\:")
        query_str = f'created:>={escaped_timestamp}'
    
        payload = {
            "indices": ["events"],
            "query": {"query": query_str},
            "sort": ["created", "+id"],
            "limit": PAGE_SIZE
        }
    
        if search_after:
            payload["searchAfter"] = search_after
    
        attempt = 0
        while True:
            req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            req.add_header("Authorization", f"Bearer {access_token}")
            req.add_header("User-Agent", USER_AGENT)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    response = json.loads(r.read())
                    # Handle different response formats
                    if isinstance(response, list):
                        return response
                    return response.get("results", response.get("data", []))
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str:
        """Write events to S3 in JSONL format (one JSON object per line)
    
        JSONL format is preferred for Chronicle ingestion as it allows:
        - Line-by-line processing
        - Better error recovery
        - Lower memory footprint
        """
        # Create unique S3 key for events data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.jsonl"
    
        # Convert events list to JSONL format (one JSON object per line)
        jsonl_lines = [json.dumps(event, separators=(",", ":")) for event in events]
        jsonl_content = "\n".join(jsonl_lines)
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=jsonl_content.encode("utf-8"), 
            ContentType="application/x-ndjson",  # JSONL MIME type
            Metadata={
                'source': 'sailpoint-iam',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'page_number': str(page_num),
                'events_count': str(len(events)),
                'format': 'jsonl'
            }
        )
        return key
    
    def _get_item_id(item: dict) -> str:
        """Extract ID from event item, trying multiple possible fields"""
        for field in ("id", "uuid", "eventId", "_id"):
            if field in item and item[field]:
                return str(item[field])
        return ""
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Get OAuth token
        access_token = _get_oauth_token()
    
        created_from = _iso(from_ts)
        print(f"Fetching SailPoint IAM events from: {created_from}")
    
        # Handle pagination state
        last_created = st.get("last_created")
        last_id = st.get("last_id")
        search_after = [last_created, last_id] if (last_created and last_id) else None
    
        pages = 0
        total_events = 0
        written_keys = []
        newest_created = last_created or created_from
        newest_id = last_id or ""
    
        while pages < MAX_PAGES:
            events = _search_events(access_token, created_from, search_after)
    
            if not events:
                break
    
            # Write page to S3 in JSONL format
            key = _put_events_data(events, from_ts, to_ts, pages + 1)
            written_keys.append(key)
            total_events += len(events)
    
            # Update pagination state from last item
            last_event = events[-1]
            last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created")
            last_event_id = _get_item_id(last_event)
    
            if last_event_created:
                newest_created = last_event_created
            if last_event_id:
                newest_id = last_event_id
    
            search_after = [newest_created, newest_id]
            pages += 1
    
            # If we got less than page size, we're done
            if len(events) < PAGE_SIZE:
                break
    
        print(f"Successfully retrieved {total_events} events across {pages} pages")
    
        # Save state for next run
        st["last_to_ts"] = to_ts
        st["last_created"] = newest_created
        st["last_id"] = newest_id
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True,
                "pages": pages,
                "total_events": total_events,
                "s3_keys": written_keys,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "last_created": newest_created,
                "last_id": newest_id,
                "format": "jsonl"
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    

  5. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET sailpoint-iam-logs
    S3_PREFIX sailpoint/iam/
    STATE_KEY sailpoint/iam/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT sailpoint-iam-to-s3/1.0
    IDN_BASE https://tenant.api.identitynow.com
    IDN_CLIENT_ID your-client-id (do passo 2)
    IDN_CLIENT_SECRET your-client-secret (do passo 2)
    IDN_SCOPE sp:scopes:all
    PAGE_SIZE 250
    MAX_PAGES 20
  7. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  8. Selecione o separador Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Forneça os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Destino: a sua função Lambda sailpoint_iam_to_s3.
    • Nome: sailpoint-iam-1h.
  3. Clique em Criar horário.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Aceda a AWS Console > IAM > Users > Add users.
  2. Clique em Adicionar utilizadores.
  3. Forneça os seguintes detalhes de configuração:
    • Utilizador: introduza secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos do IAM do SailPoint

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. Na página seguinte, clique em Configurar um único feed.
  4. No campo Nome do feed, introduza um nome para o feed (por exemplo, SailPoint IAM logs).
  5. Selecione Amazon S3 V2 como o Tipo de origem.
  6. Selecione SailPoint IAM como o Tipo de registo.
  7. Clicar em Seguinte.
  8. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://sailpoint-iam-logs/sailpoint/iam/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: predefinição de 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  9. Clicar em Seguinte.
  10. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento da UDM Lógica
action metadata.description O valor do campo action do registo não processado.
actor.name principal.user.user_display_name O valor do campo actor.name do registo não processado.
attributes.accountName principal.user.group_identifiers O valor do campo attributes.accountName do registo não processado.
attributes.appId target.asset_id "App ID: " concatenado com o valor do campo attributes.appId do registo não processado.
attributes.attributeName additional.fields[0].value.string_value O valor do campo attributes.attributeName do registo não processado, colocado num objeto additional.fields. A chave está definida como "Nome do atributo".
attributes.attributeValue additional.fields[1].value.string_value O valor do campo attributes.attributeValue do registo não processado, colocado num objeto additional.fields. A chave está definida como "Valor do atributo".
attributes.cloudAppName target.application O valor do campo attributes.cloudAppName do registo não processado.
attributes.hostName target.hostname, target.asset.hostname O valor do campo attributes.hostName do registo não processado.
attributes.interface additional.fields[2].value.string_value O valor do campo attributes.interface do registo não processado, colocado num objeto additional.fields. A chave está definida como "Interface".
attributes.operation security_result.action_details O valor do campo attributes.operation do registo não processado.
attributes.previousValue additional.fields[3].value.string_value O valor do campo attributes.previousValue do registo não processado, colocado num objeto additional.fields. A chave está definida como "Previous Value" (Valor anterior).
attributes.provisioningResult security_result.detection_fields.value O valor do campo attributes.provisioningResult do registo não processado, colocado num objeto security_result.detection_fields. A chave está definida como "Provisioning Result".
attributes.sourceId principal.labels[0].value O valor do campo attributes.sourceId do registo não processado, colocado num objeto principal.labels. A chave está definida como "ID da origem".
attributes.sourceName principal.labels[1].value O valor do campo attributes.sourceName do registo não processado, colocado num objeto principal.labels. A chave está definida como "Nome da origem".
auditClassName metadata.product_event_type O valor do campo auditClassName do registo não processado.
created metadata.event_timestamp.seconds, metadata.event_timestamp.nanos O valor do campo created do registo não processado, convertido em data/hora se instant.epochSecond não estiver presente.
id metadata.product_log_id O valor do campo id do registo não processado.
instant.epochSecond metadata.event_timestamp.seconds O valor do campo instant.epochSecond do registo não processado, usado para a data/hora.
ipAddress principal.asset.ip, principal.ip O valor do campo ipAddress do registo não processado.
interface additional.fields[0].value.string_value O valor do campo interface do registo não processado, colocado num objeto additional.fields. A chave está definida como "interface".
loggerName intermediary.application O valor do campo loggerName do registo não processado.
message metadata.description, security_result.description Usado para vários fins, incluindo a definição da descrição nos metadados e security_result, e a extração de conteúdo XML.
name security_result.description O valor do campo name do registo não processado.
operation target.resource.attribute.labels[0].value, metadata.product_event_type O valor do campo operation do registo não processado, colocado num objeto target.resource.attribute.labels. A chave está definida como "operation". Também é usado para metadata.product_event_type.
org principal.administrative_domain O valor do campo org do registo não processado.
pod principal.location.name O valor do campo pod do registo não processado.
referenceClass additional.fields[1].value.string_value O valor do campo referenceClass do registo não processado, colocado num objeto additional.fields. A chave está definida como "referenceClass".
referenceId additional.fields[2].value.string_value O valor do campo referenceId do registo não processado, colocado num objeto additional.fields. A chave está definida como "referenceId".
sailPointObjectName additional.fields[3].value.string_value O valor do campo sailPointObjectName do registo não processado, colocado num objeto additional.fields. A chave está definida como "sailPointObjectName".
serverHost principal.hostname, principal.asset.hostname O valor do campo serverHost do registo não processado.
stack additional.fields[4].value.string_value O valor do campo stack do registo não processado, colocado num objeto additional.fields. A chave está definida como "Stack".
status security_result.severity_details O valor do campo status do registo não processado.
target additional.fields[4].value.string_value O valor do campo target do registo não processado, colocado num objeto additional.fields. A chave está definida como "target".
target.name principal.user.userid O valor do campo target.name do registo não processado.
technicalName security_result.summary O valor do campo technicalName do registo não processado.
thrown.cause.message xml_body, detailed_message O valor do campo thrown.cause.message do registo não processado, usado para extrair conteúdo XML.
thrown.message xml_body, detailed_message O valor do campo thrown.message do registo não processado, usado para extrair conteúdo XML.
trackingNumber additional.fields[5].value.string_value O valor do campo trackingNumber do registo não processado, colocado num objeto additional.fields. A chave está definida como "Número de acompanhamento".
type metadata.product_event_type O valor do campo type do registo não processado.
_version metadata.product_version O valor do campo _version do registo não processado.
N/A metadata.event_timestamp Derivados dos campos instant.epochSecond ou created.
N/A metadata.event_type Determinado pela lógica de análise com base em vários campos, incluindo has_principal_user, has_target_application, technicalName e action. O valor predefinido é "GENERIC_EVENT".
N/A metadata.log_type Definido como "SAILPOINT_IAM".
N/A metadata.product_name Definido como "IAM".
N/A metadata.vendor_name Definido como "SAILPOINT".
N/A extensions.auth.type Definido como "AUTHTYPE_UNSPECIFIED" em determinadas condições.
N/A target.resource.attribute.labels[0].key Definido como "operation".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.