Raccogliere i log degli eventi di controllo di 1Password

Supportato in:

Questo documento spiega come importare i log degli eventi di controllo di 1Password in Google Security Operations utilizzando Google Cloud Storage V2.

1Password è una piattaforma di gestione delle password che aiuta i team ad archiviare, condividere e gestire in modo sicuro credenziali, secret e informazioni sensibili. L'API 1Password Events fornisce l'accesso ai dati degli eventi di controllo che acquisiscono le azioni amministrative e dei criteri eseguite nel tuo account 1Password Business.

Prima di iniziare

Assicurati di disporre dei seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un progetto GCP con l'API Cloud Storage abilitata
  • Autorizzazioni per creare e gestire bucket GCS
  • Autorizzazioni per gestire le policy IAM nei bucket GCS
  • Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
  • Un account 1Password Business
  • Un ruolo di proprietario o amministratore nel tuo account 1Password

Creazione di un bucket Google Cloud Storage

  1. Vai alla console Google Cloud.
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio onepassword-audit-secops-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la posizione (ad esempio, us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Raccogli le credenziali di 1Password

Configurare l'integrazione dei report sugli eventi

  1. Accedi al tuo account su 1Password.com.
  2. Seleziona Integrazioni nella barra laterale.
  3. Se hai già configurato altre integrazioni, seleziona Directory nella pagina Integrazioni.
  4. Nella sezione Reporting sugli eventi, seleziona Altro.
  5. Nel campo Nome, inserisci un nome per l'integrazione (ad esempio Google SecOps Audit Events).
  6. Fai clic su Aggiungi integrazione.

Creare un token di tipo Bearer

  1. Nella pagina dei dettagli dell'integrazione, fai clic su Aggiungi un token di autenticazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Nome token: inserisci un nome descrittivo (ad esempio, SecOps GCS Collector - Audit Events)
    • Scade dopo: seleziona il periodo di scadenza in base alle tue preferenze. Seleziona Mai per un token senza scadenza oppure scegli 30 giorni, 90 giorni o 180 giorni.
    • Eventi da segnalare: seleziona la seguente casella di controllo:
      • Eventi di controllo
  3. Fai clic su Issue Token (Emetti token).
  4. Nella pagina Salva il token, fai clic su Salva in 1Password o copia il token e salvalo in una posizione sicura.

  5. Fai clic su Visualizza dettagli integrazione per verificare che l'integrazione sia attiva.

Determinare l'URL di base dell'API Events

L'URL di base dipende dal server che ospita il tuo account 1Password:

Se il tuo account è ospitato su L'URL di base dell'API Events è
1password.com https://events.1password.com
ent.1password.com https://events.ent.1password.com
1password.ca https://events.1password.ca
1password.eu https://events.1password.eu

Testare l'accesso API

  • Verifica le tue credenziali prima di procedere con l'integrazione:

    # Replace with your actual bearer token and base URL
    BEARER_TOKEN="<your-bearer-token>"
    API_BASE="https://events.1password.com"
    
    # Test API access using the introspect endpoint
    curl -v \
        -H "Authorization: Bearer $BEARER_TOKEN" \
        "$API_BASE/api/v2/auth/introspect"
    

Una risposta riuscita restituisce un oggetto JSON con il campo features che elenca i tipi di eventi a cui può accedere il token (ad esempio, ["auditevents"]).

Crea un account di servizio per la funzione Cloud Run

La funzione Cloud Run richiede un account di servizio con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.

Crea service account

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
  2. Fai clic su Crea account di servizio.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome del service account: inserisci onepassword-audit-collector-sa
    • Descrizione service account: inserisci Service account for Cloud Run function to collect 1Password Audit Events logs
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo account di servizio l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo.
    2. Cerca e seleziona Amministratore oggetti di archiviazione.
    3. Fai clic su + Aggiungi un altro ruolo.
    4. Cerca e seleziona Cloud Run Invoker.
    5. Fai clic su + Aggiungi un altro ruolo.
    6. Cerca e seleziona Invoker di Cloud Functions.
  6. Fai clic su Continua.
  7. Fai clic su Fine.

Questi ruoli sono necessari per:

  • Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
  • Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
  • Cloud Functions Invoker: consente la chiamata di funzioni

Concedi autorizzazioni IAM sul bucket GCS

Concedi al account di servizio le autorizzazioni di scrittura sul bucket GCS:

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul tuo bucket (onepassword-audit-secops-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi dirigenti: inserisci onepassword-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com
    • Assegna i ruoli: seleziona Storage Object Admin.
  6. Fai clic su Salva.

Crea argomento Pub/Sub

Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.

  1. Nella console GCP, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci onepassword-audit-trigger
    • Lascia le altre impostazioni predefinite
  4. Fai clic su Crea.

Crea una funzione Cloud Run per raccogliere i log

La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log degli eventi di controllo dall'API 1Password Events e scriverli in GCS.

  1. Nella console GCP, vai a Cloud Run.
  2. Fai clic su Crea servizio.
  3. Seleziona Funzione (usa un editor in linea per creare una funzione).
  4. Nella sezione Configura, fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome servizio onepassword-audit-collector
    Regione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio, us-central1)
    Tempo di esecuzione Seleziona Python 3.12 o versioni successive
  5. Nella sezione Trigger (facoltativo):

    1. Fai clic su + Aggiungi trigger.
    2. Seleziona Cloud Pub/Sub.
    3. In Seleziona un argomento Cloud Pub/Sub, scegli onepassword-audit-trigger.
    4. Fai clic su Salva.
  6. Nella sezione Autenticazione:

    1. Seleziona Richiedi autenticazione.
    2. Controlla Identity and Access Management (IAM).
  7. Scorri fino a Container, networking, sicurezza ed espandi la sezione.

  8. Vai alla scheda Sicurezza:

    • Service account (Account di servizio): seleziona onepassword-audit-collector-sa
  9. Vai alla scheda Container:

    1. Fai clic su Variabili e secret.
    2. Fai clic su + Aggiungi variabile per ogni variabile di ambiente:

      Nome variabile Valore di esempio Descrizione
      GCS_BUCKET onepassword-audit-secops-logs Nome bucket GCS
      GCS_PREFIX onepassword-audit Prefisso per i file di log
      STATE_KEY onepassword-audit/state.json Percorso file di stato
      OP_API_BASE https://events.1password.com URL di base dell'API 1Password Events
      OP_BEARER_TOKEN <your-bearer-token> Token di connessione API Events di 1Password
      MAX_RECORDS 10000 Numero massimo di record per esecuzione
      PAGE_SIZE 1000 Record per pagina (max 1000)
      LOOKBACK_HOURS 24 Periodo iniziale di ricerca a ritroso in ore
  10. Nella sezione Variabili e secret, scorri fino a Richieste:

    • Timeout richiesta: inserisci 600 secondi (10 minuti)
  11. Vai alla scheda Impostazioni:

    • Nella sezione Risorse:
      • Memoria: seleziona 512 MiB o un valore superiore
      • CPU: seleziona 1
  12. Nella sezione Scalabilità della revisione:

    • Numero minimo di istanze: inserisci 0
    • Numero massimo di istanze: inserisci 100 (o modifica in base al carico previsto)
  13. Fai clic su Crea.

  14. Attendi la creazione del servizio (1-2 minuti).

  15. Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.

Aggiungi codice per la funzione

  1. Inserisci main nel campo Entry point (Punto di ingresso).
  2. Nell'editor di codice incorporato, crea due file:

    • Primo file: main.py:

      import functions_framework
      from google.cloud import storage
      import json
      import os
      import urllib3
      from datetime import datetime, timezone, timedelta
      import time
      
      # Initialize HTTP client with timeouts
      http = urllib3.PoolManager(
          timeout=urllib3.Timeout(connect=5.0, read=30.0),
          retries=False,
      )
      
      # Initialize Storage client
      storage_client = storage.Client()
      
      # Environment variables
      GCS_BUCKET = os.environ.get('GCS_BUCKET')
      GCS_PREFIX = os.environ.get('GCS_PREFIX', 'onepassword-audit')
      STATE_KEY = os.environ.get('STATE_KEY', 'onepassword-audit/state.json')
      API_BASE = os.environ.get('OP_API_BASE')
      BEARER_TOKEN = os.environ.get('OP_BEARER_TOKEN')
      MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '10000'))
      PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
      LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
      
      # 1Password Events API v2 audit events endpoint
      AUDIT_EVENTS_PATH = '/api/v2/auditevents'
      
      def parse_datetime(value: str) -> datetime:
          """Parse RFC 3339 datetime string to datetime object."""
          if value.endswith('Z'):
              value = value[:-1] + '+00:00'
          return datetime.fromisoformat(value)
      
      @functions_framework.cloud_event
      def main(cloud_event):
          """
          Cloud Run function triggered by Pub/Sub to fetch 1Password
          audit event logs and write them to GCS as NDJSON.
          """
          if not all([GCS_BUCKET, API_BASE, BEARER_TOKEN]):
              print('Error: Missing required environment variables '
                  '(GCS_BUCKET, OP_API_BASE, OP_BEARER_TOKEN)')
              return
      
          try:
              bucket = storage_client.bucket(GCS_BUCKET)
      
              # Load state (stores cursor)
              state = load_state(bucket, STATE_KEY)
      
              now = datetime.now(timezone.utc)
      
              print('--- Processing endpoint: auditevents ---')
      
              saved_cursor = state.get('cursor_auditevents')
      
              records, last_cursor = fetch_endpoint(
                  api_base=API_BASE,
                  path=AUDIT_EVENTS_PATH,
                  bearer_token=BEARER_TOKEN,
                  saved_cursor=saved_cursor,
                  lookback_hours=LOOKBACK_HOURS,
                  page_size=PAGE_SIZE,
                  max_records=MAX_RECORDS,
              )
      
              if records:
                  timestamp = now.strftime('%Y%m%d_%H%M%S')
                  object_key = (f'{GCS_PREFIX}/'
                              f'auditevents_{timestamp}.ndjson')
                  blob = bucket.blob(object_key)
      
                  ndjson = '\n'.join(
                      json.dumps(r, ensure_ascii=False) for r in records
                  ) + '\n'
                  blob.upload_from_string(
                      ndjson, content_type='application/x-ndjson'
                  )
      
                  print(f'Wrote {len(records)} auditevents records '
                      f'to gs://{GCS_BUCKET}/{object_key}')
      
              # Save cursor even if no records (for next poll)
              if last_cursor:
                  state['cursor_auditevents'] = last_cursor
      
              # Save state
              state['last_run'] = now.isoformat()
              save_state(bucket, STATE_KEY, state)
      
              print(f'Successfully processed {len(records)} records')
      
          except Exception as e:
              print(f'Error processing logs: {str(e)}')
              raise
      
      def load_state(bucket, key):
          """Load state from GCS."""
          try:
              blob = bucket.blob(key)
              if blob.exists():
                  state_data = blob.download_as_text()
                  return json.loads(state_data)
          except Exception as e:
              print(f'Warning: Could not load state: {e}')
          return {}
      
      def save_state(bucket, key, state: dict):
          """Save state to GCS."""
          try:
              blob = bucket.blob(key)
              blob.upload_from_string(
                  json.dumps(state, indent=2),
                  content_type='application/json',
              )
              print(f'Saved state: {json.dumps(state)}')
          except Exception as e:
              print(f'Warning: Could not save state: {e}')
      
      def fetch_endpoint(api_base, path, bearer_token, saved_cursor,
                      lookback_hours, page_size, max_records):
          """
          Fetch events from the 1Password Events API v2 audit events
          endpoint.
      
          The 1Password Events API uses cursor-based pagination with POST
          requests. The first request sends a ResetCursor object with
          optional start_time and limit. Subsequent requests send the
          cursor returned from the previous response.
      
          Args:
              api_base: Events API base URL
              path: Endpoint path
              bearer_token: JWT bearer token
              saved_cursor: Cursor from previous run (or None)
              lookback_hours: Hours to look back on first run
              page_size: Max events per page (1-1000)
              max_records: Max total events per run
      
          Returns:
              Tuple of (records list, last_cursor string)
          """
          url = f'{api_base.rstrip("/")}{path}'
      
          headers = {
              'Authorization': f'Bearer {bearer_token}',
              'Content-Type': 'application/json',
              'Accept': 'application/json',
              'User-Agent': 'GoogleSecOps-1PasswordAuditCollector/1.0',
          }
      
          records = []
          cursor = saved_cursor
          page_num = 0
          backoff = 1.0
      
          while True:
              page_num += 1
      
              if len(records) >= max_records:
                  print(f'Reached max_records limit ({max_records})')
                  break
      
              # Build request body
              if cursor:
                  # Continuing cursor: resume from last position
                  body = json.dumps({'cursor': cursor})
              else:
                  # ResetCursor: first request or no saved state
                  start_time = (
                      datetime.now(timezone.utc)
                      - timedelta(hours=lookback_hours)
                  )
                  body = json.dumps({
                      'limit': page_size,
                      'start_time': start_time.strftime(
                          '%Y-%m-%dT%H:%M:%SZ'
                      ),
                  })
      
              try:
                  response = http.request(
                      'POST', url, body=body, headers=headers,
                  )
      
                  # Handle rate limiting (600 req/min, 30000 req/hour)
                  if response.status == 429:
                      retry_after = int(
                          response.headers.get(
                              'Retry-After', str(int(backoff))
                          )
                      )
                      print(f'Rate limited (429). Retrying after '
                          f'{retry_after}s...')
                      time.sleep(retry_after)
                      backoff = min(backoff * 2, 60.0)
                      continue
      
                  backoff = 1.0
      
                  if response.status != 200:
                      response_text = response.data.decode('utf-8')
                      print(f'HTTP Error {response.status}: '
                          f'{response_text}')
                      break
      
                  data = json.loads(response.data.decode('utf-8'))
      
                  page_items = data.get('items', [])
                  cursor = data.get('cursor')
                  has_more = data.get('has_more', False)
      
                  if page_items:
                      print(f'Page {page_num}: Retrieved '
                          f'{len(page_items)} events')
                      records.extend(page_items)
      
                  if not has_more:
                      print(f'No more pages (has_more=false)')
                      break
      
                  if not cursor:
                      print(f'No cursor returned, stopping')
                      break
      
              except urllib3.exceptions.HTTPError as e:
                  print(f'HTTP error: {str(e)}')
                  break
              except Exception as e:
                  print(f'Error fetching events: {str(e)}')
                  break
      
          print(f'Retrieved {len(records)} total records '
              f'from {page_num} pages')
          return records, cursor
      
    • Secondo file: requirements.txt::

      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      
  3. Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.

  4. Attendi il completamento del deployment (2-3 minuti).

