Coletar registros do URLScan IO

Compatível com:

Este documento explica como ingerir registros do URLScan IO no Google Security Operations usando o Google Cloud Storage. O URLScan IO é um serviço que analisa sites e fornece informações detalhadas sobre comportamento, segurança e desempenho. Ele verifica URLs e gera relatórios abrangentes, incluindo capturas de tela, transações HTTP, registros DNS e dados de inteligência de ameaças.

Antes de começar

Verifique se você atende os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Um projeto do GCP com a API Cloud Storage ativada
  • Permissões para criar e gerenciar buckets do GCS
  • Permissões para gerenciar políticas do IAM em buckets do GCS
  • Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
  • Acesso privilegiado ao locatário do URLScan IO

Conferir os pré-requisitos do URLScan IO

  1. Faça login no URLScan IO.
  2. Clique no ícone do seu perfil.
  3. Selecione Chave de API no menu.
  4. Se você ainda não tiver uma chave de API:
    1. Clique no botão Criar chave de API.
    2. Insira uma descrição para a chave de API (por exemplo, Google SecOps Integration).
    3. Clique em Gerar chave de API.
  5. Copie e salve em um local seguro os seguintes detalhes:
    • API_KEY: a string da chave de API gerada (formato: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
    • URL base da API: https://urlscan.io/api/v1 (constante para todos os usuários)
  6. Observe os limites de cota da API:
    • As contas sem custo financeiro e Pro estão sujeitas a limites por minuto, por hora e por dia que variam de acordo com a ação. Verifique suas cotas pessoais ou os cabeçalhos de limite de taxa da API para saber seus limites exatos.
    • Para mais detalhes, consulte a documentação sobre limites de taxa da API URLScan IO.
  7. Se você precisar restringir as pesquisas apenas às verificações da sua organização, anote:

    • Identificador do usuário: seu nome de usuário ou e-mail (para uso com o filtro de pesquisa user:)
    • Identificador da equipe: se você estiver usando o recurso de equipes (para uso com o filtro de pesquisa team:)

Verificar o acesso à API

  • Teste sua chave de API antes de prosseguir com a integração:

    # Replace with your actual API key
    API_KEY="your-api-key-here"
    
    # Test API access
    curl -v -H "API-Key: ${API_KEY}" "https://urlscan.io/api/v1/search/?q=date:>now-1h&size=1"
    

Resposta esperada: HTTP 200 com JSON contendo resultados da pesquisa.

Se você receber HTTP 401 ou 403, verifique se a chave de API está correta e não expirou.

Criar um bucket do Google Cloud Storage

  1. Acesse o Console do Google Cloud.
  2. Selecione seu projeto ou crie um novo.
  3. No menu de navegação, acesse Cloud Storage > Buckets.
  4. Clique em Criar bucket.
  5. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, urlscan-logs-bucket.
    Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional)
    Local Selecione o local (por exemplo, us-central1).
    Classe de armazenamento Padrão (recomendado para registros acessados com frequência)
    Controle de acesso Uniforme (recomendado)
    Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção
  6. Clique em Criar.

Criar uma conta de serviço para a função do Cloud Run

A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.

Criar conta de serviço

  1. No Console do GCP, acesse IAM e administrador > Contas de serviço.
  2. Clique em Criar conta de serviço.
  3. Informe os seguintes detalhes de configuração:
    • Nome da conta de serviço: insira urlscan-collector-sa.
    • Descrição da conta de serviço: insira Service account for Cloud Run function to collect URLScan IO logs.
  4. Clique em Criar e continuar.
  5. Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
    1. Clique em Selecionar papel.
    2. Pesquise e selecione Administrador de objetos do Storage.
    3. Clique em + Adicionar outro papel.
    4. Pesquise e selecione Invocador do Cloud Run.
    5. Clique em + Adicionar outro papel.
    6. Pesquise e selecione Invocador do Cloud Functions.
  6. Clique em Continuar.
  7. Clique em Concluído.

Esses papéis são necessários para:

  • Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
  • Invocador do Cloud Run: permite que o Pub/Sub invoque a função
  • Invocador do Cloud Functions: permite a invocação de funções

Conceder permissões do IAM no bucket do GCS

