Collect Lucid audit logs
This document explains how to ingest Lucid audit logs into Google Security Operations using a Google Cloud Storage V2 feed.
Lucid Software provides a visual collaboration suite including Lucidchart, Lucidspark, and Lucidscale. The Audit Logs API, available exclusively to Enterprise Shield customers, captures security and compliance events across the organization, including account access, document activity, user management, admin actions, and team operations.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance
- A GCP project with Cloud Storage API enabled
- Permissions to create and manage GCS buckets
- Permissions to manage IAM policies on GCS buckets
- Permissions to create Cloud Run services, Pub/Sub topics, and Cloud Scheduler jobs
- A Lucid Enterprise Shield account with admin or account owner privileges
- Access to the Lucid Developer Portal with developer tools enabled
Enable developer tools in Lucid
Before creating API credentials, you must have access to the Lucid Developer Portal. There are two ways to enable developer tools:
Option 1: Enable via user settings
- Sign in to lucid.app.
- Click your profile icon in the upper right corner.
- Select Account Settings.
- Check the Enable developer tools checkbox.
Option 2: Enable via admin role assignment
- Sign in to lucid.app as an account owner or team admin.
- Navigate to the admin panel.
- Go to the Users section.
- Select the target user.
- Edit the user's roles and assign the Developer role.
Configure Lucid API access
To enable Google SecOps to retrieve audit logs, you need to create an OAuth 2.0 client and generate an account-level access token with the account.audit.logs scope.
Create an OAuth 2.0 application and client
- Navigate to the Lucid Developer Portal.
- Click Create Application.
- Enter a name for your application (for example, Chronicle SIEM Integration).
- Click on the newly created application to access its settings.
- Navigate to the OAuth 2.0 tab.
- Enter a name for your OAuth 2.0 client (for example, Chronicle Audit Log Collector).
- Click Create OAuth 2.0 client.
Record the client credentials
After creating the OAuth 2.0 client, the portal displays your credentials:
- Client ID: Your unique client identifier.
- Client Secret: Your API secret key.
Important: Copy and save the client secret immediately. If the client secret is compromised, click the Reset Client Secret button on the OAuth 2.0 settings page. Resetting the secret immediately revokes access until the new secret is updated in your integration.
Register a redirect URI
- On the OAuth 2.0 settings page, click Add Redirect URI.
- Enter the redirect URI for your integration. If using the Lucid-provided test redirect, enter:

  ```
  https://lucid.app/oauth2/clients/<CLIENT_ID>/redirect
  ```

  Replace `<CLIENT_ID>` with your actual client ID.
Obtain an account-level access token
The Audit Logs API requires an account token (not a user token). An account admin must authorize the OAuth 2.0 client to create a token on behalf of the account.
- Direct an account admin to the following authorization URL in a browser, replacing `<CLIENT_ID>` and `<REDIRECT_URI>` with your actual values:

  ```
  https://lucid.app/oauth2/authorizeAccount?client_id=<CLIENT_ID>&redirect_uri=<REDIRECT_URI>&scope=account.audit.logs
  ```

- The admin reviews the requested permissions on the consent screen and clicks Allow.
- Lucid redirects to the redirect URI with an authorization `code` query parameter.
- Exchange the authorization code for an access token by making a POST request to the token endpoint:

  ```
  curl --request POST \
    --url https://api.lucid.co/oauth2/token \
    --header 'Content-Type: application/json' \
    --data '{
      "grant_type": "authorization_code",
      "client_id": "<CLIENT_ID>",
      "client_secret": "<CLIENT_SECRET>",
      "code": "<AUTHORIZATION_CODE>",
      "redirect_uri": "<REDIRECT_URI>"
    }'
  ```

  The response includes an `access_token` and a `refresh_token`. Record both values securely.
Verify API access
Confirm that the access token works by making a test request:
```
curl --request GET \
  --url 'https://api.lucid.co/auditLogs?pageSize=1' \
  --header 'Authorization: Bearer <ACCESS_TOKEN>' \
  --header 'Lucid-Api-Version: 1' \
  --header 'Accept: application/json'
```
A successful response returns a JSON array of audit log events.
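If you prefer to script this check, the request can be sketched with the Python standard library. The `build_audit_request` helper below is illustrative (it is not part of the Lucid API); it only builds the request, and executing it requires a real token:

```python
import json
import urllib.request

API_BASE = 'https://api.lucid.co'

def build_audit_request(access_token, page_size=1):
    """Build an authenticated request for the Lucid auditLogs endpoint."""
    url = f"{API_BASE}/auditLogs?pageSize={page_size}"
    return urllib.request.Request(url, headers={
        'Authorization': f'Bearer {access_token}',
        'Lucid-Api-Version': '1',
        'Accept': 'application/json',
    })

# With a real token:
# with urllib.request.urlopen(build_audit_request('<ACCESS_TOKEN>')) as resp:
#     events = json.loads(resp.read())
#     print(len(events))
```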
Required API permissions
The OAuth 2.0 client requires the following scope:
| Scope | Token type | Purpose |
|---|---|---|
| account.audit.logs | Account | Retrieve audit log events for the account |
Create Google Cloud Storage bucket
- Go to the Google Cloud Console.
- Select your project or create a new one.
- In the navigation menu, go to Cloud Storage > Buckets.
- Click Create bucket.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name your bucket | Enter a globally unique name (for example, lucid-audit-logs-gcs) |
| Location type | Choose based on your needs (Region, Dual-region, Multi-region) |
| Location | Select the location (for example, us-central1) |
| Storage class | Standard (recommended for frequently accessed logs) |
| Access control | Uniform (recommended) |
| Protection tools | Optional: Enable object versioning or a retention policy |

Click Create.
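If you prefer the command line, an equivalent bucket can be created with the gcloud CLI. This is a sketch using the example bucket name and location from this guide; PROJECT_ID is a placeholder for your project:

```shell
# Create a uniform-access Standard bucket for the audit logs
gcloud storage buckets create gs://lucid-audit-logs-gcs \
  --project=PROJECT_ID \
  --location=us-central1 \
  --default-storage-class=STANDARD \
  --uniform-bucket-level-access
```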
Create service account for Cloud Run function
- In the GCP Console, go to IAM & Admin > Service Accounts.
- Click Create Service Account.
- Provide the following configuration details:
  - Service account name: Enter `lucid-audit-collector-sa`
  - Service account description: Enter `Service account for Cloud Run function to collect Lucid audit logs`
- Click Create and Continue.
- In the Grant this service account access to project section, add the following roles:
- Click Select a role.
- Search for and select Storage Object Admin.
- Click + Add another role.
- Search for and select Cloud Run Invoker.
- Click + Add another role.
- Search for and select Cloud Functions Invoker.
- Click Continue.
- Click Done.
Grant IAM permissions on GCS bucket
- Go to Cloud Storage > Buckets.
- Click on your bucket name (lucid-audit-logs-gcs).
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
  - Add principals: Enter the service account email (lucid-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)
  - Assign roles: Select Storage Object Admin
- Click Save.
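The service account creation and bucket grant above can also be sketched with gcloud; PROJECT_ID is a placeholder, and the display name is illustrative:

```shell
# Create the collector service account
gcloud iam service-accounts create lucid-audit-collector-sa \
  --project=PROJECT_ID \
  --display-name="Lucid audit log collector"

# Grant it Storage Object Admin on the logs bucket
gcloud storage buckets add-iam-policy-binding gs://lucid-audit-logs-gcs \
  --member="serviceAccount:lucid-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/storage.objectAdmin"
```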
Create Pub/Sub topic
- In the GCP Console, go to Pub/Sub > Topics.
- Click Create topic.
- Provide the following configuration details:
  - Topic ID: Enter `lucid-audit-trigger`
  - Leave other settings as default
- Click Create.
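The equivalent gcloud command (PROJECT_ID is a placeholder):

```shell
# Create the Pub/Sub topic that Cloud Scheduler will publish to
gcloud pubsub topics create lucid-audit-trigger --project=PROJECT_ID
```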
Create Cloud Run function to collect logs
The Cloud Run function will be triggered by Pub/Sub messages from Cloud Scheduler to fetch logs from the Lucid Audit Logs API and write them to GCS.
- In the GCP Console, go to Cloud Run.
- Click Create service.
- Select Function (use an inline editor to create a function).
In the Configure section, provide the following configuration details:
| Setting | Value |
|---|---|
| Service name | lucid-audit-collector |
| Region | Select a region matching your GCS bucket (for example, us-central1) |
| Runtime | Select Python 3.12 or later |

In the Trigger (optional) section:
- Click + Add trigger.
- Select Cloud Pub/Sub.
- In Select a Cloud Pub/Sub topic, choose `lucid-audit-trigger`.
- Click Save.
In the Authentication section:
- Select Require authentication.
- Check Identity and Access Management (IAM).
Scroll down and expand Containers, Networking, Security.
Go to the Security tab:
- Service account: Select `lucid-audit-collector-sa`
Go to the Containers tab:
- Click Variables & Secrets.
- Click + Add variable for each environment variable:
| Variable Name | Example Value | Description |
|---|---|---|
| GCS_BUCKET | lucid-audit-logs-gcs | GCS bucket name |
| GCS_PREFIX | lucid-audit | Prefix for log files |
| STATE_KEY | lucid-audit/state.json | State file path |
| LUCID_CLIENT_ID | your-oauth-client-id | Lucid OAuth 2.0 client ID |
| LUCID_CLIENT_SECRET | your-oauth-client-secret | Lucid OAuth 2.0 client secret |
| LUCID_REFRESH_TOKEN | your-refresh-token | Lucid OAuth 2.0 refresh token |
| LOOKBACK_HOURS | 24 | Initial lookback period |
| PAGE_SIZE | 200 | Records per API page (maximum 200) |
| MAX_PAGES | 50 | Maximum pages per run |

In the Variables & Secrets section, scroll down to Requests:
- Request timeout: Enter 600 seconds (10 minutes)
Go to the Settings tab:
- In the Resources section:
  - Memory: Select 512 MiB or higher
  - CPU: Select 1
In the Revision scaling section:
- Minimum number of instances: Enter 0
- Maximum number of instances: Enter 100
Click Create.
Wait for the service to be created (1-2 minutes).
After the service is created, the inline code editor will open automatically.
Add function code
- Enter main in the Entry point field.
- In the inline code editor, create two files:
main.py:
```python
import functions_framework
from google.cloud import storage
import json
import os
import urllib3
from datetime import datetime, timezone, timedelta
import re

http = urllib3.PoolManager(
    timeout=urllib3.Timeout(connect=10.0, read=60.0),
    retries=False,
)
storage_client = storage.Client()

GCS_BUCKET = os.environ.get('GCS_BUCKET')
GCS_PREFIX = os.environ.get('GCS_PREFIX', 'lucid-audit')
STATE_KEY = os.environ.get('STATE_KEY', 'lucid-audit/state.json')
LUCID_CLIENT_ID = os.environ.get('LUCID_CLIENT_ID')
LUCID_CLIENT_SECRET = os.environ.get('LUCID_CLIENT_SECRET')
LUCID_REFRESH_TOKEN = os.environ.get('LUCID_REFRESH_TOKEN')
LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '200'))
MAX_PAGES = int(os.environ.get('MAX_PAGES', '50'))
API_BASE = 'https://api.lucid.co'


@functions_framework.cloud_event
def main(cloud_event):
    if not all([GCS_BUCKET, LUCID_CLIENT_ID, LUCID_CLIENT_SECRET, LUCID_REFRESH_TOKEN]):
        print('Error: Missing required environment variables')
        return
    try:
        bucket = storage_client.bucket(GCS_BUCKET)
        state = load_state(bucket)
        now = datetime.now(timezone.utc)

        if isinstance(state, dict) and state.get('last_event_time'):
            try:
                last_val = state['last_event_time']
                if last_val.endswith('Z'):
                    last_val = last_val[:-1] + '+00:00'
                last_time = datetime.fromisoformat(last_val)
                # Overlap the previous window slightly to avoid missing events
                last_time = last_time - timedelta(minutes=2)
            except Exception as e:
                print(f"Warning: Could not parse last_event_time: {e}")
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
        else:
            last_time = now - timedelta(hours=LOOKBACK_HOURS)

        print(f"Fetching audit logs from {last_time.isoformat()} to {now.isoformat()}")

        access_token, new_refresh_token = refresh_access_token()
        if new_refresh_token:
            save_refresh_token(bucket, new_refresh_token)

        records, newest_event_time = fetch_audit_logs(access_token, last_time, now)

        if not records:
            print("No new audit log records found.")
            save_state(bucket, now.isoformat())
            return

        timestamp = now.strftime('%Y%m%d_%H%M%S')
        object_key = f"{GCS_PREFIX}/lucid_audit_{timestamp}.ndjson"
        blob = bucket.blob(object_key)
        ndjson = '\n'.join(
            [json.dumps(record, ensure_ascii=False) for record in records]
        ) + '\n'
        blob.upload_from_string(ndjson, content_type='application/x-ndjson')
        print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")

        if newest_event_time:
            save_state(bucket, newest_event_time)
        else:
            save_state(bucket, now.isoformat())

        print(f"Successfully processed {len(records)} audit log records")
    except Exception as e:
        print(f'Error processing logs: {str(e)}')
        raise


def refresh_access_token():
    url = f"{API_BASE}/oauth2/token"
    body = json.dumps({
        'grant_type': 'refresh_token',
        'refresh_token': LUCID_REFRESH_TOKEN,
        'client_id': LUCID_CLIENT_ID,
        'client_secret': LUCID_CLIENT_SECRET
    }).encode('utf-8')
    response = http.request(
        'POST', url, body=body,
        headers={'Content-Type': 'application/json'}
    )
    if response.status != 200:
        raise Exception(
            f"Token refresh failed: {response.status} - "
            f"{response.data.decode('utf-8')}"
        )
    data = json.loads(response.data.decode('utf-8'))
    access_token = data.get('access_token')
    new_refresh_token = data.get('refresh_token')
    if not access_token:
        raise Exception("No access_token in token refresh response")
    print("Successfully refreshed Lucid API access token")
    return access_token, new_refresh_token


def fetch_audit_logs(access_token, start_time, end_time):
    records = []
    newest_time = None
    page_num = 0
    next_page_url = None
    since_str = start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
    until_str = end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

    while page_num < MAX_PAGES:
        page_num += 1
        if next_page_url:
            url = next_page_url
        else:
            url = (
                f"{API_BASE}/auditLogs"
                f"?pageSize={PAGE_SIZE}"
                f"&since={since_str}"
                f"&until={until_str}"
            )
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Lucid-Api-Version': '1',
            'Accept': 'application/json'
        }
        response = http.request('GET', url, headers=headers)

        if response.status == 429:
            print(f"Rate limited on page {page_num}. Stopping pagination.")
            break
        if response.status != 200:
            print(
                f"API error on page {page_num}: {response.status} - "
                f"{response.data.decode('utf-8')}"
            )
            break

        page_results = json.loads(response.data.decode('utf-8'))
        if not page_results:
            print(f"No more results at page {page_num}")
            break

        records.extend(page_results)
        print(f"Page {page_num}: Retrieved {len(page_results)} events "
              f"(total: {len(records)})")

        for event in page_results:
            try:
                event_time = event.get('eventTimestamp')
                if event_time:
                    if newest_time is None or event_time > newest_time:
                        newest_time = event_time
            except Exception as e:
                print(f"Warning: Could not parse event time: {e}")

        link_header = response.headers.get('Link', '')
        next_page_url = parse_link_header(link_header)
        if not next_page_url:
            print("No next page link found. Pagination complete.")
            break
        if len(page_results) < PAGE_SIZE:
            break

    print(f"Total audit log records fetched: {len(records)} from {page_num} pages")
    return records, newest_time


def parse_link_header(link_header):
    if not link_header:
        return None
    match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
    if match:
        return match.group(1)
    return None


def load_state(bucket):
    try:
        blob = bucket.blob(STATE_KEY)
        if blob.exists():
            return json.loads(blob.download_as_text())
    except Exception as e:
        print(f"Warning: Could not load state: {e}")
    return {}


def save_state(bucket, last_event_time_iso):
    try:
        state = {
            'last_event_time': last_event_time_iso,
            'last_run': datetime.now(timezone.utc).isoformat()
        }
        blob = bucket.blob(STATE_KEY)
        blob.upload_from_string(
            json.dumps(state, indent=2),
            content_type='application/json'
        )
        print(f"Saved state: last_event_time={last_event_time_iso}")
    except Exception as e:
        print(f"Warning: Could not save state: {e}")


def save_refresh_token(bucket, new_refresh_token):
    try:
        token_key = f"{GCS_PREFIX}/refresh_token.json"
        blob = bucket.blob(token_key)
        blob.upload_from_string(
            json.dumps({'refresh_token': new_refresh_token}, indent=2),
            content_type='application/json'
        )
        print("Saved new refresh token to GCS")
    except Exception as e:
        print(f"Warning: Could not save refresh token: {e}")
```

requirements.txt:

```
functions-framework==3.*
google-cloud-storage==2.*
urllib3>=2.0.0
```
- Click Deploy to save and deploy the function.
Wait for deployment to complete (2-3 minutes).
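The pagination in `fetch_audit_logs` follows the `rel="next"` URL from the Link response header. The parsing logic can be exercised standalone; the sample header value below is illustrative, not a real Lucid response:

```python
import re

def parse_link_header(link_header):
    """Extract the rel="next" URL from an RFC 5988-style Link header, if present."""
    if not link_header:
        return None
    match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
    return match.group(1) if match else None

# Illustrative header value
sample = '<https://api.lucid.co/auditLogs?pageToken=abc123>; rel="next"'
print(parse_link_header(sample))  # https://api.lucid.co/auditLogs?pageToken=abc123
print(parse_link_header(''))      # None
```

When the header is absent or only carries other relations (for example `rel="prev"`), the function returns None and pagination stops.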
Create Cloud Scheduler job
- In the GCP Console, go to Cloud Scheduler.
- Click Create Job.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name | lucid-audit-collector-hourly |
| Region | Select the same region as the Cloud Run function |
| Frequency | 0 * * * * (every hour, on the hour) |
| Timezone | Select a timezone (UTC recommended) |
| Target type | Pub/Sub |
| Topic | Select lucid-audit-trigger |
| Message body | {} (an empty JSON object) |

Click Create.
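The same job can be sketched with gcloud; PROJECT_ID is a placeholder, and the location should match your function's region:

```shell
# Publish an empty message to the trigger topic at the top of every hour
gcloud scheduler jobs create pubsub lucid-audit-collector-hourly \
  --project=PROJECT_ID \
  --location=us-central1 \
  --schedule="0 * * * *" \
  --time-zone="Etc/UTC" \
  --topic=lucid-audit-trigger \
  --message-body="{}"
```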
Test the integration
- In the Cloud Scheduler console, find your job (lucid-audit-collector-hourly).
- Click Force run to trigger the job manually.
- Wait a few seconds.
- Go to Cloud Run > Services.
- Click on lucid-audit-collector.
- Click the Logs tab.
Verify the function executed successfully. Look for log entries similar to:

```
Fetching audit logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
Successfully refreshed Lucid API access token
Page 1: Retrieved X events (total: X)
Wrote X records to gs://lucid-audit-logs-gcs/lucid-audit/lucid_audit_YYYYMMDD_HHMMSS.ndjson
Successfully processed X audit log records
```

Then verify the output in Cloud Storage:

- Go to Cloud Storage > Buckets.
- Click on lucid-audit-logs-gcs.
- Navigate to the lucid-audit/ folder.
- Verify that a new .ndjson file was created with the current timestamp.
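The bucket contents can also be checked from the command line, using the example bucket and prefix from this guide:

```shell
# List the exported NDJSON files
gcloud storage ls gs://lucid-audit-logs-gcs/lucid-audit/
```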
If you see errors in the logs:
- HTTP 401: Verify the LUCID_CLIENT_ID, LUCID_CLIENT_SECRET, and LUCID_REFRESH_TOKEN environment variables are correct.
- HTTP 403: Verify the OAuth 2.0 client has the account.audit.logs scope and the token is an account token.
- HTTP 429: Rate limiting. The function stops pagination and resumes on the next scheduled run.
- Missing environment variables: Verify all required variables are set in the Cloud Run function configuration.
Audit log event categories
Lucid audit logs are organized into the following event categories:
| Category | Description |
|---|---|
| Logins | Events associated with user logins |
| Content | Events associated with document and folder access and modification |
| Administration | Events associated with admin activity and account changes |
| User | Events associated with user actions on personal settings |
| Team | Events associated with team operations |
For a complete list of event types and their schemas, see the Lucid Audit Log Events documentation.
FedRAMP environment
Users in the Lucid FedRAMP environment use different authorization and API endpoints. See the Lucid FedRAMP environment documentation for the correct endpoint URLs.
Configure a feed in Google SecOps to ingest Lucid audit logs
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- Click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, Lucid Audit Logs).
- Select Google Cloud Storage V2 as the Source type.
- Select Lucid as the Log type.
- Click Get Service Account. A unique service account email is displayed, for example: chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com. Copy this email address for use when granting IAM permissions.
- Click Next.
Specify values for the following input parameters:

- Storage bucket URL: Enter the GCS bucket URI with the prefix path: gs://lucid-audit-logs-gcs/lucid-audit/
- Source deletion option: Select the deletion option according to your preference:
  - Never: Never deletes any files after transfers (recommended for testing).
  - Delete transferred files: Deletes files after successful transfer.
  - Delete transferred files and empty directories: Deletes files and empty directories after successful transfer.
- Maximum File Age: Include files modified in the last number of days (the default is 180 days).
- Asset namespace: The asset namespace.
- Ingestion labels: The label to be applied to the events from this feed.

Click Next.

Review your new feed configuration in the Finalize screen, and then click Submit.
Grant IAM permissions to the Google SecOps service account
The Google SecOps service account needs the Storage Object Viewer role on your GCS bucket.
- Go to Cloud Storage > Buckets.
- Click on lucid-audit-logs-gcs.
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
- Add principals: Paste the Google SecOps service account email
- Assign roles: Select Storage Object Viewer
- Click Save.
UDM mapping table
| Log Field | UDM Mapping | Logic |
|---|---|---|
| document_opened | additional.fields | Merged with labels created from each source field if not empty |
| share_link_enable | additional.fields | |
| restricted_account_enable | additional.fields | |
| event1.documentIds | additional.fields | |
| team_folder | additional.fields | |
| method_type | additional.fields | |
| extensions.auth.type | extensions.auth.type | Set to "AUTHTYPE_UNSPECIFIED" for login/logout events |
| eventTimestamp | metadata.event_timestamp | Converted using date match ISO8601 or yyyy-MM-ddTHH:mm:ss.SSSSSSZ |
| has_user_login | metadata.event_type | Set based on has_* flags: USER_LOGIN if has_user_login and has_target_user, USER_LOGOUT if has_user_logout and has_target_user, USER_RESOURCE_UPDATE_CONTENT if has_user_resource_updated and has_target_resource and has_principal_user, USER_CREATION if has_principal_user and has_target_application and has_target_user, USER_RESOURCE_CREATION if has_target_resource and has_principal_user, FILE_CREATION if has_target_file and has_principal and has_target, USER_RESOURCE_ACCESS if has_target_resource, USER_COMMUNICATION if has_principal_user, STATUS_UPDATE if has_principal and has_target false, else GENERIC_EVENT |
| has_user_logout | metadata.event_type | |
| has_user_resource_updated | metadata.event_type | |
| has_principal_user | metadata.event_type | |
| has_target_resource | metadata.event_type | |
| has_target_file | metadata.event_type | |
| has_principal | metadata.event_type | |
| has_target_user | metadata.event_type | |
| has_target_application | metadata.event_type | |
| has_target | metadata.event_type | |
| event1.event1Type | metadata.product_event_type | Value copied directly |
| actor.actorIp | principal.asset.ip | Value copied directly if not empty |
| actor.actorIp | principal.ip | Value copied directly if not empty |
| actor.actorAccountId | principal.resource.id | Value copied directly if not empty |
| actor.actorClient | principal.resource.name | Value copied directly if not empty |
| flowId | principal.resource.product_object_id | Value copied directly if not empty |
| actor.actorType | principal.user.attribute.roles | Merged from actor_type if eventType not login/logout and actor_type not empty |
| actor.actorEmail | principal.user.email_addresses | Merged if actor_email not empty and valid email and eventType not login/logout |
| actor.actorUserId | principal.user.userid | Value copied directly if not empty and eventType not login/logout |
| event1.registrationMethod | security_result.description | Value copied directly if not empty |
| event1.source | target.application | Value copied directly if not empty |
| event1.publishedLink.link | target.file.full_path | Value copied directly if not empty |
| event1.filename | target.file.names | Merged if not empty |
| targetData._targetType | target.resource.attribute.labels | Merged with labels created from each source field if not empty |
| targetData._targetId | target.resource.attribute.labels | |
| event1.format | target.resource.attribute.labels | |
| event1.method | target.resource.attribute.labels | |
| event1.publishedLink.format | target.resource.attribute.labels | |
| accountId | target.resource.id | Value copied directly if not empty |
| event1.destinationFolderName | target.resource.name | Set to destinationFolderName if not empty, else folderName if not empty, else publishedLink.name if not empty |
| event1.folderName | target.resource.name | |
| event1.publishedLink.name | target.resource.name | |
| event1.documentId | target.resource.product_object_id | Value copied directly if not empty |
| actor.actorType | target.user.attribute.roles | Merged from actor_type if eventType is login/logout, and from event_role if not empty |
| event1.role | target.user.attribute.roles | |
| actor.actorEmail | target.user.email_addresses | Merged from actor_email if login/logout and valid, and from userEmail in _target |
| targetData.userEmail | target.user.email_addresses | |
| event1.destinationFolderId | target.user.product_object_id | Set to destinationFolderId if not empty, else folderId if not empty |
| event1.folderId | target.user.product_object_id | |
| targetData.displayName | target.user.user_display_name | Value copied directly if not empty |
| actor.actorUserId | target.user.userid | Value copied directly if not empty and eventType is login/logout |
| event1.product | metadata.product_name | Value from product if not empty, else "LUCID" |
| metadata.vendor_name | metadata.vendor_name | Set to "LUCID" |
Need more help? Get answers from Community members and Google SecOps professionals.