Crea job Cloud Scheduler

Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.

  1. Nella console di GCP, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome onepassword-audit-collector-hourly
    Regione Seleziona la stessa regione della funzione Cloud Run
    Frequenza 0 * * * * (ogni ora, all'ora)
    Fuso orario Seleziona il fuso orario (UTC consigliato)
    Tipo di target Pub/Sub
    Argomento Seleziona onepassword-audit-trigger
    Corpo del messaggio {} (oggetto JSON vuoto)
  4. Fai clic su Crea.

Opzioni di frequenza di pianificazione

Scegli la frequenza in base al volume dei log e ai requisiti di latenza:

Frequenza Espressione cron Caso d'uso
Ogni 5 minuti */5 * * * * Volume elevato, bassa latenza
Ogni 15 minuti */15 * * * * Volume medio
Ogni ora 0 * * * * Standard (consigliato)
Ogni 6 ore 0 */6 * * * Volume basso, elaborazione batch
Ogni giorno 0 0 * * * Raccolta dei dati storici

Testare l'integrazione

  1. Nella console Cloud Scheduler, trova onepassword-audit-collector-hourly.
  2. Fai clic su Forza esecuzione per attivare il job manualmente.
  3. Attendi qualche secondo.
  4. Vai a Cloud Run > Servizi.
  5. Fai clic su onepassword-audit-collector.
  6. Fai clic sulla scheda Log.
  7. Verifica che la funzione sia stata eseguita correttamente. Cerca:

    --- Processing endpoint: auditevents ---
    Page 1: Retrieved X events
    Wrote X auditevents records to gs://onepassword-audit-secops-logs/onepassword-audit/auditevents_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Vai a Cloud Storage > Bucket.

  9. Fai clic su onepassword-audit-secops-logs.

  10. Vai alla cartella onepassword-audit/.

  11. Verifica che sia stato creato un nuovo file .ndjson con il timestamp corrente.

Se visualizzi errori nei log:

  • HTTP 401: controlla il token di connessione nella variabile di ambiente OP_BEARER_TOKEN. Il token potrebbe essere scaduto.
  • HTTP 429: limitazione della frequenza. La funzione viene ritentata automaticamente con backoff. L'API 1Password Events consente 600 richieste al minuto e 30.000 richieste all'ora.
  • Variabili di ambiente mancanti: verifica che tutte le variabili richieste siano impostate nella configurazione della funzione Cloud Run.
  • Nessun record restituito: verifica che il token di autenticazione abbia accesso agli eventi di controllo utilizzando l'endpoint di introspezione.

Recuperare il account di servizio Google SecOps

Google SecOps utilizza un account di servizio univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo account di servizio l'accesso al tuo bucket.

Recupera l'email del account di servizio

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, 1Password Audit Events Logs).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Eventi di controllo di 1Password come Tipo di log.
  7. Fai clic su Ottieni service account. Verrà visualizzata un'email univoca del account di servizio, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email per utilizzarlo nel passaggio successivo.

  9. Fai clic su Avanti.

  10. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://onepassword-audit-secops-logs/onepassword-audit/
      
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.

    • Età massima del file: includi i file modificati nell'ultimo numero di giorni (il valore predefinito è 180 giorni)

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed

  11. Fai clic su Avanti.

  12. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Concedi le autorizzazioni IAM al account di servizio Google SecOps

Il account di servizio Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic su onepassword-audit-secops-logs.
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email del account di servizio Google SecOps
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage
  6. Fai clic su Salva.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.