Recolha registos do URLScan IO
Este documento explica como carregar registos do URLScan IO para o Google Security Operations através do Amazon S3.
Antes de começar
Certifique-se de que cumpre os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Acesso privilegiado ao inquilino do URLScan IO
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Obtenha os pré-requisitos do URLScan IO
- Inicie sessão no URLScan IO.
- Clique no ícone do perfil.
- Selecione Chave da API no menu.
- Se ainda não tiver uma chave da API:
- Clique no botão Criar chave da API.
- Introduza uma descrição para a chave da API (por exemplo,
Google SecOps Integration). - Selecione as autorizações para a chave (para acesso só de leitura, selecione as autorizações Ler).
- Clique em Gerar chave da API.
- Copie e guarde numa localização segura os seguintes detalhes:
- API_KEY: a string da chave da API gerada (formato:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) - URL base da API:
https://urlscan.io/api/v1(é constante para todos os utilizadores)
- API_KEY: a string da chave da API gerada (formato:
- Tenha em atenção os limites de quota da API:
- Contas gratuitas: limitadas a 1000 chamadas API por dia e 60 por minuto
- Contas Pro: limites mais elevados com base no nível de subscrição
- Se precisar de restringir as pesquisas apenas às análises da sua organização, tome nota do seguinte:
- Identificador do utilizador: o seu nome de utilizador ou email (para utilização com o filtro de pesquisa
user:) - Identificador da equipa: se usar a funcionalidade de equipas (para utilização com o filtro de pesquisa
team:)
- Identificador do utilizador: o seu nome de utilizador ou email (para utilização com o filtro de pesquisa
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor.
- Guarde o nome e a região do contentor para referência futura (por exemplo,
urlscan-logs-bucket). - Crie um utilizador seguindo este guia do utilizador: criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para referência futura.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente.
- Pesquise a política AmazonS3FullAccess.
- Selecione a política.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas.
- Clique em Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json" } ] }- Substitua
urlscan-logs-bucketse tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
urlscan-lambda-roleà função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome urlscan-collectorRuntime Python 3.13 Arquitetura x86_64 Função de execução urlscan-lambda-roleDepois de criar a função, abra o separador Código, elimine o fragmento de código e introduza o seguinte código (
urlscan-collector.py):import json import os import boto3 from datetime import datetime, timedelta import urllib3 import base64 s3 = boto3.client('s3') http = urllib3.PoolManager() def lambda_handler(event, context): # Environment variables bucket = os.environ['S3_BUCKET'] prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_key = os.environ['API_KEY'] api_base = os.environ['API_BASE'] search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h') page_size = int(os.environ.get('PAGE_SIZE', '100')) max_pages = int(os.environ.get('MAX_PAGES', '10')) # Load state state = load_state(bucket, state_key) last_run = state.get('last_run') # Prepare search query if last_run: # Adjust search query based on last run search_time = datetime.fromisoformat(last_run) time_diff = datetime.utcnow() - search_time hours = int(time_diff.total_seconds() / 3600) + 1 search_query = f'date:>now-{hours}h' # Search for scans headers = {'API-Key': api_key} all_results = [] for page in range(max_pages): search_url = f"{api_base}/search/" params = { 'q': search_query, 'size': page_size, 'offset': page * page_size } # Make search request response = http.request( 'GET', search_url, fields=params, headers=headers ) if response.status != 200: print(f"Search failed: {response.status}") break search_data = json.loads(response.data.decode('utf-8')) results = search_data.get('results', []) if not results: break # Fetch full result for each scan for result in results: uuid = result.get('task', {}).get('uuid') if uuid: result_url = f"{api_base}/result/{uuid}/" result_response = http.request( 'GET', result_url, headers=headers ) if result_response.status == 200: full_result = json.loads(result_response.data.decode('utf-8')) all_results.append(full_result) else: print(f"Failed to fetch result for {uuid}: {result_response.status}") # Check if we have more pages if len(results) < page_size: break # Write results to S3 if all_results: now = datetime.utcnow() file_key = f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json" # Create NDJSON content ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results]) # Upload to S3 s3.put_object( Bucket=bucket, Key=file_key, Body=ndjson_content.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}") # Update state state['last_run'] = datetime.utcnow().isoformat() save_state(bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Processed {len(all_results)} scan results', 'location': f"s3://{bucket}/{prefix}" }) } def load_state(bucket, key): try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read()) except s3.exceptions.NoSuchKey: return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket, key, state): try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")Aceda a Configuração > Variáveis de ambiente.
Clique em Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Exemplo de valor S3_BUCKETurlscan-logs-bucketS3_PREFIXurlscan/STATE_KEYurlscan/state.jsonAPI_KEY<your-api-key>API_BASEhttps://urlscan.io/api/v1SEARCH_QUERYdate:>now-1hPAGE_SIZE100MAX_PAGES10Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Forneça os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour). - Destino: a sua função Lambda
urlscan-collector. - Nome:
urlscan-collector-1h.
- Programação recorrente: Taxa (
- Clique em Criar horário.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Aceda a AWS Console > IAM > Users.
- Clique em Adicionar utilizadores.
- Forneça os seguintes detalhes de configuração:
- Utilizador: introduza
secops-reader. - Tipo de acesso: selecione Chave de acesso – Acesso programático.
- Utilizador: introduza
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::urlscan-logs-bucket" } ] }Defina o nome como
secops-reader-policy.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do URLScan IO
- Aceda a Definições do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
URLScan IO logs). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione URLScan IO como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://urlscan-logs-bucket/urlscan/ - Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.