Conceda permissões de gravação à conta de serviço no bucket do GCS:

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket.
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: insira o e-mail da conta de serviço (por exemplo, urlscan-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Atribuir papéis: selecione Administrador de objetos do Storage.
  6. Clique em Salvar.

Criar tópico Pub/Sub

Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.

  1. No Console do GCP, acesse Pub/Sub > Tópicos.
  2. Selecione Criar tópico.
  3. Informe os seguintes detalhes de configuração:
    • ID do tópico: insira urlscan-logs-trigger.
    • Não altere as outras configurações.
  4. Clique em Criar.

Criar uma função do Cloud Run para coletar registros

A função do Cloud Run é acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API URLScan IO e gravá-los no GCS.

  1. No console do GCP, acesse o Cloud Run.
  2. Clique em Criar serviço.
  3. Selecione Função (use um editor in-line para criar uma função).
  4. Na seção Configurar, forneça os seguintes detalhes de configuração:

    Configuração Valor
    Nome do serviço urlscan-collector
    Região Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).
    Ambiente de execução Selecione Python 3.12 ou uma versão mais recente.
  5. Na seção Acionador (opcional):

    1. Clique em + Adicionar gatilho.
    2. Selecione Cloud Pub/Sub.
    3. Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (urlscan-logs-trigger).
    4. Clique em Salvar.
  6. Na seção Autenticação:

    1. Selecione Exigir autenticação.
    2. Confira o Identity and Access Management (IAM).
  7. Role a tela para baixo e abra Contêineres, rede, segurança.

  8. Acesse a guia Segurança:

    • Conta de serviço: selecione a conta de serviço (urlscan-collector-sa).
  9. Acesse a guia Contêineres:

    1. Clique em Variáveis e secrets.
    2. Clique em + Adicionar variável para cada variável de ambiente:
    Nome da variável Valor de exemplo Descrição
    GCS_BUCKET urlscan-logs-bucket Nome do bucket do GCS
    GCS_PREFIX urlscan/ Prefixo para arquivos de registro
    STATE_KEY urlscan/state.json Caminho do arquivo de estado
    API_KEY your-urlscan-api-key Chave da API URLScan IO
    API_BASE https://urlscan.io/api/v1 URL base da API
    SEARCH_QUERY date:>now-1h Filtro de consulta de pesquisa
    PAGE_SIZE 100 Registros por página
    MAX_PAGES 10 Número máximo de páginas a serem buscadas
  10. Na seção Variáveis e secrets, role a tela para baixo até Solicitações:

    • Tempo limite da solicitação: insira 600 segundos (10 minutos).
  11. Acesse a guia Configurações:

    • Na seção Recursos:
      • Memória: selecione 512 MiB ou mais.
      • CPU: selecione 1.
  12. Na seção Escalonamento de revisão:

    • Número mínimo de instâncias: insira 0.
    • Número máximo de instâncias: insira 100 ou ajuste com base na carga esperada.
  13. Clique em Criar.

  14. Aguarde a criação do serviço (1 a 2 minutos).

  15. Depois que o serviço é criado, o editor de código inline é aberto automaticamente.

