Collect Fortra Digital Guardian (formerly Digital Guardian EDR) logs

This document explains how to ingest Fortra Digital Guardian (formerly known as Digital Guardian EDR) logs into Google Security Operations through a Google Cloud Storage V2 feed that is populated by a Cloud Run function.

Fortra Digital Guardian is a comprehensive data loss prevention and endpoint detection and response platform that provides visibility into system, user, and data events across endpoints, networks, and cloud applications. The Analytics & Reporting Cloud (ARC) service delivers advanced analytics, workflow, and reporting capabilities for holistic data protection.

The Cloud Run function authenticates to the ARC Export API using OAuth 2.0, retrieves the current export chunk, writes the results as NDJSON to a GCS bucket, and then acknowledges the bookmark so that the next run receives the next chunk. Google SecOps ingests the resulting files through a GCS V2 feed.
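
As a rough illustration of the NDJSON step, the sketch below serializes a list of records as one compact JSON object per line, in the same way as the function source later in this document. The sample records are hypothetical; the fields in a real ARC export depend on the export profile.

```python
import json


def to_ndjson(records: list) -> str:
    """Serialize records as newline-delimited JSON, one compact object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


# Hypothetical sample records, for illustration only.
sample = [
    {"Computer_Name": "host-01", "Severity": 3},
    {"Computer_Name": "host-02", "Severity": 1},
]
ndjson = to_ndjson(sample)
print(ndjson)
```

Each line of the output is an independent JSON document, which lets a reader process the file line by line.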

Before you begin

Make sure that you have the following prerequisites:

  • A Google SecOps instance.
  • A Google Cloud project with the following APIs enabled:
    • Cloud Storage
    • Cloud Run functions
    • Cloud Scheduler
    • Pub/Sub
    • Cloud Build
  • Permissions to create and manage GCS buckets, Cloud Run functions, Pub/Sub topics, and Cloud Scheduler jobs.
  • Privileged access to the Digital Guardian Management Console (DGMC).
  • Access to Digital Guardian Analytics & Reporting Cloud (ARC) Tenant Settings.
  • Administrator permissions to configure Cloud Services in DGMC.
  • An Export Profile created in DGMC with a valid GUID.

Create a Google Cloud Storage bucket

  1. Go to the Google Cloud Console.
  2. Select your project or create a new one.
  3. In the navigation menu, go to Cloud Storage > Buckets.
  4. Click Create bucket.
  5. Provide the following configuration details:

    Setting            Value
    -----------------  ----------------------------------------------------------------
    Name your bucket   Enter a globally unique name (e.g., digitalguardian-edr-logs)
    Location type      Choose based on your needs (Region, Dual-region, Multi-region)
    Location           Select the location closest to your instance (e.g., us-central1)
    Storage class      Standard (recommended for frequently accessed logs)
    Access control     Uniform (recommended)
    Protection tools   Optional: Enable object versioning or retention policy
  6. Click Create.
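
Before choosing a bucket name, it can help to check it locally. The sketch below is a partial check only, assuming a name without dots: it tests length, allowed characters, and the reserved "goog" prefix and "google" substring. The helper name `looks_like_valid_bucket_name` is hypothetical, and the console remains the authority on whether a name is accepted and globally unique.

```python
import re

# Partial check for bucket names without dots: 3-63 characters, lowercase
# letters, digits, dashes, and underscores, starting and ending with a
# letter or digit.
_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def looks_like_valid_bucket_name(name: str) -> bool:
    """Return True if the name passes this simplified local check."""
    if name.startswith("goog") or "google" in name:
        return False
    return _BUCKET_NAME_RE.fullmatch(name) is not None


print(looks_like_valid_bucket_name("digitalguardian-edr-logs"))
```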

Collect Digital Guardian API credentials

Obtain API credentials

Get Client ID and Client Secret from ARC Tenant Settings

  1. Sign in to the Digital Guardian ARC portal.
  2. Go to ARC Tenant Settings.
  3. Copy and save the following values:
    • Tenant ID: This is your Client ID.
    • Authentication Token: This is your Client Secret.
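
The function source later in this document sends these two values in an OAuth 2.0 client credentials request. A minimal sketch of how that form-encoded body is built is shown below; the helper name `build_token_request_body` and the placeholder values are hypothetical, and the `client` scope is taken from the function source.

```python
from urllib.parse import parse_qs, urlencode


def build_token_request_body(client_id: str, client_secret: str, scope: str = "client") -> str:
    """Build the application/x-www-form-urlencoded body for a token request."""
    return urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,          # Tenant ID from ARC Tenant Settings
        "client_secret": client_secret,  # Authentication Token from ARC Tenant Settings
        "scope": scope,
    })


# Placeholder credentials, for illustration only.
body = build_token_request_body("example-tenant-id", "s3cr/et+=")
print(parse_qs(body)["grant_type"])
```

Using `urlencode` rather than manual string concatenation ensures that special characters in the secret are percent-encoded.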

Get gateway URLs from DGMC

  1. Sign in to the Digital Guardian Management Console (DGMC).
  2. Go to System > Configuration > Cloud Services.
  3. In the API Access section, record:
    • Access Gateway Base URL: e.g., https://accessgw-usw.msp.digitalguardian.com
    • Authorization Server URL: e.g., https://authsrv.msp.digitalguardian.com/as/token.oauth2
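
The sketch below shows how the export and acknowledge endpoints used by the function source can be derived from a base URL and an export profile GUID. It assumes that the Access Gateway Base URL is the value supplied as `ARC_SERVER_URL`, which this document does not state explicitly; the helper names and the GUID are placeholders.

```python
def export_url(base_url: str, profile_guid: str) -> str:
    """Return the export endpoint for a profile, tolerating a trailing slash."""
    return f"{base_url.rstrip('/')}/rest/1.0/export/{profile_guid}"


def acknowledge_url(base_url: str, profile_guid: str) -> str:
    """Return the acknowledge endpoint for the same export profile."""
    return f"{export_url(base_url, profile_guid)}/acknowledge"


# Placeholder GUID, for illustration only.
guid = "123e4567-e89b-12d3-a456-426614174000"
base = "https://accessgw-usw.msp.digitalguardian.com/"
print(export_url(base, guid))
print(acknowledge_url(base, guid))
```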

Create and configure an export profile

  1. In the DGMC, go to Admin > Reports > Export Profiles.
  2. Click Create Export Profile.
  3. Set Export Format to JSON Flattened Table.
  4. Click Save and copy the GUID from the profile details (format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
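
A copied GUID can be checked against the 8-4-4-4-12 hexadecimal format before it is stored as configuration. The following is a minimal sketch using the Python standard library; the helper name `is_valid_guid` and the sample value are hypothetical.

```python
import uuid


def is_valid_guid(value: str) -> bool:
    """Return True only for the hyphenated 8-4-4-4-12 hexadecimal form."""
    try:
        # uuid.UUID also accepts forms without hyphens, so compare against the
        # canonical string to require the hyphenated layout.
        return str(uuid.UUID(value)) == value.lower()
    except ValueError:
        return False


print(is_valid_guid("123e4567-e89b-12d3-a456-426614174000"))
```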

Create the Cloud Run function

  • Prepare function source files

    • requirements.txt

      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3==2.*
      
    • main.py

      """Cloud Run function to ingest Digital Guardian EDR logs into GCS."""
      
      import json
      import os
      import time
      import urllib.parse
      from datetime import datetime, timezone
      
      import functions_framework
      import urllib3
      from google.cloud import storage
      
      GCS_BUCKET = os.environ["GCS_BUCKET"]
      GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr")
      STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json")
      AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"]
      ARC_SERVER_URL = os.environ["ARC_SERVER_URL"]
      CLIENT_ID = os.environ["CLIENT_ID"]
      CLIENT_SECRET = os.environ["CLIENT_SECRET"]
      EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"]
      MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
      
      http = urllib3.PoolManager()
      gcs = storage.Client()
      
      def _get_access_token() -> str:
              body = urllib.parse.urlencode({
                      "grant_type": "client_credentials",
                      "client_id": CLIENT_ID,
                      "client_secret": CLIENT_SECRET,
                      "scope": "client",
              })
              resp = http.request(
                      "POST",
                      AUTH_SERVER_URL,
                      body=body,
                      headers={"Content-Type": "application/x-www-form-urlencoded"},
              )
              if resp.status != 200:
                      raise RuntimeError(f"OAuth token request failed: {resp.status}")
              return json.loads(resp.data.decode("utf-8"))["access_token"]
      
      def _arc_get(token: str, path: str, retries: int = 5) -> dict:
              url = f"{ARC_SERVER_URL}{path}"
              headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
              backoff = 2
              for attempt in range(retries):
                      resp = http.request("GET", url, headers=headers)
                      if resp.status == 200:
                              return json.loads(resp.data.decode("utf-8"))
                      if resp.status == 429:
                              time.sleep(backoff * (2 ** attempt))
                              continue
                      raise RuntimeError(f"ARC API error: {resp.status}")
              raise RuntimeError("ARC API rate limit exceeded.")
      
      def _arc_acknowledge(token: str) -> None:
              url = f"{ARC_SERVER_URL}/rest/1.0/export/{EXPORT_PROFILE_GUID}/acknowledge"
              headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
              resp = http.request("POST", url, headers=headers)
              if resp.status not in (200, 204):
                      raise RuntimeError(f"ARC acknowledge failed: {resp.status}")
      
      def _load_state() -> dict:
              bucket = gcs.bucket(GCS_BUCKET)
              blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
              return json.loads(blob.download_as_text()) if blob.exists() else {}
      
      def _save_state(state: dict) -> None:
              bucket = gcs.bucket(GCS_BUCKET)
              blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
              blob.upload_from_string(json.dumps(state), content_type="application/json")
      
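      def _fetch_export(token: str) -> list:
              """Fetch the current export chunk and return at most MAX_RECORDS records."""
              path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}"
              data = _arc_get(token, path)
              # The response may be a bare list or an object that wraps the records in "data".
              records = data if isinstance(data, list) else data.get("data", [])
              # Caution: records beyond MAX_RECORDS are dropped here, and main() still
              # acknowledges the whole chunk, so keep MAX_RECORDS above the chunk size.
              return records[:MAX_RECORDS]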
      
      def _write_ndjson(records: list, run_ts: str) -> str:
              bucket = gcs.bucket(GCS_BUCKET)
              blob_path = f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/day={run_ts[8:10]}/{run_ts}_export.ndjson"
              blob = bucket.blob(blob_path)
              ndjson = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
              blob.upload_from_string(ndjson, content_type="application/x-ndjson")
              return blob_path
      
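      @functions_framework.cloud_event
      def main(cloud_event):
              """Entry point: export one chunk to GCS, then acknowledge it."""
              state = _load_state()
              now = datetime.now(timezone.utc)
              token = _get_access_token()
              records = _fetch_export(token)
              if not records:
                      return "OK"
              run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
              # Write first, acknowledge second: if the upload fails, the bookmark
              # is not acknowledged and the chunk is not skipped.
              blob_path = _write_ndjson(records, run_ts)
              _arc_acknowledge(token)
              print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{blob_path}")
              state["last_run"] = now.isoformat()
              _save_state(state)
              return "OK"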
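
To make two details of the function source concrete, the sketch below reproduces its retry delay calculation and its object naming scheme as standalone helpers. The helper names `backoff_delays` and `blob_path_for` are hypothetical; the formulas are copied from `_arc_get` and `_write_ndjson`.

```python
from datetime import datetime, timezone


def backoff_delays(retries: int = 5, backoff: int = 2) -> list:
    """Seconds slept after each HTTP 429 response, as computed in _arc_get."""
    return [backoff * (2 ** attempt) for attempt in range(retries)]


def blob_path_for(prefix: str, now: datetime) -> str:
    """Date-partitioned object path, as computed in main and _write_ndjson."""
    run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
    return (
        f"{prefix}/year={run_ts[:4]}/month={run_ts[5:7]}/day={run_ts[8:10]}"
        f"/{run_ts}_export.ndjson"
    )


print(backoff_delays())
print(blob_path_for("digitalguardian_edr", datetime(2024, 3, 5, 14, 30, 0, tzinfo=timezone.utc)))
```

With the default settings, a run that is rate limited on every attempt sleeps for 62 seconds in total before failing, which is worth considering when setting the function timeout.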
      

Retrieve the Google SecOps service account and configure the feed

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. Select Google Cloud Storage V2 as the Source type.
  4. Select Digital Guardian EDR as the Log type.
  5. Click Get Service Account and copy the email address.
  6. Specify the Storage bucket URL: gs://digitalguardian-edr-logs/digitalguardian_edr/ (ensure the trailing slash is included).
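
The storage bucket URL should match the bucket and prefix that the function writes to. A small sketch of how to derive it from the `GCS_BUCKET` and `GCS_PREFIX` values, always ending with a trailing slash, is shown below; the helper name `feed_bucket_uri` is hypothetical.

```python
def feed_bucket_uri(bucket: str, prefix: str = "") -> str:
    """Return a gs:// URI for the bucket and prefix, ending with a slash."""
    cleaned = prefix.strip("/")
    if not cleaned:
        return f"gs://{bucket}/"
    return f"gs://{bucket}/{cleaned}/"


print(feed_bucket_uri("digitalguardian-edr-logs", "digitalguardian_edr"))
```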

UDM mapping table

Log Field         UDM Mapping                      Logic
----------------  -------------------------------  ---------------------------------------
Application       target.application               Value copied directly.
Bytes_Written     network.sent_bytes               Converted to uinteger.
Command_Line      principal.process.command_line   Cleaned of trailing quotes.
Computer_Name     principal.hostname               Source hostname mapping.
Destination_File  target.file.names                Direct mapping.
IP_Address        principal.ip                     Principal IP address.
Process_PID       principal.process.pid            Process ID mapping.
Rule              security_result.rule_name        Direct mapping.
Severity          security_result.severity         Scaled to LOW, MEDIUM, HIGH, CRITICAL.
URL_Path          target.url                       Web destination mapping.
User_Name         principal.user.userid            Extracted via grok.
security_action   security_result.action           Direct mapping.
N/A               metadata.product_name            Set to "Enterprise DLP Platform".
N/A               metadata.vendor_name             Set to "DigitalGuardian".
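
To illustrate how a flat log record relates to nested UDM paths, the sketch below applies a few of the direct mappings from the table to a hypothetical record. It is an illustration only, not the Google SecOps parser: it ignores type conversions, repeated fields, grok extraction, and severity scaling, and the sample values are invented.

```python
# A subset of the direct mappings listed in the table.
DIRECT_MAPPINGS = {
    "Application": "target.application",
    "Computer_Name": "principal.hostname",
    "Rule": "security_result.rule_name",
    "URL_Path": "target.url",
}


def to_udm_sketch(record: dict) -> dict:
    """Copy mapped fields from a flat record into a nested dictionary."""
    udm = {}
    for field, path in DIRECT_MAPPINGS.items():
        if field not in record:
            continue
        *parents, leaf = path.split(".")
        node = udm
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = record[field]
    return udm


# Hypothetical record, for illustration only.
example = {"Computer_Name": "host-01", "Rule": "Block USB", "Unmapped": 1}
print(to_udm_sketch(example))
```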
