Coletar registros do Akamai Cloud Monitor
Este documento explica como ingerir registros do Akamai Cloud Monitor (balanceador de carga, modelador de tráfego, ADC) no Google Security Operations usando o Google Cloud Storage. A Akamai envia eventos JSON para seu endpoint HTTPS. Um receptor de gateway de API + função do Cloud grava os eventos no GCS (JSONL, gz). O analisador transforma os registros JSON em UDM. Ele extrai campos do payload JSON, realiza conversões de tipo de dados, renomeia campos para corresponder ao esquema da UDM e processa lógica específica para campos personalizados e construção de URL. Ele também incorpora tratamento de erros e lógica condicional com base na presença de campos.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Um projeto do GCP com a API Cloud Storage ativada
- Permissões para criar e gerenciar buckets do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar o Cloud Functions, tópicos do Pub/Sub e o gateway de API
- Acesso privilegiado ao Akamai Control Center e ao Property Manager
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione seu projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, akamai-cloud-monitor.Tipo de local Escolha com base nas suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Coletar detalhes da configuração do Akamai Cloud Monitor
Você vai precisar das seguintes informações da Central de controle da Akamai:
- Nome da propriedade no Gerenciador de propriedades
- Conjuntos de dados obrigatórios do Cloud Monitoring para coleta
- Token secret compartilhado opcional para autenticação de webhook
Criar uma conta de serviço para o Cloud Functions
A função do Cloud precisa de uma conta de serviço com permissões para gravar no bucket do GCS.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
akamai-cloud-monitor-sa. - Descrição da conta de serviço: insira
Service account for Cloud Function to collect Akamai Cloud Monitor logs.
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceda a essa conta de serviço acesso ao projeto:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: grava registros em um bucket do GCS e gerencia arquivos de estado.
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (por exemplo,
- Clique em Salvar.
Criar uma função do Cloud para receber registros do Akamai
A função do Cloud recebe solicitações HTTP POST do Akamai Cloud Monitor e grava registros no GCS.
- No Console do GCP, acesse Cloud Functions.
- Clique em Criar função.
Informe os seguintes detalhes de configuração:
Configuração Valor Ambiente Selecione 2ª geração. Nome da função akamai-cloud-monitor-receiverRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Na seção Acionador:
- Tipo de gatilho: selecione HTTPS.
- Autenticação: selecione Permitir invocações não autenticadas. O Akamai vai enviar solicitações não autenticadas.
Clique em Salvar para armazenar a configuração do acionador.
Abra as Configurações de ambiente de execução, build, conexões e segurança.
Na seção Ambiente de execução:
- Memória alocada: selecione 512 MiB.
- Tempo limite: insira
600segundos (10 minutos). - Conta de serviço do ambiente de execução: selecione a conta de serviço (
akamai-cloud-monitor-sa).
Na seção Variáveis de ambiente de execução, clique em + Adicionar variável para cada uma:
Nome da variável Valor de exemplo GCS_BUCKETakamai-cloud-monitorGCS_PREFIXakamai/cloud-monitor/jsonINGEST_TOKENrandom-shared-secret(opcional)Clique em Próxima para acessar o editor de código.
No menu suspenso Ambiente de execução, selecione Python 3.12.
Adicionar código da função
- Insira main em Ponto de entrada da função.
No editor de código em linha, crie dois arquivos:
- Primeiro arquivo: main.py::
import os import json import gzip import io import uuid import datetime as dt from google.cloud import storage import functions_framework GCS_BUCKET = os.environ.get("GCS_BUCKET") GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret storage_client = storage.Client() def _write_jsonl_gz(objs: list) -> str: """Write JSON objects to GCS as gzipped JSONL.""" timestamp = dt.datetime.utcnow() key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode()) buf.seek(0) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}{key}") blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip") return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" def _parse_records_from_request(request) -> list: """Parse JSON records from HTTP request body.""" body = request.get_data(as_text=True) if not body: return [] try: data = json.loads(body) except Exception: # Accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] @functions_framework.http def main(request): """ Cloud Function HTTP handler for Akamai Cloud Monitor logs. Args: request: Flask request object Returns: Tuple of (response_body, status_code, headers) """ # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: token = request.args.get("token") if token != INGEST_TOKEN: return ("Forbidden", 403) records = _parse_records_from_request(request) if not records: return ("No content", 204) try: gcs_key = _write_jsonl_gz(records) response = { "ok": True, "gcs_key": gcs_key, "count": len(records) } return (json.dumps(response), 200, {"Content-Type": "application/json"}) except Exception as e: print(f"Error writing to GCS: {str(e)}") return (f"Internal server error: {str(e)}", 500)- Segundo arquivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.*Clique em implantar para implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Após a implantação, acesse a guia Acionador e copie o URL do acionador. Você vai usar esse URL na configuração da Akamai.
Configurar o Akamai Cloud Monitor para enviar registros
- Faça login na Akamai Control Center.
- Abra sua propriedade no Gerenciador de propriedades.
- Clique em Adicionar regra > escolha Gerenciamento na nuvem.
- Adicione Instrumentação do Cloud Monitoring e selecione os Conjuntos de dados necessários.
- Adicione Entrega de dados do Cloud Monitor.
Informe os seguintes detalhes de configuração:
- Nome do host de entrega: insira o nome do host do URL do gatilho da função do Cloud (por exemplo,
us-central1-your-project.cloudfunctions.net). Caminho do URL de entrega: insira o caminho do URL do gatilho da função do Cloud mais o token de consulta opcional:
- Sem token:
/akamai-cloud-monitor-receiver Com token:
/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>Substitua
<INGEST_TOKEN>pelo valor definido nas variáveis de ambiente do Cloud Functions.
- Sem token:
- Nome do host de entrega: insira o nome do host do URL do gatilho da função do Cloud (por exemplo,
Clique em Salvar.
Clique em Ativar para ativar a versão da propriedade.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Akamai Cloud Monitor - GCS). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Akamai Cloud Monitor como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço é exibido, por exemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel de Leitor de objetos do Storage no seu bucket do GCS.
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket.
- Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar participantes: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
Clique em Salvar.
Configurar um feed no Google SecOps para ingerir registros do Akamai Cloud Monitor
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Akamai Cloud Monitor - GCS). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione Akamai Cloud Monitor como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://akamai-cloud-monitor/akamai/cloud-monitor/json/Substitua:
akamai-cloud-monitor: o nome do bucket do GCS.akamai/cloud-monitor/json: caminho do prefixo em que os registros são armazenados. Precisa corresponder aGCS_PREFIXna função do Cloud.
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
Namespace do recurso:
akamai.cloud_monitorRótulos de ingestão: são adicionados a todos os eventos desse feed (por exemplo,
source=akamai_cloud_monitor,format=json).
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Registros de amostra do Akamai Cloud Monitor compatíveis
JSON:
{ "UA": "-", "accLang": "-", "bytes": "3929", "cacheStatus": "1", "cliIP": "0.0.0.0", "cookie": "-", "cp": "848064", "customField": "-", "dnsLookupTimeMSec": "-", "errorCode": "-", "maxAgeSec": "31536000", "objSize": "3929", "overheadBytes": "240", "proto": "HTTPS", "queryStr": "-", "range": "-", "referer": "-", "reqEndTimeMSec": "4", "reqHost": "www.example.com", "reqId": "1ce83c03", "reqMethod": "GET", "reqPath": "assets/images/placeholder-tagline.png", "reqPort": "443", "reqTimeSec": "1622470405.760", "rspContentLen": "3929", "rspContentType": "image/png", "statusCode": "200", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1.2", "totalBytes": "4599", "transferTimeMSec": "0", "turnAroundTimeMSec": "0", "uncompressedSize": "-", "version": "1", "xForwardedFor": "-" }
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| accLang | network.http.user_agent | Mapeado diretamente se não for "-" ou uma string vazia. |
| cidade | principal.location.city | Mapeado diretamente se não for "-" ou uma string vazia. |
| cliIP | principal.ip | Mapeado diretamente se não for uma string vazia. |
| país | principal.location.country_or_region | Mapeado diretamente se não for "-" ou uma string vazia. |
| cp | additional.fields | Mapeado como um par de chave-valor com a chave "cp". |
| customField | about.ip, about.labels, src.ip | Analisados como pares de chave-valor. Tratamento especial para "eIp" e "pIp" para mapear para src.ip e about.ip, respectivamente. Outras chaves são mapeadas como rótulos em "Sobre". |
| errorCode | security_result.summary, security_result.severity | Se presente, define security_result.severity como "ERROR" e mapeia o valor para security_result.summary. |
| geo.city | principal.location.city | Mapeado diretamente se a cidade for "-" ou uma string vazia. |
| geo.country | principal.location.country_or_region | Mapeado diretamente se o país for "-" ou uma string vazia. |
| geo.lat | principal.location.region_latitude | Mapeado diretamente, convertido para ponto flutuante. |
| geo.long | principal.location.region_longitude | Mapeado diretamente, convertido para ponto flutuante. |
| geo.region | principal.location.state | Mapeado diretamente. |
| ID | metadata.product_log_id | Mapeado diretamente se não for uma string vazia. |
| message.cliIP | principal.ip | Mapeado diretamente se cliIP for uma string vazia. |
| message.fwdHost | principal.hostname | Mapeado diretamente. |
| message.reqHost | target.hostname, target.url | Usado para construir target.url e extrair target.hostname. |
| message.reqLen | network.sent_bytes | Mapeado diretamente, convertido em número inteiro sem sinal se totalBytes estiver vazio ou for "-". |
| message.reqMethod | network.http.method | Mapeado diretamente se reqMethod for uma string vazia. |
| message.reqPath | target.url | Anexado a target.url. |
| message.reqPort | target.port | Mapeado diretamente, convertido em número inteiro se reqPort for uma string vazia. |
| message.respLen | network.received_bytes | Mapeado diretamente, convertido para número inteiro sem sinal. |
| message.sslVer | network.tls.version | Mapeado diretamente. |
| message.status | network.http.response_code | Mapeado diretamente, convertido em número inteiro se statusCode estiver vazio ou for "-". |
| message.UA | network.http.user_agent | Mapeado diretamente se o UA for "-" ou uma string vazia. |
| network.asnum | additional.fields | Mapeado como um par de chave-valor com a chave "asnum". |
| network.edgeIP | intermediary.ip | Mapeado diretamente. |
| network.network | additional.fields | Mapeado como um par de chave-valor com a chave "network". |
| network.networkType | additional.fields | Mapeado como um par de chave-valor com a chave "networkType". |
| proto | network.application_protocol | Usado para determinar network.application_protocol. |
| queryStr | target.url | Adicionado a target.url se não for "-" ou uma string vazia. |
| referenciador | network.http.referral_url, about.hostname | Mapeado diretamente se não for "-". O nome do host extraído é mapeado para "about.hostname". |
| reqHost | target.hostname, target.url | Usado para construir target.url e extrair target.hostname. |
| reqId | metadata.product_log_id, network.session_id | Mapeado diretamente se o ID for uma string vazia. Também mapeado para network.session_id. |
| reqMethod | network.http.method | Mapeado diretamente se não for uma string vazia. |
| reqPath | target.url | Adicionado a target.url se não for "-". |
| reqPort | target.port | Mapeado diretamente, convertido em número inteiro. |
| reqTimeSec | metadata.event_timestamp, timestamp | Usado para definir o carimbo de data/hora do evento. |
| start | metadata.event_timestamp, timestamp | Usado para definir o carimbo de data/hora do evento se reqTimeSec for uma string vazia. |
| statusCode | network.http.response_code | Mapeado diretamente, convertido em número inteiro se não for "-" ou uma string vazia. |
| tlsVersion | network.tls.version | Mapeado diretamente. |
| totalBytes | network.sent_bytes | Mapeado diretamente, convertido em número inteiro sem sinal se não estiver vazio ou "-". |
| tipo | metadata.product_event_type | Mapeado diretamente. |
| UA | network.http.user_agent | Mapeado diretamente se não for "-" ou uma string vazia. |
| version | metadata.product_version | Mapeado diretamente. |
| xForwardedFor | principal.ip | Mapeado diretamente se não for "-" ou uma string vazia. |
| (Lógica do analisador) | metadata.vendor_name | Defina como "Akamai". |
| (Lógica do analisador) | metadata.product_name | Defina como "Cloud Monitor". |
| (Lógica do analisador) | metadata.event_type | Defina como "NETWORK_HTTP". |
| (Lógica do analisador) | metadata.product_version | Definido como "2" se a versão for uma string vazia. |
| (Lógica do analisador) | metadata.log_type | Defina como "AKAMAI_CLOUD_MONITOR". |
| (Lógica do analisador) | network.application_protocol | Determinado por proto ou message.proto. Defina como "HTTPS" se um dos dois contiver "HTTPS" (sem diferenciação de maiúsculas e minúsculas) ou "HTTP" caso contrário. |
| (Lógica do analisador) | security_result.severity | Definido como "INFORMATIONAL" se errorCode for "-" ou uma string vazia. |
| (Lógica do analisador) | target.url | Construído com base em protocolo, reqHost (ou message.reqHost), reqPath (ou message.reqPath) e queryStr. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.