Collect OpenTelemetry Netflow Receiver logs
This document explains how to ingest OpenTelemetry Netflow Receiver logs to Google Security Operations using Google Cloud Storage V2.
The OpenTelemetry Netflow Receiver is an open-source component that listens for netflow, sflow, and IPFIX UDP traffic and converts it to OpenTelemetry log records. This enables network traffic monitoring and analysis, including protocol identification, traffic volume analysis, port usage tracking, and byte/packet statistics.
Before you begin
Make sure that you have the following prerequisites:
- A Google SecOps instance
- A GCP project with Cloud Storage API enabled
- Permissions to create and manage GCS buckets
- Permissions to manage IAM policies on GCS buckets
- Permissions to create Cloud Run services, Pub/Sub topics, and Cloud Scheduler jobs
- Network devices capable of sending netflow, sflow, or IPFIX data
- Access to configure network device export settings
Create Google Cloud Storage bucket
- Go to the Google Cloud Console.
- Select your project or create a new one.
- In the navigation menu, go to Cloud Storage > Buckets.
- Click Create bucket.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name your bucket | Enter a globally unique name (for example, `netflow-logs-bucket`) |
| Location type | Choose based on your needs (Region, Dual-region, Multi-region) |
| Location | Select the location (for example, `us-central1`) |
| Storage class | Standard (recommended for frequently accessed logs) |
| Access control | Uniform (recommended) |
| Protection tools | Optional: Enable object versioning or a retention policy |

Click Create.
Create service account for Cloud Run function
The Cloud Run function needs a service account with permissions to write to the GCS bucket and to be invoked by Pub/Sub.
Create service account
- In the GCP Console, go to IAM & Admin > Service Accounts.
- Click Create Service Account.
- Provide the following configuration details:
  - Service account name: Enter `netflow-collector-sa`.
  - Service account description: Enter `Service account for Cloud Run function to collect netflow logs`.
- Click Create and Continue.
- In the Grant this service account access to project section, add the following roles:
- Click Select a role.
- Search for and select Storage Object Admin.
- Click + Add another role.
- Search for and select Cloud Run Invoker.
- Click + Add another role.
- Search for and select Cloud Functions Invoker.
- Click Continue.
- Click Done.
These roles are required for:
- Storage Object Admin: Write logs to GCS bucket and manage state files
- Cloud Run Invoker: Allow Pub/Sub to invoke the function
- Cloud Functions Invoker: Allow function invocation
Grant IAM permissions on GCS bucket
Grant the service account write permissions on the GCS bucket:
- Go to Cloud Storage > Buckets.
- Click your bucket name (for example, `netflow-logs-bucket`).
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
  - Add principals: Enter the service account email (for example, `netflow-collector-sa@PROJECT_ID.iam.gserviceaccount.com`).
  - Assign roles: Select Storage Object Admin.
- Click Save.
Create Pub/Sub topic
Create a Pub/Sub topic that Cloud Scheduler will publish to and the Cloud Run function will subscribe to.
- In the GCP Console, go to Pub/Sub > Topics.
- Click Create topic.
- Provide the following configuration details:
  - Topic ID: Enter `netflow-trigger`.
  - Leave other settings as default.
- Click Create.
Create Cloud Run function to collect netflow logs
The Cloud Run function will run an OpenTelemetry Collector that receives netflow data and exports it to GCS.
- In the GCP Console, go to Cloud Run.
- Click Create service.
- Select Function (use an inline editor to create a function).
In the Configure section, provide the following configuration details:
| Setting | Value |
|---|---|
| Service name | `netflow-collector` |
| Region | Select a region matching your GCS bucket (for example, `us-central1`) |
| Runtime | Select Python 3.12 or later |

In the Trigger (optional) section:
- Click + Add trigger.
- Select Cloud Pub/Sub.
- In Select a Cloud Pub/Sub topic, choose the Pub/Sub topic `netflow-trigger`.
- Click Save.
In the Authentication section:
- Select Require authentication.
- Check Identity and Access Management (IAM).
Scroll down and expand Containers, Networking, Security.
Go to the Security tab:
- Service account: Select the service account `netflow-collector-sa`.
Go to the Containers tab:
- Click Variables & Secrets.
- Click + Add variable for each environment variable:
| Variable Name | Example Value | Description |
|---|---|---|
| `GCS_BUCKET` | `netflow-logs-bucket` | GCS bucket name |
| `GCS_PREFIX` | `netflow` | Prefix for log files |
| `NETFLOW_PORT` | `2055` | Port for the netflow receiver |
| `NETFLOW_SCHEME` | `netflow` | Scheme type: `netflow`, `sflow`, or `ipfix` |
| `NETFLOW_SOCKETS` | `4` | Number of UDP sockets |
| `NETFLOW_WORKERS` | `8` | Number of decoder workers |

In the Variables & Secrets section, scroll down to Requests:
- Request timeout: Enter `600` seconds (10 minutes).
Go to the Settings tab:
- In the Resources section:
  - Memory: Select 1 GiB or higher
  - CPU: Select 2
In the Revision scaling section:
- Minimum number of instances: Enter `1`.
- Maximum number of instances: Enter `10`.
Click Create.
Wait for the service to be created (1-2 minutes).
After the service is created, the inline code editor will open automatically.
Add function code
- Enter `main` in the Entry point field.
In the inline code editor, create three files:
- main.py:
```python
import functions_framework
from google.cloud import storage
import os
import subprocess
import signal
import time
import yaml
from datetime import datetime, timezone

# Initialize Storage client
storage_client = storage.Client()

# Environment variables
GCS_BUCKET = os.environ.get('GCS_BUCKET')
GCS_PREFIX = os.environ.get('GCS_PREFIX', 'netflow')
NETFLOW_PORT = os.environ.get('NETFLOW_PORT', '2055')
NETFLOW_SCHEME = os.environ.get('NETFLOW_SCHEME', 'netflow')
NETFLOW_SOCKETS = os.environ.get('NETFLOW_SOCKETS', '4')
NETFLOW_WORKERS = os.environ.get('NETFLOW_WORKERS', '8')

# Global process handle
collector_process = None


def create_collector_config():
    """Create OpenTelemetry Collector configuration."""
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    file_path = f"/tmp/netflow_{timestamp}.ndjson"
    config = {
        'receivers': {
            'netflow': {
                'scheme': NETFLOW_SCHEME,
                'hostname': '0.0.0.0',
                'port': int(NETFLOW_PORT),
                'sockets': int(NETFLOW_SOCKETS),
                'workers': int(NETFLOW_WORKERS),
                'queue_size': 5000
            }
        },
        'processors': {
            'batch': {
                'timeout': '10s',
                'send_batch_size': 1000
            }
        },
        'exporters': {
            'file': {
                'path': file_path,
                'format': 'json'
            }
        },
        'service': {
            'pipelines': {
                'logs': {
                    'receivers': ['netflow'],
                    'processors': ['batch'],
                    'exporters': ['file']
                }
            },
            'telemetry': {
                'logs': {
                    'level': 'info'
                }
            }
        }
    }
    config_path = '/tmp/otel-config.yaml'
    with open(config_path, 'w') as f:
        yaml.dump(config, f)
    return config_path, file_path


def upload_to_gcs(file_path):
    """Upload netflow logs to GCS."""
    if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
        print(f"No data to upload from {file_path}")
        return
    bucket = storage_client.bucket(GCS_BUCKET)
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    object_key = f"{GCS_PREFIX}/netflow_{timestamp}.ndjson"
    blob = bucket.blob(object_key)
    blob.upload_from_filename(file_path, content_type='application/x-ndjson')
    print(f"Uploaded {os.path.getsize(file_path)} bytes to gs://{GCS_BUCKET}/{object_key}")


def signal_handler(signum, frame):
    """Handle shutdown signals."""
    global collector_process
    if collector_process:
        print("Shutting down collector...")
        collector_process.terminate()
        collector_process.wait(timeout=10)


@functions_framework.cloud_event
def main(cloud_event):
    """
    Cloud Run function to run OpenTelemetry Collector for netflow collection.

    Args:
        cloud_event: CloudEvent object containing Pub/Sub message
    """
    global collector_process
    if not GCS_BUCKET:
        print('Error: GCS_BUCKET environment variable not set')
        return
    try:
        # Set up signal handlers
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        # Create collector configuration
        config_path, file_path = create_collector_config()
        print(f"Created collector config at {config_path}")
        print(f"Netflow receiver listening on {NETFLOW_SCHEME}://0.0.0.0:{NETFLOW_PORT}")

        # Start OpenTelemetry Collector (installed to /usr/bin by the Dockerfile)
        collector_process = subprocess.Popen(
            ['/usr/bin/otelcol-contrib', '--config', config_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        print(f"Started OpenTelemetry Collector (PID: {collector_process.pid})")

        # Run for the collection period (5 minutes)
        collection_time = 300
        print(f"Collecting netflow data for {collection_time} seconds...")
        start_time = time.time()
        while time.time() - start_time < collection_time:
            if collector_process.poll() is not None:
                stdout, stderr = collector_process.communicate()
                print("Collector exited unexpectedly")
                print(f"STDOUT: {stdout}")
                print(f"STDERR: {stderr}")
                break
            time.sleep(10)

        # Stop collector
        if collector_process.poll() is None:
            print("Stopping collector...")
            collector_process.terminate()
            collector_process.wait(timeout=10)

        # Upload collected data
        upload_to_gcs(file_path)

        # Cleanup
        if os.path.exists(file_path):
            os.remove(file_path)
        if os.path.exists(config_path):
            os.remove(config_path)

        print("Netflow collection completed successfully")
    except Exception as e:
        print(f'Error during netflow collection: {str(e)}')
        if collector_process and collector_process.poll() is None:
            collector_process.terminate()
        raise
```

- requirements.txt:
```
functions-framework==3.*
google-cloud-storage==2.*
PyYAML==6.*
```

- Dockerfile:
```dockerfile
FROM python:3.12-slim

# Install OpenTelemetry Collector Contrib
RUN apt-get update && apt-get install -y wget && \
    wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_amd64.deb && \
    dpkg -i otelcol-contrib_0.144.0_linux_amd64.deb && \
    rm otelcol-contrib_0.144.0_linux_amd64.deb && \
    apt-get clean

# Set working directory
WORKDIR /app

# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy function code
COPY main.py .

# Expose netflow port
EXPOSE 2055/udp

# Run function
CMD ["functions-framework", "--target=main", "--port=8080"]
```

Click Deploy to save and deploy the function.
Wait for deployment to complete (3-5 minutes).
Configure network devices to send netflow data
Configure your network devices (routers, switches, firewalls) to export netflow, sflow, or IPFIX data to the Cloud Run function.
Get Cloud Run function external IP
- In the GCP Console, go to Cloud Run > Services.
- Click the function name `netflow-collector`.
- Copy the URL displayed at the top (for example, `https://netflow-collector-xxxxx-uc.a.run.app`).
- Extract the hostname from the URL.
- Use `nslookup` or `dig` to resolve the IP address:

```
nslookup netflow-collector-xxxxx-uc.a.run.app
```
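If you prefer to resolve the hostname programmatically, a short standard-library Python sketch can do the same lookup. The hostname below is a placeholder; substitute your service's hostname:

```python
import socket

def resolve_collector_ip(hostname: str) -> list[str]:
    """Resolve a hostname to its IPv4 addresses, deduplicated in order."""
    infos = socket.getaddrinfo(hostname, None, socket.AF_INET)
    return list(dict.fromkeys(info[4][0] for info in infos))

# "localhost" is just a placeholder; use your Cloud Run hostname here.
print(resolve_collector_ip("localhost"))
```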
Example: Cisco router netflow configuration
```
! Configure netflow exporter
flow exporter OTEL-EXPORTER
 destination <CLOUD_RUN_IP>
 transport udp 2055
 source <INTERFACE>
 export-protocol netflow-v9
! Configure flow monitor
flow monitor OTEL-MONITOR
 exporter OTEL-EXPORTER
 record netflow ipv4 original-input
! Apply to interface
interface GigabitEthernet0/0
 ip flow monitor OTEL-MONITOR input
 ip flow monitor OTEL-MONITOR output
```

Example: Generic netflow configuration
For most network devices, configure the following settings:
- Netflow version: NetFlow v5, v9, or IPFIX
- Collector IP: Cloud Run function IP address
- Collector port: `2055` (or the configured port)
- Protocol: UDP
- Active timeout: 60 seconds (recommended)
- Inactive timeout: 15 seconds (recommended)
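Before changing device configuration, you can verify that UDP datagrams can be sent toward a collector at all. The following is a standalone diagnostic sketch (not part of the deployment); for demonstration it receives its own datagram on a local listener, but you would point the host and port at your collector:

```python
import socket

def send_test_datagram(host: str, port: int, payload: bytes) -> int:
    """Send one UDP datagram and return the number of bytes sent.

    UDP is connectionless, so a successful send only confirms the packet
    left this host; check the Cloud Run logs to confirm receipt.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))

# Demonstration against a local listener on an OS-assigned free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(("127.0.0.1", 0))
port = listener.getsockname()[1]
sent = send_test_datagram("127.0.0.1", port, b"test")
data, _ = listener.recvfrom(1024)
listener.close()
print(sent, data)
```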
Create Cloud Scheduler job
Cloud Scheduler will publish messages to the Pub/Sub topic at regular intervals, triggering the Cloud Run function.
- In the GCP Console, go to Cloud Scheduler.
- Click Create Job.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name | `netflow-collector-hourly` |
| Region | Select the same region as the Cloud Run function |
| Frequency | `0 * * * *` (every hour, on the hour) |
| Timezone | Select a timezone (UTC recommended) |
| Target type | Pub/Sub |
| Topic | Select the Pub/Sub topic `netflow-trigger` |
| Message body | `{}` (an empty JSON object) |

Click Create.
Schedule frequency options
Choose frequency based on log volume and latency requirements:
| Frequency | Cron Expression | Use Case |
|---|---|---|
| Every 5 minutes | `*/5 * * * *` | High-volume, low-latency |
| Every 15 minutes | `*/15 * * * *` | Medium volume |
| Every hour | `0 * * * *` | Standard (recommended) |
| Every 6 hours | `0 */6 * * *` | Low volume, batch processing |
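The cron expressions above use only `*`, literal values, and `*/n` step values. A minimal matcher for just those patterns (an illustrative sketch, not a full cron parser) shows how a single field is evaluated:

```python
def cron_field_matches(field: str, value: int) -> bool:
    """Check one cron field against a value; supports '*', '*/n', and literals."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    return value == int(field)

# Minute fields from the table, evaluated at minute 30 of an hour:
for minute_field in ("*/5", "*/15", "0"):
    print(minute_field, cron_field_matches(minute_field, 30))
```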
Test the integration
- In the Cloud Scheduler console, find the job `netflow-collector-hourly`.
- Click Force run to trigger the job manually.
- Wait a few seconds.
- Go to Cloud Run > Services.
- Click the function name `netflow-collector`.
- Click the Logs tab.
Verify the function executed successfully. Look for:
```
Started OpenTelemetry Collector (PID: ...)
Netflow receiver listening on netflow://0.0.0.0:2055
Collecting netflow data for 300 seconds...
Uploaded ... bytes to gs://netflow-logs-bucket/netflow/netflow_YYYYMMDD_HHMMSS.ndjson
Netflow collection completed successfully
```

Go to Cloud Storage > Buckets.
Click the bucket name `netflow-logs-bucket`.

Navigate to the prefix folder `netflow/`.

Verify that a new `.ndjson` file was created with the current timestamp.
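To sanity-check a downloaded file locally, every line of an `.ndjson` file should parse as a standalone JSON document. A small sketch (the record shape below is illustrative, not the receiver's exact schema):

```python
import json

def count_ndjson_records(text: str) -> int:
    """Count valid JSON records in newline-delimited JSON text."""
    count = 0
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        json.loads(line)  # raises ValueError on a malformed line
        count += 1
    return count

# Two hypothetical flow records in NDJSON form.
sample = (
    '{"source": {"address": "10.0.0.1", "port": 443}}\n'
    '{"source": {"address": "10.0.0.2", "port": 53}}\n'
)
print(count_ndjson_records(sample))  # 2
```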
If you see errors in the logs:
- Collector exited unexpectedly: Check netflow receiver configuration
- No data to upload: Verify network devices are sending netflow data to the correct IP and port
- Permission denied: Check service account has Storage Object Admin role
- Port binding error: Ensure port 2055 is not already in use
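For the last case, one way to check whether a UDP port is already bound on the host you are testing from is to attempt a bind yourself. A standard-library diagnostic sketch (unrelated to the deployed service):

```python
import errno
import socket

def udp_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if binding a UDP socket on the port fails with EADDRINUSE."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((host, port))
        return False
    except OSError as e:
        return e.errno == errno.EADDRINUSE
    finally:
        sock.close()

# Grab an arbitrary free port, then confirm it reads as in use.
holder = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
holder.bind(("127.0.0.1", 0))
port = holder.getsockname()[1]
print(udp_port_in_use(port))   # True while `holder` keeps the port bound
holder.close()
print(udp_port_in_use(port))   # False after release
```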
Retrieve the Google SecOps service account
Google SecOps uses a unique service account to read data from your GCS bucket. You must grant this service account access to your bucket.
Get the service account email
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- Click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, `OpenTelemetry Netflow Logs`).
- Select Google Cloud Storage V2 as the Source type.
- Select NETFLOW_OTEL as the Log type.

Click Get Service Account. A unique service account email is displayed. For example:

```
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
```

Copy this email address for use in the next step.
Click Next.
Specify values for the following input parameters:
Storage bucket URL: Enter the GCS bucket URI with the prefix path:
`gs://netflow-logs-bucket/netflow/`
Source deletion option: Select the deletion option according to your preference:
- Never: Never deletes any files after transfers (recommended for testing).
- Delete transferred files: Deletes files after successful transfer.
- Delete transferred files and empty directories: Deletes files and empty directories after successful transfer.
Maximum File Age: Include files modified in the last number of days (default is 180 days)
Asset namespace: The asset namespace
Ingestion labels: The label to be applied to the events from this feed
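The feed reads from the bucket and prefix encoded in the `gs://` URI. How that URI splits into its parts can be sketched with a hypothetical helper (not part of any Google API):

```python
def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs:// URI into (bucket, prefix)."""
    if not uri.startswith("gs://"):
        raise ValueError(f"not a GCS URI: {uri}")
    bucket, _, prefix = uri[len("gs://"):].partition("/")
    return bucket, prefix

print(parse_gcs_uri("gs://netflow-logs-bucket/netflow/"))  # ('netflow-logs-bucket', 'netflow/')
```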
Click Next.
Review your new feed configuration in the Finalize screen, and then click Submit.
Grant IAM permissions to the Google SecOps service account
The Google SecOps service account needs Storage Object Viewer role on your GCS bucket.
- Go to Cloud Storage > Buckets.
- Click the bucket name `netflow-logs-bucket`.
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
- Add principals: Paste the Google SecOps service account email
- Assign roles: Select Storage Object Viewer
- Click Save.
UDM mapping table
The following table shows how OpenTelemetry Netflow Receiver log fields map to Google SecOps UDM fields:
| OpenTelemetry Field | UDM Field | Description |
|---|---|---|
| `source.address` | `principal.ip` | Source IP address |
| `source.port` | `principal.port` | Source port number |
| `destination.address` | `target.ip` | Destination IP address |
| `destination.port` | `target.port` | Destination port number |
| `network.transport` | `network.ip_protocol` | Transport protocol (tcp, udp) |
| `network.type` | `network.ip_version` | IP version (ipv4, ipv6) |
| `flow.io.bytes` | `network.sent_bytes` | Total bytes transferred |
| `flow.io.packets` | `network.sent_packets` | Total packets transferred |
| `flow.type` | `metadata.product_log_id` | Flow type (NETFLOW_V5, NETFLOW_V9, SFLOW_5, IPFIX) |
| `flow.start` | `metadata.event_timestamp` | Flow start time |
| `flow.end` | `network.session_duration` | Flow end time |
| `flow.sampler_address` | `intermediary.ip` | Netflow exporter IP address |
| `flow.tcp_flags` | `network.tcp_flags` | TCP flags |
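The Google SecOps parser applies these mappings during ingestion; nothing in the pipeline above needs to implement them. Purely to illustrate the shape of the transform, a hypothetical sketch covering a few rows of the table:

```python
def netflow_to_udm(record: dict) -> dict:
    """Map a subset of OTel netflow attributes to UDM fields (illustrative only)."""
    return {
        "principal": {
            "ip": record.get("source", {}).get("address"),
            "port": record.get("source", {}).get("port"),
        },
        "target": {
            "ip": record.get("destination", {}).get("address"),
            "port": record.get("destination", {}).get("port"),
        },
        "network": {
            "ip_protocol": record.get("network", {}).get("transport"),
            "sent_bytes": record.get("flow", {}).get("io", {}).get("bytes"),
        },
    }

# A hypothetical flow record, nested the way the field names above suggest.
sample = {
    "source": {"address": "10.0.0.1", "port": 52431},
    "destination": {"address": "8.8.8.8", "port": 53},
    "network": {"transport": "udp"},
    "flow": {"io": {"bytes": 120}},
}
print(netflow_to_udm(sample))
```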
Need more help? Get answers from Community members and Google SecOps professionals.