Coletar registros do IAM do SailPoint

Compatível com:

Este documento explica como ingerir registros do IAM do SailPoint no Google Security Operations usando o Amazon S3.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps
  • Acesso privilegiado ao locatário ou à API do SailPoint Identity Security Cloud
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Coletar pré-requisitos do IAM do SailPoint (IDs, chaves de API, IDs da organização, tokens)

  1. Faça login no Console de administração do SailPoint Identity Security Cloud como administrador.
  2. Acesse Global > Configurações de segurança > Gerenciamento de API.
  3. Clique em Criar cliente de API.
  4. Escolha Credenciais do cliente como o tipo de concessão.
  5. Informe os seguintes detalhes de configuração:
    • Nome: insira um nome descritivo, por exemplo, Chronicle Export API.
    • Descrição: insira uma descrição para o cliente de API.
    • Escopos: selecione sp:scopes:all ou os escopos de leitura adequados para eventos de auditoria.
  6. Clique em Criar e copie as credenciais de API geradas com segurança.
  7. Registre o URL base do locatário do SailPoint (por exemplo, https://tenant.api.identitynow.com).
  8. Copie e salve em um local seguro os seguintes detalhes:
    • IDN_CLIENT_ID
    • IDN_CLIENT_SECRET
    • IDN_BASE

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, sailpoint-iam-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clique em Próxima.
  19. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
  2. Copie e cole a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/sailpoint/iam/state.json"
        }
      ]
    }
    
    • Substitua sailpoint-iam-logs se você tiver inserido um nome de bucket diferente:
  3. Clique em Próxima > Criar política.

  4. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  5. Anexe a política recém-criada.

  6. Nomeie a função como SailPointIamToS3Role e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome sailpoint_iam_to_s3
    Ambiente de execução Python 3.13
    Arquitetura x86_64
    Função de execução SailPointIamToS3Role
  4. Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (sailpoint_iam_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Pull SailPoint Identity Security Cloud audit events and store raw JSONL payloads to S3
    # - Uses /v3/search API with pagination for audit events.
    # - Preserves vendor-native JSON format for identity events.
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    # - Outputs JSONL format (one event per line) for optimal Chronicle ingestion.
    
    import os, json, time, uuid, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "sailpoint/iam/")
    STATE_KEY   = os.environ.get("STATE_KEY", "sailpoint/iam/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    IDN_BASE    = os.environ["IDN_BASE"]  # e.g. https://tenant.api.identitynow.com
    CLIENT_ID   = os.environ["IDN_CLIENT_ID"]
    CLIENT_SECRET = os.environ["IDN_CLIENT_SECRET"]
    SCOPE       = os.environ.get("IDN_SCOPE", "sp:scopes:all")
    PAGE_SIZE   = int(os.environ.get("PAGE_SIZE", "250"))
    MAX_PAGES   = int(os.environ.get("MAX_PAGES", "20"))
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "sailpoint-iam-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_oauth_token() -> str:
        """Get OAuth2 access token using Client Credentials flow"""
        token_url = f"{IDN_BASE.rstrip('/')}/oauth/token"
    
        data = urllib.parse.urlencode({
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'scope': SCOPE
        }).encode('utf-8')
    
        req = Request(token_url, data=data, method="POST")
        req.add_header("Content-Type", "application/x-www-form-urlencoded")
        req.add_header("User-Agent", USER_AGENT)
    
        with urlopen(req, timeout=HTTP_TIMEOUT) as r:
            response = json.loads(r.read())
            return response["access_token"]
    
    def _search_events(access_token: str, created_from: str, search_after: list = None) -> list:
        """Search for audit events using SailPoint's /v3/search API
    
        IMPORTANT: SailPoint requires colons in ISO8601 timestamps to be escaped with backslashes.
        Example: 2024-01-15T10:30:00Z must be sent as 2024-01-15T10\:30\:00Z
        Reference: https://developer.sailpoint.com/discuss/t/datetime-searches/6609
        """
        search_url = f"{IDN_BASE.rstrip('/')}/v3/search"
    
        # Escape colons in timestamp for SailPoint search query
        # SailPoint requires: created:>=2024-01-15T10\:30\:00Z (colons must be escaped)
        escaped_timestamp = created_from.replace(":", "\\:")
        query_str = f'created:>={escaped_timestamp}'
    
        payload = {
            "indices": ["events"],
            "query": {"query": query_str},
            "sort": ["created", "+id"],
            "limit": PAGE_SIZE
        }
    
        if search_after:
            payload["searchAfter"] = search_after
    
        attempt = 0
        while True:
            req = Request(search_url, data=json.dumps(payload).encode('utf-8'), method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            req.add_header("Authorization", f"Bearer {access_token}")
            req.add_header("User-Agent", USER_AGENT)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    response = json.loads(r.read())
                    # Handle different response formats
                    if isinstance(response, list):
                        return response
                    return response.get("results", response.get("data", []))
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _put_events_data(events: list, from_ts: float, to_ts: float, page_num: int) -> str:
        """Write events to S3 in JSONL format (one JSON object per line)
    
        JSONL format is preferred for Chronicle ingestion as it allows:
        - Line-by-line processing
        - Better error recovery
        - Lower memory footprint
        """
        # Create unique S3 key for events data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        key = f"{S3_PREFIX}{ts_path}/sailpoint_iam_{int(from_ts)}_{int(to_ts)}_p{page_num:03d}_{uniq}.jsonl"
    
        # Convert events list to JSONL format (one JSON object per line)
        jsonl_lines = [json.dumps(event, separators=(",", ":")) for event in events]
        jsonl_content = "\n".join(jsonl_lines)
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=jsonl_content.encode("utf-8"), 
            ContentType="application/x-ndjson",  # JSONL MIME type
            Metadata={
                'source': 'sailpoint-iam',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'page_number': str(page_num),
                'events_count': str(len(events)),
                'format': 'jsonl'
            }
        )
        return key
    
    def _get_item_id(item: dict) -> str:
        """Extract ID from event item, trying multiple possible fields"""
        for field in ("id", "uuid", "eventId", "_id"):
            if field in item and item[field]:
                return str(item[field])
        return ""
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Get OAuth token
        access_token = _get_oauth_token()
    
        created_from = _iso(from_ts)
        print(f"Fetching SailPoint IAM events from: {created_from}")
    
        # Handle pagination state
        last_created = st.get("last_created")
        last_id = st.get("last_id")
        search_after = [last_created, last_id] if (last_created and last_id) else None
    
        pages = 0
        total_events = 0
        written_keys = []
        newest_created = last_created or created_from
        newest_id = last_id or ""
    
        while pages < MAX_PAGES:
            events = _search_events(access_token, created_from, search_after)
    
            if not events:
                break
    
            # Write page to S3 in JSONL format
            key = _put_events_data(events, from_ts, to_ts, pages + 1)
            written_keys.append(key)
            total_events += len(events)
    
            # Update pagination state from last item
            last_event = events[-1]
            last_event_created = last_event.get("created") or last_event.get("metadata", {}).get("created")
            last_event_id = _get_item_id(last_event)
    
            if last_event_created:
                newest_created = last_event_created
            if last_event_id:
                newest_id = last_event_id
    
            search_after = [newest_created, newest_id]
            pages += 1
    
            # If we got less than page size, we're done
            if len(events) < PAGE_SIZE:
                break
    
        print(f"Successfully retrieved {total_events} events across {pages} pages")
    
        # Save state for next run
        st["last_to_ts"] = to_ts
        st["last_created"] = newest_created
        st["last_id"] = newest_id
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True,
                "pages": pages,
                "total_events": total_events,
                "s3_keys": written_keys,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "last_created": newest_created,
                "last_id": newest_id,
                "format": "jsonl"
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    

  5. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  6. Insira as seguintes variáveis de ambiente, substituindo pelos seus valores.

    Variáveis de ambiente

    Chave Valor de exemplo
    S3_BUCKET sailpoint-iam-logs
    S3_PREFIX sailpoint/iam/
    STATE_KEY sailpoint/iam/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT sailpoint-iam-to-s3/1.0
    IDN_BASE https://tenant.api.identitynow.com
    IDN_CLIENT_ID your-client-id (da etapa 2)
    IDN_CLIENT_SECRET your-client-secret (da etapa 2)
    IDN_SCOPE sp:scopes:all
    PAGE_SIZE 250
    MAX_PAGES 20
  7. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  8. Selecione a guia Configuração.

  9. No painel Configuração geral, clique em Editar.

  10. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda sailpoint_iam_to_s3.
    • Nome: sailpoint-iam-1h.
  3. Clique em Criar programação.

Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. Acesse Console da AWS > IAM > Usuários > Adicionar usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: insira secops-reader.
    • Tipo de acesso: selecione Chave de acesso – Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sailpoint-iam-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  9. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Faça o download do CSV (esses valores são inseridos no feed).

Configurar um feed no Google SecOps para ingerir registros do IAM do SailPoint

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. Na próxima página, clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, SailPoint IAM logs).
  5. Selecione Amazon S3 V2 como o Tipo de origem.
  6. Selecione SailPoint IAM como o Tipo de registro.
  7. Clique em Próxima.
  8. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://sailpoint-iam-logs/sailpoint/iam/
    • Opções de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: padrão de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  9. Clique em Próxima.
  10. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
