Collect Group-IB Threat Intelligence logs
This document explains how to ingest Group-IB Threat Intelligence logs into Google Security Operations (Google SecOps) by using Google Cloud Storage.
Group-IB Threat Intelligence & Attribution (TI&A) is a cyber threat intelligence platform that provides real-time data on threat actors, indicators of compromise (IOCs), malware, command-and-control (C2) infrastructure, compromised credentials, phishing campaigns, and vulnerabilities. It aggregates intelligence from open, deep, and dark web sources, enabling security teams to proactively detect and respond to threats.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance
- A Group-IB TI&A account with API access enabled
- Access to the Group-IB TI&A portal (tap.group-ib.com)
- A Google Cloud project with the following APIs enabled:
- Cloud Storage API
- Cloud Functions API
- Cloud Scheduler API
- Cloud Build API
Generate Group-IB API key
- Sign in to the Group-IB TI&A portal at https://tap.group-ib.com.
- Click your name in the top-right corner and select Profile.
- Click Go to my settings.
- Go to the Security and Access tab.
- In the Personal token section, click Generate new token.
- Copy and save the API key (personal token) securely.
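The Group-IB API authenticates with HTTP Basic auth, using your account email as the username and the personal token as the password. Before deploying anything, you can sanity-check the `Authorization` header that authenticated requests will carry. A minimal stdlib-only sketch (the email and token shown are placeholders):

```python
import base64

def basic_auth_header(username: str, api_key: str) -> str:
    """Build the Authorization header value for HTTP Basic auth
    (Group-IB expects your account email as the username and the
    personal token as the password)."""
    raw = f"{username}:{api_key}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")

# Placeholder credentials for illustration only
print(basic_auth_header("analyst@example.com", "my-personal-token"))
```

The Cloud Function below achieves the same thing with `requests.auth.HTTPBasicAuth`, so you never construct this header by hand in production.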
Create Google Cloud Storage bucket
- Go to the Google Cloud Console.
- Select your project or create a new one.
- In the navigation menu, go to Cloud Storage > Buckets.
- Click Create bucket.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name your bucket | Enter a globally unique name (for example, groupib-ti-logs) |
| Location type | Choose based on your needs (Region, Dual-region, Multi-region) |
| Location | Select the location (for example, us-central1) |
| Storage class | Standard (recommended for frequently accessed logs) |
| Access control | Uniform (recommended) |

- Click Create.
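Bucket names are globally unique and validated at creation time, so a quick client-side check avoids a failed creation attempt if you script this step. A simplified sketch of the common naming rules (the full specification also allows dotted names):

```python
import re

def valid_bucket_name(name: str) -> bool:
    """Check a GCS bucket name against the common naming rules:
    3-63 characters, lowercase letters, digits, hyphens and
    underscores, starting and ending with a letter or digit.
    (Simplified: dotted bucket names are not covered here.)"""
    return re.fullmatch(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]", name) is not None

print(valid_bucket_name("groupib-ti-logs"))  # True
print(valid_bucket_name("GroupIB_Logs"))     # False: uppercase is not allowed
```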
Deploy Cloud Function to pull Group-IB data
Create a Cloud Function that pulls threat intelligence data from the Group-IB TI&A API and writes it to the GCS bucket as NDJSON files for Google SecOps to ingest.
Create the Cloud Function
- Go to the Google Cloud Console.
- Go to Cloud Functions.
- Click Create Function.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Environment | 2nd gen |
| Function name | groupib-to-gcs |
| Region | Select the region closest to your GCS bucket |
| Trigger type | HTTPS |
| Authentication | Require authentication |
| Memory allocated | 512 MB (increase if fetching large collections) |
| Timeout | 540 seconds |

- Click Next.
- Set the Runtime to Python 3.11 (or later).
- Set the Entry point to `main`.
- Replace the contents of `main.py` with the following code:

```python
import json
import os
import requests
import functions_framework
from datetime import datetime, timedelta, timezone
from urllib.parse import urljoin
from requests.auth import HTTPBasicAuth
from google.cloud import storage

GIB_API_URL = os.environ.get('GIB_API_URL', 'https://tap.group-ib.com/api/v2/')
GIB_USERNAME = os.environ.get('GIB_USERNAME')
GIB_API_KEY = os.environ.get('GIB_API_KEY')
GCS_BUCKET = os.environ.get('GCS_BUCKET')
GCS_PREFIX = os.environ.get('GCS_PREFIX', 'groupib-ti')
COLLECTIONS = os.environ.get('GIB_COLLECTIONS', 'compromised/account,malware/cnc,apt/threat,hi/threat').split(',')
DEFAULT_DAYS_BACK = int(os.environ.get('DEFAULT_DAYS_BACK', '3'))

# Max items per request: 100 for most collections, 20 for apt/threat and hi/threat
BIG_DATA_COLLECTIONS = ['apt/threat', 'hi/threat']

# File to persist seqUpdate values between runs (use GCS for durability)
STATE_BLOB = '_state/seq_updates.json'


def load_state():
    """Load seqUpdate state from GCS."""
    client = storage.Client()
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(f"{GCS_PREFIX}/{STATE_BLOB}")
    if blob.exists():
        return json.loads(blob.download_as_text())
    return {}


def save_state(state):
    """Save seqUpdate state to GCS."""
    client = storage.Client()
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(f"{GCS_PREFIX}/{STATE_BLOB}")
    blob.upload_from_string(json.dumps(state), content_type='application/json')


def gib_request(session, url, params=None):
    """Send authenticated GET request to Group-IB API."""
    resp = session.get(url, params=params)
    if resp.status_code == 301:
        raise Exception('IP not whitelisted by Group-IB. Contact Group-IB support.')
    resp.raise_for_status()
    return resp.json()


def get_seq_update_by_date(session, collection, date_str):
    """Get seqUpdate value for a collection starting from a given date."""
    url = urljoin(GIB_API_URL, 'sequence_list')
    data = gib_request(session, url, {'date': date_str, 'collection': collection})
    return data.get('list', {}).get(collection)


def fetch_collection(session, collection, seq_update):
    """Fetch all new items from a collection starting after the given seqUpdate."""
    limit = 20 if collection in BIG_DATA_COLLECTIONS else 100
    url = urljoin(GIB_API_URL, f"{collection}/updated")
    all_items = []
    last_seq = seq_update
    while True:
        data = gib_request(session, url, {'seqUpdate': str(last_seq), 'limit': limit})
        items = data.get('items', [])
        if not items:
            break
        all_items.extend(items)
        last_seq = items[-1].get('seqUpdate')
    return all_items, last_seq


def write_to_gcs(items, collection_name):
    """Write items to GCS as NDJSON."""
    if not items:
        return 0
    client = storage.Client()
    bucket = client.bucket(GCS_BUCKET)
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    safe_name = collection_name.replace('/', '_')
    blob_path = f"{GCS_PREFIX}/{safe_name}_{timestamp}.ndjson"
    blob = bucket.blob(blob_path)
    ndjson = '\n'.join(json.dumps(item, ensure_ascii=False) for item in items) + '\n'
    blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    return len(items)


@functions_framework.http
def main(request):
    """Cloud Function entry point."""
    session = requests.Session()
    session.auth = HTTPBasicAuth(GIB_USERNAME, GIB_API_KEY)
    session.headers.update({'Accept': '*/*'})

    state = load_state()
    total = 0

    for collection in COLLECTIONS:
        collection = collection.strip()
        seq_update = state.get(collection)
        if seq_update is None:
            default_date = (datetime.now(timezone.utc) - timedelta(days=DEFAULT_DAYS_BACK)).strftime('%Y-%m-%d')
            seq_update = get_seq_update_by_date(session, collection, default_date)
            if seq_update is None:
                continue
        items, last_seq = fetch_collection(session, collection, seq_update)
        if items:
            write_to_gcs(items, collection)
            total += len(items)
            state[collection] = last_seq

    save_state(state)
    return json.dumps({'status': 'success', 'total_items': total}), 200
```

- Replace the contents of `requirements.txt` with the following dependencies:

```
functions-framework==3.*
requests>=2.28.0
google-cloud-storage>=2.0.0
```

- Click Deploy.
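Each file the function writes is NDJSON: one compact JSON object per line, a format Google SecOps can split into individual log entries. The serialization performed by `write_to_gcs` boils down to the following, isolated here for illustration:

```python
import json

def to_ndjson(items):
    """Serialize a list of dicts as NDJSON: one JSON object per line,
    with a trailing newline (matches the function's write_to_gcs)."""
    return "\n".join(json.dumps(item, ensure_ascii=False) for item in items) + "\n"

sample = [
    {"ip": "203.0.113.5", "seqUpdate": 101},
    {"ip": "198.51.100.7", "seqUpdate": 102},
]
print(to_ndjson(sample), end="")
# → {"ip": "203.0.113.5", "seqUpdate": 101}
# → {"ip": "198.51.100.7", "seqUpdate": 102}
```

`ensure_ascii=False` keeps non-ASCII values (for example, Cyrillic threat-actor names) readable instead of escaping them.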
Configure environment variables
- After deployment, go to your function details page.
- Click Edit.
- Expand the Runtime, build, connections and security settings section.
Under Runtime environment variables, add the following variables:
| Variable | Value |
|---|---|
| GIB_API_URL | https://tap.group-ib.com/api/v2/ (or https://bt.group-ib.com/api/v2/ depending on your region) |
| GIB_USERNAME | Your Group-IB account email address |
| GIB_API_KEY | Your Group-IB API key (personal token) |
| GCS_BUCKET | Your GCS bucket name (for example, groupib-ti-logs) |
| GCS_PREFIX | Prefix for log files (for example, groupib-ti) |
| GIB_COLLECTIONS | Comma-separated list of collections to fetch (see below) |
| DEFAULT_DAYS_BACK | Number of days to look back on first run (default: 3) |

- Click Deploy.
Available Group-IB collections
Configure the GIB_COLLECTIONS variable with the collections relevant to your use case:
| Collection | Description | Max limit |
|---|---|---|
| compromised/account | Compromised account credentials (login, password, domain) | 100 |
| compromised/card | Compromised bank cards | 100 |
| compromised/mule | Money mule accounts | 100 |
| compromised/imei | Compromised mobile device IMEIs | 100 |
| compromised/file | Compromised files with malware attribution | 100 |
| attacks/ddos | DDoS attack data (target IPs, domains) | 100 |
| attacks/deface | Website defacement incidents | 100 |
| attacks/phishing | Phishing URLs and domains | 100 |
| attacks/phishing_kit | Phishing kit hashes and target brands | 100 |
| bp/phishing | Brand protection — phishing incidents | 100 |
| bp/phishing_kit | Brand protection — phishing kits | 100 |
| hi/threat | Cybercriminal (HI) threat reports with IOCs and MITRE ATT&CK mapping | 20 |
| hi/threat_actor | Cybercriminal threat actor profiles | 100 |
| apt/threat | APT (nation-state) threat reports with IOCs and MITRE ATT&CK mapping | 20 |
| apt/threat_actor | APT threat actor profiles | 100 |
| malware/cnc | Command-and-control server indicators (IPs, domains) | 100 |
| malware/malware | Malware descriptions and threat levels | 100 |
| malware/targeted_malware | Targeted malware samples (hashes, filenames) | 100 |
| osi/git_leak | Git repository data leaks | 100 |
| osi/public_leak | Public data leaks (pastes, dumps) | 100 |
| osi/vulnerability | Vulnerability data with CVSS scores | 100 |
| suspicious_ip/tor_node | Tor exit node IP addresses | 100 |
| suspicious_ip/open_proxy | Open proxy IP addresses | 100 |
| suspicious_ip/socks_proxy | SOCKS proxy IP addresses | 100 |
Example: To collect compromised credentials, C2 infrastructure, and APT data:
compromised/account,malware/cnc,apt/threat,hi/threat
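The Cloud Function parses GIB_COLLECTIONS by splitting on commas and applies the smaller page size (20) to the two report collections. The same logic, isolated here so you can preview what a given value of the variable resolves to:

```python
BIG_DATA_COLLECTIONS = {'apt/threat', 'hi/threat'}

def parse_collections(value: str):
    """Split a GIB_COLLECTIONS value into (collection, page limit) pairs,
    using 20 for the large report collections and 100 otherwise."""
    pairs = []
    for name in value.split(','):
        name = name.strip()
        if name:
            pairs.append((name, 20 if name in BIG_DATA_COLLECTIONS else 100))
    return pairs

print(parse_collections("compromised/account, malware/cnc, apt/threat"))
# → [('compromised/account', 100), ('malware/cnc', 100), ('apt/threat', 20)]
```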
Schedule the Cloud Function
Use Cloud Scheduler to trigger the function at regular intervals.
- Go to Cloud Scheduler in the Google Cloud Console.
- Click Create Job.
Provide the following configuration details:
| Setting | Value |
|---|---|
| Name | groupib-to-gcs-schedule |
| Region | Same region as your Cloud Function |
| Frequency | 0 */1 * * * (every hour) or 0 0 * * * (daily) |
| Timezone | Select your timezone |

Under Configure the execution:
- Target type: Select HTTP.
- URL: Enter the Cloud Function trigger URL.
- HTTP method: Select POST.
- Auth header: Select Add OIDC token.
- Service account: Select a service account with the roles/cloudfunctions.invoker permission (for 2nd gen functions, this maps to roles/run.invoker on the underlying Cloud Run service).
- Click Create.
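Cloud Scheduler's Add OIDC token setting attaches an identity token whose audience is the function URL. To reproduce the same authenticated call manually (for example, from a workstation with application-default credentials), a hedged sketch (requires the google-auth and requests packages; the URL below is a placeholder):

```python
def bearer_header(token: str) -> dict:
    """Build the Authorization header Cloud Scheduler sends with an OIDC token."""
    return {"Authorization": f"Bearer {token}"}

def invoke_function(url: str) -> int:
    """POST to the HTTPS-triggered function with an OIDC identity token,
    mirroring the scheduler's 'Add OIDC token' authentication."""
    # Lazy imports: these need the google-auth and requests packages.
    import google.auth.transport.requests
    import google.oauth2.id_token
    import requests

    request = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(request, audience=url)
    return requests.post(url, headers=bearer_header(token)).status_code

# Placeholder URL for illustration:
# invoke_function("https://REGION-PROJECT.cloudfunctions.net/groupib-to-gcs")
```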
Test the data export
- In the Cloud Scheduler console, click Force Run next to your job to trigger the function manually.
- Check the Cloud Function logs in Cloud Logging to verify that data was fetched from Group-IB.
- Go to Cloud Storage > Buckets in the Google Cloud Console.
- Click your bucket name (for example, groupib-ti-logs).
- Navigate to the prefix folder (for example, groupib-ti/).
- Verify that new .ndjson files are appearing in the bucket.
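Knowing the object naming scheme makes it easier to spot, or script a check for, the expected files. The function writes `<prefix>/<collection with slashes replaced>_<UTC timestamp>.ndjson`, which can be mirrored as:

```python
from datetime import datetime, timezone

def blob_path(prefix: str, collection: str, now: datetime) -> str:
    """Reproduce the object name written by the Cloud Function: slashes
    in the collection name become underscores, followed by a UTC timestamp."""
    safe_name = collection.replace('/', '_')
    return f"{prefix}/{safe_name}_{now.strftime('%Y%m%d_%H%M%S')}.ndjson"

print(blob_path("groupib-ti", "malware/cnc",
                datetime(2025, 1, 15, 8, 30, 0, tzinfo=timezone.utc)))
# → groupib-ti/malware_cnc_20250115_083000.ndjson
```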
Retrieve the Google SecOps service account
Google SecOps uses a unique service account to read data from your GCS bucket. You must grant this service account access to your bucket.
Get the service account email
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- Click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, Group-IB Threat Intelligence).
- Select Google Cloud Storage V2 as the Source type.
- Select Group-IB Threat Intelligence as the Log type.
- Click Get Service Account. A unique service account email is displayed, for example: chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
- Copy this email address for use in the next step.
Grant IAM permissions to the Google SecOps service account
The Google SecOps service account needs the Storage Object Viewer role (roles/storage.objectViewer) on your GCS bucket.
- Go to Cloud Storage > Buckets.
- Click your bucket name (for example, groupib-ti-logs).
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
- Add principals: Paste the Google SecOps service account email.
- Assign roles: Select Storage Object Viewer.
- Click Save.
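The same grant can be applied programmatically. A sketch using the google-cloud-storage client (imported lazily inside the function so the formatting helper stays stdlib-only); the bucket name and service-account email passed in would be your own values:

```python
def sa_member(email: str) -> str:
    """Format a service-account email as an IAM policy member string."""
    return f"serviceAccount:{email}"

def grant_object_viewer(bucket_name: str, email: str) -> None:
    """Append a roles/storage.objectViewer binding for the Google SecOps
    service account to the bucket's IAM policy."""
    from google.cloud import storage  # requires the google-cloud-storage package

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.bindings.append({
        "role": "roles/storage.objectViewer",
        "members": {sa_member(email)},
    })
    bucket.set_iam_policy(policy)

# Placeholder values for illustration:
# grant_object_viewer("groupib-ti-logs",
#                     "chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com")
```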
Configure a feed in Google SecOps to ingest Group-IB Threat Intelligence logs
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- Click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, Group-IB Threat Intelligence).
- Select Google Cloud Storage V2 as the Source type.
- Select Group-IB Threat Intelligence as the Log type.
- Click Next.
Specify values for the following input parameters:
- Storage bucket URL: Enter the GCS bucket URI with the prefix path: gs://groupib-ti-logs/groupib-ti/
  Replace the following:
  - groupib-ti-logs: Your GCS bucket name.
  - groupib-ti: The prefix/folder path where logs are stored.
- Source deletion option: Select the deletion option according to your preference:
- Never: Never deletes any files after transfers (recommended for testing).
- Delete transferred files: Deletes files after successful transfer.
- Delete transferred files and empty directories: Deletes files and empty directories after successful transfer.
- Maximum File Age: Include files modified within this number of days. The default is 180 days.
- Asset namespace: The asset namespace.
- Ingestion labels: The label to be applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
Need more help? Get answers from Community members and Google SecOps professionals.