Raccogliere i log di Proofpoint Secure Email Relay
Questo documento spiega come importare i log di Proofpoint Secure Email Relay in Google Security Operations utilizzando Google Cloud Storage V2.
Proofpoint Secure Email Relay (SER) è un servizio di sicurezza email in uscita che fornisce crittografia, prevenzione della perdita di dati (DLP) e applicazione della conformità per i messaggi inviati dalla tua organizzazione. Il parser estrae i campi dai dati di monitoraggio dei messaggi SER e li mappa al modello UDM (Unified Data Model), acquisendo i metadati delle email, le metriche di throughput, lo stato di consegna e l'attività utente.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un progetto GCP con l'API Storage Cloud abilitata
- Autorizzazioni per creare e gestire bucket GCS
- Autorizzazioni per gestire le policy IAM nei bucket GCS
- Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
- Accesso privilegiato a Proofpoint Secure Email Relay con accesso tramite chiave API
Crea un bucket Cloud Storage
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio proofpoint-ser-logs).Tipo di località Scegli in base alle tue esigenze (regione singola, a due regioni, multiregionale) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attivare il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Raccogli le credenziali API Proofpoint SER
Ottenere le credenziali API
- Accedi al portale di amministrazione di Proofpoint Secure Email Relay con le credenziali di amministratore.
- Vai a Impostazioni > Chiavi API.
- Fai clic su Genera o Crea chiave API.
Copia e memorizza in modo sicuro le seguenti credenziali:
- Chiave API: copia questo valore
- API Secret: copia questo valore
Verifica le autorizzazioni
Per verificare che le credenziali API dispongano delle autorizzazioni richieste:
- Accedi al portale di amministrazione Proofpoint SER.
- Vai a Impostazioni > Chiavi API.
- Verifica che la chiave API sia elencata e abbia lo stato Attiva.
- Verifica che la chiave abbia accesso agli endpoint di monitoraggio e generazione di report dei messaggi.
Testare l'accesso API
Verifica le tue credenziali prima di procedere con l'integrazione:
# Replace with your actual credentials API_KEY="<your-api-key>" API_SECRET="<your-api-secret>" # Test API access - retrieve recent messages curl -v -u "${API_KEY}:${API_SECRET}" \ "https://ser-api.proofpoint.com/v1/messages?startDate=$(date -u -v-1H +%Y-%m-%dT%H:%M:%SZ)&endDate=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
Crea un account di servizio per la funzione Cloud Run
La funzione Cloud Run richiede un account di servizio con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.
Crea service account
- Nella console Google Cloud, vai a IAM e amministrazione > Service account.
- Fai clic su Crea account di servizio.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
proofpoint-ser-collector-sa - Descrizione service account: inserisci
Service account for Cloud Run function to collect Proofpoint Secure Email Relay logs
- Nome del service account: inserisci
- Fai clic su Crea e continua.
- Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
- Fai clic su Continua.
- Fai clic su Fine.
Questi ruoli sono necessari per:
- Storage Object Admin: scrive i log nel bucket GCS e gestisce i file di stato
- Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
- Cloud Functions Invoker: consente la chiamata di funzioni
Concedi autorizzazioni IAM sul bucket GCS
Concedi al account di servizio le autorizzazioni write sul bucket GCS:
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (ad esempio
proofpoint-ser-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del account di servizio (ad esempio,
proofpoint-ser-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del account di servizio (ad esempio,
- Fai clic su Salva.
Crea argomento Pub/Sub
Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.
- Nella console GCP, vai a Pub/Sub > Argomenti.
- Fai clic su Crea argomento.
- Fornisci i seguenti dettagli di configurazione:
- ID argomento: inserisci
proofpoint-ser-trigger - Lascia invariate le altre impostazioni predefinite
- ID argomento: inserisci
- Fai clic su Crea.
Crea una funzione Cloud Run per raccogliere i log
La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dei messaggi dall'API Proofpoint SER e scriverli in GCS.
- Nella console GCP, vai a Cloud Run.
- Fai clic su Crea servizio.
- Seleziona Funzione (usa un editor in linea per creare una funzione).
Nella sezione Configura, fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome servizio proofpoint-ser-collectorRegione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio, us-central1)Tempo di esecuzione Seleziona Python 3.12 o versioni successive Nella sezione Trigger (facoltativo):
- Fai clic su + Aggiungi trigger.
- Seleziona Cloud Pub/Sub.
- In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub (
proofpoint-ser-trigger). - Fai clic su Salva.
Nella sezione Autenticazione:
- Seleziona Richiedi autenticazione.
- Controlla Identity and Access Management (IAM).
Scorri verso il basso ed espandi Container, networking, sicurezza.
Vai alla scheda Sicurezza:
- Service account: seleziona il account di servizio (
proofpoint-ser-collector-sa).
- Service account: seleziona il account di servizio (
Vai alla scheda Container:
- Fai clic su Variabili e secret.
- Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
Nome variabile Valore di esempio Descrizione GCS_BUCKETproofpoint-ser-logsNome del bucket GCS GCS_PREFIXser-logsPrefisso per i file di log STATE_KEYser-logs/state.jsonPercorso file di stato API_KEYyour-api-keyChiave API Proofpoint SER API_SECRETyour-api-secretAPI secret Proofpoint SER MAX_RECORDS1000Numero massimo di record per esecuzione PAGE_SIZE100Record per pagina LOOKBACK_HOURS1Periodo di riferimento iniziale Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:
- Timeout richiesta: inserisci
600secondi (10 minuti)
- Timeout richiesta: inserisci
Vai alla scheda Impostazioni:
- Nella sezione Risorse:
- Memoria: seleziona 512 MiB o superiore
- CPU: seleziona 1
- Nella sezione Risorse:
Nella sezione Scalabilità della revisione:
- Numero minimo di istanze: inserisci
0 - Numero massimo di istanze: inserisci
100(o modifica in base al carico previsto)
- Numero minimo di istanze: inserisci
Fai clic su Crea.
Attendi la creazione del servizio (1-2 minuti).
Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.
Aggiungi codice per la funzione
- Inserisci main nel campo Entry point (Punto di ingresso).
Nell'editor di codice incorporato, crea due file:
Primo file:main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'ser-logs') STATE_KEY = os.environ.get('STATE_KEY', 'ser-logs/state.json') API_KEY = os.environ.get('API_KEY') API_SECRET = os.environ.get('API_SECRET') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '1')) API_BASE = "https://ser-api.proofpoint.com/v1" def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Proofpoint SER message logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, API_KEY, API_SECRET]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Build auth header (Basic auth with API key and secret) auth_string = f"{API_KEY}:{API_SECRET}" auth_bytes = auth_string.encode('utf-8') auth_b64 = base64.b64encode(auth_bytes).decode('utf-8') # Fetch messages records, newest_event_time = fetch_messages( auth_b64=auth_b64, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_messages(auth_b64: str, start_time: datetime, end_time: datetime, page_size: int, max_records: int): """ Fetch message logs from Proofpoint SER API with pagination. Args: auth_b64: Base64-encoded API key:secret for Basic auth start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ headers = { 'Authorization': f'Basic {auth_b64}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-ProofpointSERCollector/1.0', } records = [] newest_time = None page_num = 0 backoff = 1.0 offset = 0 start_date = start_time.strftime('%Y-%m-%dT%H:%M:%SZ') end_date = end_time.strftime('%Y-%m-%dT%H:%M:%SZ') while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break current_limit = min(page_size, max_records - len(records)) url = f"{API_BASE}/messages?startDate={start_date}&endDate={end_date}&offset={offset}&limit={current_limit}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) if isinstance(data, list): page_results = data else: page_results = data.get('messages', data.get('results', data.get('data', []))) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} messages") records.extend(page_results) # Track newest event time for event in page_results: try: event_time = event.get('date') or event.get('timestamp') or event.get('sentDate') if event_time: if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results if len(page_results) < current_limit: print(f"Reached last page (size={len(page_results)} < limit={current_limit})") break offset += len(page_results) except Exception as e: print(f"Error fetching messages: {e}") return [], None print(f"Retrieved {len(records)} total messages from {page_num} pages") return records, newest_timeSecondo file:requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.
Attendi il completamento del deployment (2-3 minuti).
Crea job Cloud Scheduler
Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.
- Nella console di GCP, vai a Cloud Scheduler.
- Fai clic su Crea job.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Nome proofpoint-ser-collector-hourlyRegione Seleziona la stessa regione della funzione Cloud Run Frequenza 0 * * * *(ogni ora, all'ora)Fuso orario Seleziona il fuso orario (UTC consigliato) Tipo di target Pub/Sub Argomento Seleziona l'argomento Pub/Sub ( proofpoint-ser-trigger)Corpo del messaggio {}(oggetto JSON vuoto)Fai clic su Crea.
Opzioni di frequenza di pianificazione
Scegli la frequenza in base al volume dei log e ai requisiti di latenza:
| Frequenza | Espressione cron | Caso d'uso |
|---|---|---|
| Ogni 5 minuti | */5 * * * * |
Volume elevato, bassa latenza |
| Ogni 15 minuti | */15 * * * * |
Volume medio |
| Ogni ora | 0 * * * * |
Standard (consigliato) |
| Ogni 6 ore | 0 */6 * * * |
Volume basso, elaborazione batch |
| Ogni giorno | 0 0 * * * |
Raccolta dei dati storici |
Testare l'integrazione
- Nella console Cloud Scheduler, trova il job.
- Fai clic su Forza esecuzione per attivare il job manualmente.
- Attendi qualche secondo.
- Vai a Cloud Run > Servizi.
- Fai clic sul nome della funzione (
proofpoint-ser-collector). - Fai clic sulla scheda Log.
Verifica che la funzione sia stata eseguita correttamente. Cerca:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X messages Wrote X records to gs://proofpoint-ser-logs/ser-logs/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVai a Cloud Storage > Bucket.
Fai clic sul nome del bucket (
proofpoint-ser-logs).Vai alla cartella del prefisso (
ser-logs/).Verifica che sia stato creato un nuovo file
.ndjsoncon il timestamp corrente.
Se visualizzi errori nei log:
- HTTP 401: controlla la chiave API e l'API secret nelle variabili di ambiente
- HTTP 403: verifica che la chiave API abbia accesso all'endpoint di monitoraggio dei messaggi
- HTTP 429: limitazione di frequenza: la funzione riproverà automaticamente con backoff
- Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate
Recuperare il account di servizio Google SecOps
Google SecOps utilizza un account di servizio univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo account di servizio l'accesso al tuo bucket.
Recupera l'email del account di servizio
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Proofpoint SER Logs). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona ProofPoint Secure Email Relay come Tipo di log.
- Fai clic su Ottieni service account.
Verrà visualizzata un'email dell'account di servizio univoca, ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:
gs://proofpoint-ser-logs/ser-logs/- Sostituisci:
proofpoint-ser-logs: il nome del bucket GCS.ser-logs: (Facoltativo) prefisso/percorso della cartella in cui vengono archiviati i log (lascia vuoto per la radice).
- Sostituisci:
Opzione di eliminazione della fonte: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)
Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Concedi le autorizzazioni IAM al account di servizio Google SecOps
3 Il account di servizio Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket (ad esempio
proofpoint-ser-logs). - Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del account di servizio Google SecOps
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
Fai clic su Salva.
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Logic |
|---|---|---|
| status, details, data.throughputLimit, data.throughput, data.totalThroughput, log_metadata.totalThroughput, data.averageDailyThroughput, data.throughputForecast, data.remainingThroughput, data.acceptedThroughput, data.licenseStartDate, data.licenseEndDate, data.average7DayThroughput, data.average30DayThroughput, data.requestedMessages, data.acceptedMessages, data.sentMessages, data.deliveredMessages, data.avgAcceptedMessageSize, data.blockedMessages, data.quarantinedMessages, data.rejectedMessages, data.requestedThroughput, data.totalMessages, data.undeliveredMessages | additional.fields | Etichette unite dalla mappa di stato come valori stringa, mappa nidificata dei dettagli come chiavi compresse con valori stringa e vari campi di dati come valori stringa o numerici |
| desc, data.name | metadata.description | Valore da desc se non è vuoto, altrimenti data.name |
| event_type | metadata.event_type | Imposta EMAIL_TRANSACTION se user_present è true, altrimenti GENERIC_EVENT |
| metadata.product_name | Imposta su "PROOFPOINT SER" | |
| metadata.vendor_name | Impostato su "PROOFPOINT" | |
| fromEnvelope | network.email.bounce_address | Valore da fromEnvelope se corrisponde al pattern email |
| fromHeader | network.email.from | Valore da fromHeader se corrisponde al pattern email |
| applicationName | principal.administrative_domain | Valore copiato direttamente |
| principal_host | principal.asset.hostname | Valore copiato direttamente |
| principal_host | principal.hostname | Valore copiato direttamente |
| principal_port | principal.port | Valore di principal_port convertito in numero intero |
| userId, data.relayUserId | principal.user.product_object_id | Valore di userId se non è vuoto, altrimenti data.relayUserId |
| applicationUserName | principal.user.user_display_name | Valore copiato direttamente |
| senderName | target.administrative_domain | Valore copiato direttamente |
| senderId | target.user.product_object_id | Valore copiato direttamente |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.