Adicionar código da função

  1. Insira main em Ponto de entrada da função.
  2. No editor de código em linha, crie dois arquivos:

    • Primeiro arquivo: main.py::
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timedelta, timezone
    import time
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'urlscan/')
    STATE_KEY = os.environ.get('STATE_KEY', 'urlscan/state.json')
    API_KEY = os.environ.get('API_KEY')
    API_BASE = os.environ.get('API_BASE', 'https://urlscan.io/api/v1')
    SEARCH_QUERY = os.environ.get('SEARCH_QUERY', 'date:>now-1h')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch URLScan IO results and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, API_KEY]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
            last_run = state.get('last_run')
    
            # Adjust search query based on last run
            search_query = SEARCH_QUERY
            if last_run:
                try:
                    search_time = parse_datetime(last_run)
                    time_diff = datetime.now(timezone.utc) - search_time
                    hours = int(time_diff.total_seconds() / 3600) + 1
                    search_query = f'date:>now-{hours}h'
                except Exception as e:
                    print(f'Warning: Could not parse last_run: {e}')
    
            print(f'Searching with query: {search_query}')
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=API_BASE,
                api_key=API_KEY,
                search_query=search_query,
                page_size=PAGE_SIZE,
                max_pages=MAX_PAGES,
            )
    
            if not records:
                print("No new log records found.")
                now = datetime.now(timezone.utc)
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            now = datetime.now(timezone.utc)
            file_key = f"{GCS_PREFIX}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json"
    
            ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in records])
    
            blob = bucket.blob(file_key)
            blob.upload_from_string(
                ndjson_content,
                content_type='application/x-ndjson'
            )
    
            print(f"Uploaded {len(records)} results to gs://{GCS_BUCKET}/{file_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f'Successfully processed {len(records)} scan results')
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_run': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_run={last_event_time_iso}")
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    def fetch_logs(api_base: str, api_key: str, search_query: str, page_size: int, max_pages: int):
        """
        Fetch logs from URLScan IO API with pagination and rate limiting.
    
        Args:
            api_base: API base URL
            api_key: URLScan IO API key
            search_query: Search query string
            page_size: Number of records per page
            max_pages: Maximum total pages to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
    
        headers = {
            'API-Key': api_key,
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-URLScanCollector/1.0'
        }
    
        all_results = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        offset = 0
    
        while page_num < max_pages:
            page_num += 1
    
            # Build search URL with pagination
            search_url = f"{api_base}/search/"
            params = [
                f"q={search_query}",
                f"size={page_size}",
                f"offset={offset}"
            ]
            url = f"{search_url}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"Search failed: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    break
    
                search_data = json.loads(response.data.decode('utf-8'))
                results = search_data.get('results', [])
    
                if not results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(results)} scan results")
    
                # Fetch full result for each scan
                for result in results:
                    task = result.get('task', {})
                    uuid = task.get('uuid')
                    if uuid:
                        result_url = f"{api_base}/result/{uuid}/"
    
                        try:
                            result_response = http.request('GET', result_url, headers=headers)
    
                            # Handle rate limiting
                            if result_response.status == 429:
                                retry_after = int(result_response.headers.get('Retry-After', '5'))
                                print(f"Rate limited on result fetch. Retrying after {retry_after}s...")
                                time.sleep(retry_after)
                                result_response = http.request('GET', result_url, headers=headers)
    
                            if result_response.status == 200:
                                full_result = json.loads(result_response.data.decode('utf-8'))
                                all_results.append(full_result)
    
                                # Track newest event time
                                try:
                                    event_time = task.get('time')
                                    if event_time:
                                        if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                            newest_time = event_time
                                except Exception as e:
                                    print(f"Warning: Could not parse event time: {e}")
                            else:
                                print(f"Failed to fetch result for {uuid}: {result_response.status}")
                        except Exception as e:
                            print(f"Error fetching result for {uuid}: {e}")
    
                # Check if we have more pages
                total = search_data.get('total', 0)
                if offset + len(results) >= total or len(results) < page_size:
                    print(f"Reached last page (offset={offset}, results={len(results)}, total={total})")
                    break
    
                offset += len(results)
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(all_results)} total records from {page_num} pages")
        return all_results, newest_time
    
    • Segundo arquivo: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Clique em Implantar para salvar e implantar a função.

  4. Aguarde a conclusão da implantação (2 a 3 minutos).

Criar o job do Cloud Scheduler

O Cloud Scheduler publica mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.

  1. No Console do GCP, acesse o Cloud Scheduler.
  2. Clique em Criar job.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome urlscan-collector-hourly
    Região Selecione a mesma região da função do Cloud Run
    Frequência 0 * * * * (a cada hora, na hora)
    Fuso horário Selecione o fuso horário (UTC recomendado)
    Tipo de destino Pub/Sub
    Tópico Selecione o tópico do Pub/Sub (urlscan-logs-trigger).
    Corpo da mensagem {} (objeto JSON vazio)
  4. Clique em Criar.

Opções de frequência de programação

  • Escolha a frequência com base no volume de registros e nos requisitos de latência:

    Frequência Expressão Cron Caso de uso
    A cada 5 minutos */5 * * * * Alto volume e baixa latência
    A cada 15 minutos */15 * * * * Volume médio
    A cada hora 0 * * * * Padrão (recomendado)
    A cada 6 horas 0 */6 * * * Baixo volume, processamento em lote
    Diário 0 0 * * * Coleta de dados históricos

Testar a integração

  1. No console do Cloud Scheduler, encontre seu job (urlscan-collector-hourly).
  2. Clique em Executar à força para acionar o job manualmente.
  3. Aguarde alguns segundos.
  4. Acesse Cloud Run > Serviços.
  5. Clique no nome da função (urlscan-collector).
  6. Clique na guia Registros.
  7. Verifique se a função foi executada com sucesso. Procure o seguinte:

    Searching with query: date:>now-1h
    Page 1: Retrieved X scan results
    Uploaded X results to gs://bucket-name/urlscan/year=YYYY/month=MM/day=DD/hour=HH/urlscan_YYYYMMDD_HHMMSS.json
    Successfully processed X scan results
    
  8. Acesse Cloud Storage > Buckets.

  9. Clique no nome do bucket.

  10. Navegue até a pasta de prefixo (urlscan/).

  11. Verifique se um novo arquivo .json foi criado com o carimbo de data/hora atual.

Se você encontrar erros nos registros:

  • HTTP 401: verifique a chave de API nas variáveis de ambiente
  • HTTP 403: verifique se a chave de API não expirou
  • HTTP 429: limitação de taxa. A função vai tentar novamente automaticamente com espera.
  • Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.
  • Falha na pesquisa: verifique se a sintaxe da consulta de pesquisa está correta

Recuperar a conta de serviço do Google SecOps

O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.

Receber o e-mail da conta de serviço

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, URLScan IO logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione URLScan IO como o Tipo de registro.
  7. Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copie esse endereço de e-mail para usar na próxima etapa.

Conceder permissões do IAM à conta de serviço do Google SecOps

A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket.
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
    • Atribuir papéis: selecione Leitor de objetos do Storage.
  6. Clique em Salvar.

Configurar um feed no Google SecOps para ingerir registros do URLScan IO

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, URLScan IO logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione URLScan IO como o Tipo de registro.
  7. Clique em Próxima.
  8. Especifique valores para os seguintes parâmetros de entrada:

    • URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:

      gs://urlscan-logs-bucket/urlscan/
      
      • Substitua:

        • urlscan-logs-bucket: o nome do bucket do GCS.
        • urlscan/: prefixo/caminho da pasta opcional onde os registros são armazenados (deixe em branco para a raiz).

          Exemplos:

          • Bucket raiz: gs://urlscan-logs-bucket/
          • Com prefixo: gs://urlscan-logs-bucket/urlscan/
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:

      • Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
      • Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
      • Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.

    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.

    • Namespace do recurso: o namespace do recurso.

    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.

  9. Clique em Próxima.

  10. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.