Collect Qualys asset context logs
This document explains how to ingest Qualys asset context logs to Google Security Operations using Google Cloud Storage.
Qualys asset context provides host asset inventory data (IP addresses, DNS names, operating system, cloud provider metadata, and tags) from the Qualys platform. Because this data is pulled from the Qualys API, a Cloud Run function periodically queries the Qualys Host List API, converts the XML response to JSON, and writes it to a Cloud Storage bucket, where Google SecOps ingests it.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance
- A Google Cloud project with the Cloud Storage, Cloud Run, Pub/Sub, and Cloud Scheduler APIs enabled
- Permissions to create and manage Cloud Storage buckets, service accounts, Cloud Run functions, Pub/Sub topics, and Cloud Scheduler jobs
- A Qualys account with API access (a user with the Manager role, or a role that has API access and visibility of the assets you want to collect)
Create a Cloud Storage bucket
- Go to the Google Cloud console.
- Select your project or create a new one.
- Go to Cloud Storage > Buckets.
- Click Create bucket.
- Provide the following configuration details:
- Name your bucket: Enter a globally unique name (for example,
qualys-asset-context-logs). - Location type: Choose based on your needs (Region, Dual-region, or Multi-region).
- Location: Select the location (for example,
us-central1). - Storage class: Select Standard.
- Access control: Select Uniform.
- Name your bucket: Enter a globally unique name (for example,
- Click Create.
Collect Qualys API credentials
- Sign in to the Qualys web application.
- Identify or create an API user:
- Go to Users > User Management.
- Use an existing user, or click New > User and assign the Manager role (or a role with API access).
- Record the user's username and password.
Identify your Qualys API server URL (it is specific to your Qualys platform, for example,
https://qualysapi.qg3.apps.qualys.com). To find it, see Identify your Qualys platform.
Create a service account for the Cloud Run function
- Go to IAM & Admin > Service Accounts.
- Click Create service account.
- Provide the following configuration details:
- Service account name: Enter
qualys-asset-context-sa. - Service account description: Enter
Service account for Cloud Run function to collect Qualys asset context logs.
- Service account name: Enter
- Click Create and continue.
- Grant the following roles:
- Storage Object Admin: write host records to the bucket.
- Cloud Run Invoker: allow Pub/Sub to invoke the function.
- Click Continue, and then click Done.
Grant the service account access to the Cloud Storage bucket
- Go to Cloud Storage > Buckets.
- Click your bucket name.
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
- Add principals: Enter the service account email (
qualys-asset-context-sa@<PROJECT_ID>.iam.gserviceaccount.com). - Assign roles: Select Storage Object Admin.
- Add principals: Enter the service account email (
- Click Save.
Create a Pub/Sub topic
- Go to Pub/Sub > Topics.
- Click Create topic.
- In the Topic ID field, enter
qualys-asset-context-trigger. - Click Create.
Create the Cloud Run function to collect logs
The Cloud Run function is triggered by Pub/Sub messages from Cloud Scheduler. It calls the Qualys FO Host List API, converts the XML response to JSON, and writes the host records to the bucket.
- Go to Cloud Run.
- Click Create service.
- Select Function to use the inline code editor.
- Provide the following configuration details:
- Service name: Enter
qualys-asset-context-collector. - Region: Select the region matching your bucket (for example,
us-central1). - Runtime: Select Python 3.12 or later.
- Service name: Enter
- In the Trigger section:
- Click Add trigger.
- Select Cloud Pub/Sub.
- In Select a Cloud Pub/Sub topic, select
qualys-asset-context-trigger. - Click Save.
- In the Authentication section, select Require authentication.
- Expand Containers, Networking, Security, and on the Security tab set Service account to
qualys-asset-context-sa. On the Containers > Variables & Secrets tab, add the following environment variables:
Variable name Example value GCS_BUCKETqualys-asset-context-logsGCS_PREFIXqualys-asset-context/QUALYS_BASE_URLhttps://qualysapi.qg3.apps.qualys.comQUALYS_USERNAMEyour-qualys-usernameQUALYS_PASSWORDyour-qualys-passwordTRUNCATION_LIMIT1000HTTP_TIMEOUT300MAX_PAGES50Set the Request timeout to
600seconds and Memory to 512 MiB or higher.Click Create. After the service is created, the inline code editor opens.
Add the function code
- In the Function entry point field, enter
main. In the inline code editor, replace the contents of
main.pywith the following code:import functions_framework from google.cloud import storage import os import time import uuid import json import base64 import requests import xmltodict storage_client = storage.Client() GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'qualys-asset-context/') QUALYS_BASE_URL = os.environ.get('QUALYS_BASE_URL') QUALYS_USERNAME = os.environ.get('QUALYS_USERNAME') QUALYS_PASSWORD = os.environ.get('QUALYS_PASSWORD') TRUNCATION_LIMIT = int(os.environ.get('TRUNCATION_LIMIT', '1000')) HTTP_TIMEOUT = int(os.environ.get('HTTP_TIMEOUT', '300')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '50')) def _headers(): """Qualys FO API requires Basic auth and the X-Requested-With header.""" token = base64.b64encode(f'{QUALYS_USERNAME}:{QUALYS_PASSWORD}'.encode()).decode() return {'Authorization': f'Basic {token}', 'X-Requested-With': 'Cloud Run function'} def _extract_hosts(xml_text): """Parse the FO Host List XML into (list_of_host_dicts, next_page_url).""" doc = xmltodict.parse(xml_text) response = (doc.get('HOST_LIST_OUTPUT') or {}).get('RESPONSE') or {} host_list = (response.get('HOST_LIST') or {}).get('HOST') if host_list is None: hosts = [] elif isinstance(host_list, list): hosts = host_list else: hosts = [host_list] # When the result is truncated, RESPONSE contains <WARNING><URL>...</URL></WARNING>. warning = response.get('WARNING') next_url = warning.get('URL') if isinstance(warning, dict) else None return hosts, next_url def _write_hosts(bucket, hosts, page_num): """Write one batch of hosts to GCS as JSON, one host per line.""" ts_path = time.strftime('%Y/%m/%d', time.gmtime()) uniq = f'{int(time.time() * 1e6)}_{uuid.uuid4().hex[:8]}' key = f'{GCS_PREFIX}{ts_path}/qualys_asset_context_p{page_num:03d}_{uniq}.json' lines = [json.dumps(host, separators=(',', ':')) for host in hosts] bucket.blob(key).upload_from_string('\n'.join(lines), content_type='application/x-ndjson') return key @functions_framework.cloud_event def main(cloud_event): """Triggered by Pub/Sub to pull the Qualys host asset inventory and write it to GCS.""" if not all([GCS_BUCKET, QUALYS_BASE_URL, QUALYS_USERNAME, QUALYS_PASSWORD]): print('Error: missing required environment variables') return bucket = storage_client.bucket(GCS_BUCKET) session = requests.Session() session.headers.update(_headers()) url = f"{QUALYS_BASE_URL.rstrip('/')}/api/2.0/fo/asset/host/" params = { 'action': 'list', 'details': 'All', 'truncation_limit': TRUNCATION_LIMIT, # show_cloud_tags, host_metadata, and show_tags add the cloud provider # tags, cloud instance metadata, and asset tags that the parser maps. 'show_cloud_tags': 1, 'host_metadata': 'all', 'show_tags': 1, } pages = 0 total = 0 try: while pages < MAX_PAGES: resp = session.get(url, params=params, timeout=HTTP_TIMEOUT) resp.raise_for_status() hosts, next_url = _extract_hosts(resp.text) if hosts: _write_hosts(bucket, hosts, pages + 1) total += len(hosts) pages += 1 if not next_url: break # The pagination URL already includes every query parameter. url, params = next_url, None print(f'Wrote {total} hosts across {pages} pages') except Exception as e: print(f'Error collecting Qualys asset context: {e}') raiseCreate a
requirements.txtfile with the following contents:functions-framework==3.* google-cloud-storage==2.* requests>=2.31.0 xmltodict>=0.13.0Click Deploy and wait for the deployment to complete.
Create a Cloud Scheduler job
- Go to Cloud Scheduler.
- Click Create job.
- Provide the following configuration details:
- Name: Enter
qualys-asset-context-collector-daily. - Region: Select the same region as the Cloud Run function.
- Frequency: Enter
0 */6 * * *(every six hours). - Timezone: Select UTC.
- Target type: Select Pub/Sub.
- Topic: Select
qualys-asset-context-trigger. - Message body: Enter
{}.
- Name: Enter
- Click Create.
- To verify the setup, click Force run on the job, and then check the Cloud Run logs and the bucket to confirm that host records were written.
Configure a feed in Google SecOps to ingest Qualys asset context logs
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- On the next page, click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example,
Qualys asset context logs). - Select Google Cloud Storage V2 as the Source type.
- Select Qualys asset context as the Log type.
- Click Get Service Account next to the Chronicle Service Account field, and copy the service account email.
- Click Next.
- Specify values for the following input parameters:
- Storage Bucket URI: Enter
gs://qualys-asset-context-logs/qualys-asset-context/. This URL must end with a trailing forward slash (/). - Source deletion options: Select the deletion option according to your preference.
- Maximum File Age: Include files modified in the last number of days. Default is 180 days.
- Storage Bucket URI: Enter
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
Grant the Google SecOps service account access to the bucket
- Go to Cloud Storage > Buckets.
- Click your bucket name.
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
- Add principals: Paste the Google SecOps service account email from the feed setup.
- Assign roles: Select Storage Object Viewer (or Storage Object Admin if you chose a deletion option).
- Click Save.
UDM mapping table
| Log Field | UDM Mapping | Logic |
|---|---|---|
ASSET_ID |
entity.asset.asset_id |
Directly mapped |
DNS_DATA.HOSTNAME |
entity.asset.hostname |
Directly mapped |
IP |
entity.asset.ip |
Merged |
OS |
entity.asset.platform_software.platform |
Mapped: (?i)windows → WINDOWS |
ID |
entity.asset.product_object_id |
Directly mapped |
dataset |
relations |
Merged |
| N/A | entity.asset.platform_software.platform |
Constant: WINDOWS |
| N/A | metadata.entity_type |
Constant: ASSET |
| N/A | metadata.product_name |
Constant: QUALYS ASSET CONTEXT |
| N/A | metadata.vendor_name |
Constant: QUALYS ASSET CONTEXT |
Need more help? Get answers from Community members and Google SecOps professionals.