action metadata.description O valor do campo action do registro bruto.
actor.name principal.user.user_display_name O valor do campo actor.name do registro bruto.
attributes.accountName principal.user.group_identifiers O valor do campo attributes.accountName do registro bruto.
attributes.appId target.asset_id "ID do app: " concatenado com o valor do campo attributes.appId do registro bruto.
attributes.attributeName additional.fields[0].value.string_value O valor do campo attributes.attributeName do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Nome do atributo".
attributes.attributeValue additional.fields[1].value.string_value O valor do campo attributes.attributeValue do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Valor do atributo".
attributes.cloudAppName target.application O valor do campo attributes.cloudAppName do registro bruto.
attributes.hostName target.hostname, target.asset.hostname O valor do campo attributes.hostName do registro bruto.
attributes.interface additional.fields[2].value.string_value O valor do campo attributes.interface do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Interface".
attributes.operation security_result.action_details O valor do campo attributes.operation do registro bruto.
attributes.previousValue additional.fields[3].value.string_value O valor do campo attributes.previousValue do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Valor anterior".
attributes.provisioningResult security_result.detection_fields.value O valor do campo attributes.provisioningResult do registro bruto, colocado em um objeto security_result.detection_fields. A chave é definida como "Provisioning Result".
attributes.sourceId principal.labels[0].value O valor do campo attributes.sourceId do registro bruto, colocado em um objeto principal.labels. A chave é definida como "ID da origem".
attributes.sourceName principal.labels[1].value O valor do campo attributes.sourceName do registro bruto, colocado em um objeto principal.labels. A chave é definida como "Nome da origem".
auditClassName metadata.product_event_type O valor do campo auditClassName do registro bruto.
created metadata.event_timestamp.seconds, metadata.event_timestamp.nanos O valor do campo created do registro bruto, convertido em carimbo de data/hora se instant.epochSecond não estiver presente.
id metadata.product_log_id O valor do campo id do registro bruto.
instant.epochSecond metadata.event_timestamp.seconds O valor do campo instant.epochSecond do registro bruto, usado para carimbo de data/hora.
ipAddress principal.asset.ip, principal.ip O valor do campo ipAddress do registro bruto.
interface additional.fields[0].value.string_value O valor do campo interface do registro bruto, colocado em um objeto additional.fields. A chave é definida como "interface".
loggerName intermediary.application O valor do campo loggerName do registro bruto.
message metadata.description, security_result.description Usado para várias finalidades, incluindo definir a descrição em metadados e security_result, além de extrair conteúdo XML.
name security_result.description O valor do campo name do registro bruto.
operation target.resource.attribute.labels[0].value, metadata.product_event_type O valor do campo operation do registro bruto, colocado em um objeto target.resource.attribute.labels. A chave é definida como "operation". Também usado para metadata.product_event_type.
org principal.administrative_domain O valor do campo org do registro bruto.
pod principal.location.name O valor do campo pod do registro bruto.
referenceClass additional.fields[1].value.string_value O valor do campo referenceClass do registro bruto, colocado em um objeto additional.fields. A chave é definida como "referenceClass".
referenceId additional.fields[2].value.string_value O valor do campo referenceId do registro bruto, colocado em um objeto additional.fields. A chave é definida como "referenceId".
sailPointObjectName additional.fields[3].value.string_value O valor do campo sailPointObjectName do registro bruto, colocado em um objeto additional.fields. A chave é definida como "sailPointObjectName".
serverHost principal.hostname, principal.asset.hostname O valor do campo serverHost do registro bruto.
stack additional.fields[4].value.string_value O valor do campo stack do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Stack".
status security_result.severity_details O valor do campo status do registro bruto.
target additional.fields[4].value.string_value O valor do campo target do registro bruto, colocado em um objeto additional.fields. A chave é definida como "target".
target.name principal.user.userid O valor do campo target.name do registro bruto.
technicalName security_result.summary O valor do campo technicalName do registro bruto.
thrown.cause.message xml_body, detailed_message O valor do campo thrown.cause.message do registro bruto, usado para extrair conteúdo XML.
thrown.message xml_body, detailed_message O valor do campo thrown.message do registro bruto, usado para extrair conteúdo XML.
trackingNumber additional.fields[5].value.string_value O valor do campo trackingNumber do registro bruto, colocado em um objeto additional.fields. A chave é definida como "Número de rastreamento".
type metadata.product_event_type O valor do campo type do registro bruto.
_version metadata.product_version O valor do campo _version do registro bruto.
N/A metadata.event_timestamp Derivado dos campos instant.epochSecond ou created.
N/A metadata.event_type Determinado pela lógica do analisador com base em vários campos, incluindo has_principal_user, has_target_application, technicalName e action. O valor padrão é "GENERIC_EVENT".
N/A metadata.log_type Defina como "SAILPOINT_IAM".
N/A metadata.product_name Defina como "IAM".
N/A metadata.vendor_name Defina como "SAILPOINT".
N/A extensions.auth.type Definido como "AUTHTYPE_UNSPECIFIED" em determinadas condições.
N/A target.resource.attribute.labels[0].key Defina como "operation".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.