Cisco vManage SD-WAN のログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Cisco vManage SD-WAN ログを Google Security Operations に取り込む方法について説明します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • Cisco vManage SD-WAN 管理コンソールへの特権アクセス
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Cisco vManage SD-WAN の前提条件(認証情報とベース URL)を収集する

  1. Cisco vManage 管理コンソールにログインします。
  2. [管理] > [設定] > [ユーザー] に移動します。
  3. 新しいユーザーを作成するか、API アクセス権限を持つ既存の管理者ユーザーを使用します。
  4. 次の詳細をコピーして安全な場所に保存します。
    • ユーザー名
    • パスワード
    • vManage ベース URL(例: https://your-vmanage-server:8443

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: cisco-sdwan-logs-bucket)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] で [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json"
        }
      ]
    }
    
    • 別のバケット名を入力した場合は、cisco-sdwan-logs-bucket を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM]> [ロール] に移動します。

  5. [ロールを作成> AWS サービス > Lambda] をクリックします。

  6. 新しく作成したポリシーを関連付けます。

  7. ロールに「cisco-sdwan-lambda-role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 cisco-sdwan-log-collector
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール cisco-sdwan-lambda-role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(cisco-sdwan-log-collector.py)を入力します。

    import json
    import boto3
    import os
    import urllib3
    import urllib.parse
    from datetime import datetime, timezone
    from botocore.exceptions import ClientError
    
    # Disable SSL warnings for self-signed certificates
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Environment variables
    VMANAGE_HOST = os.environ['VMANAGE_HOST']
    VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME']  
    VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD']
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager(cert_reqs='CERT_NONE')
    
    class VManageAPI:
        def __init__(self, host, username, password):
            self.host = host.rstrip('/')
            self.username = username
            self.password = password
            self.cookies = None
            self.token = None
    
        def authenticate(self):
            """Authenticate with vManage and get session tokens"""
            try:
                # Login to get JSESSIONID
                login_url = f"{self.host}/j_security_check"
                login_data = urllib.parse.urlencode({
                    'j_username': self.username,
                    'j_password': self.password
                })
    
                response = http.request(
                    'POST',
                    login_url,
                    body=login_data,
                    headers={'Content-Type': 'application/x-www-form-urlencoded'},
                    timeout=30
                )
    
                # Check if login was successful (vManage returns HTML on failure)
                if b'<html>' in response.data or response.status != 200:
                    raise Exception("Authentication failed")
    
                # Extract cookies
                self.cookies = {}
                if 'Set-Cookie' in response.headers:
                    cookie_header = response.headers['Set-Cookie']
                    for cookie in cookie_header.split(';'):
                        if 'JSESSIONID=' in cookie:
                            self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0]
                            break
    
                if not self.cookies.get('JSESSIONID'):
                    raise Exception("Failed to get JSESSIONID")
    
                # Get XSRF token
                token_url = f"{self.host}/dataservice/client/token"
                headers = {
                    'Content-Type': 'application/json',
                    'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}"
                }
    
                response = http.request('GET', token_url, headers=headers, timeout=30)
    
                if response.status == 200:
                    self.token = response.data.decode('utf-8')
                    return True
                else:
                    raise Exception(f"Failed to get XSRF token: {response.status}")
    
            except Exception as e:
                print(f"Authentication error: {e}")
                return False
    
        def get_headers(self):
            """Get headers for API requests"""
            return {
                'Content-Type': 'application/json',
                'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}",
                'X-XSRF-TOKEN': self.token
            }
    
        def get_audit_logs(self, last_timestamp=None):
            """Get audit logs from vManage"""
            try:
                url = f"{self.host}/dataservice/auditlog"
                headers = self.get_headers()
    
                # Build query for recent logs
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of logs by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time", 
                        "type": "date",
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get audit logs: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting audit logs: {e}")
                return None
    
        def get_alarms(self, last_timestamp=None):
            """Get alarms from vManage"""
            try:
                url = f"{self.host}/dataservice/alarms"
                headers = self.get_headers()
    
                # Build query for recent alarms
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of alarms by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time",
                        "type": "date", 
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get alarms: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting alarms: {e}")
                return None
    
        def get_events(self, last_timestamp=None):
            """Get events from vManage"""
            try:
                url = f"{self.host}/dataservice/events"
                headers = self.get_headers()
    
                # Build query for recent events
                query = {
                    "query": {
                        "condition": "AND",
                        "rules": []
                    },
                    "size": 10000
                }
    
                # Add timestamp filter if provided  
                if last_timestamp:
                    # Convert timestamp to epoch milliseconds for vManage API
                    if isinstance(last_timestamp, str):
                        try:
                            dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                            epoch_ms = int(dt.timestamp() * 1000)
                        except:
                            epoch_ms = int(last_timestamp)
                    else:
                        epoch_ms = int(last_timestamp)
    
                    query["query"]["rules"].append({
                        "value": [str(epoch_ms)],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "greater"
                    })
                else:
                    # Get last 1 hour of events by default
                    query["query"]["rules"].append({
                        "value": ["1"],
                        "field": "entry_time",
                        "type": "date",
                        "operator": "last_n_hours"
                    })
    
                response = http.request(
                    'POST',
                    url,
                    body=json.dumps(query),
                    headers=headers,
                    timeout=60
                )
    
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                else:
                    print(f"Failed to get events: {response.status}")
                    return None
    
            except Exception as e:
                print(f"Error getting events: {e}")
                return None
    
    def get_last_run_time():
        """Get the last successful run timestamp from S3"""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            state_data = json.loads(response['Body'].read())
            return state_data.get('last_run_time')
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                print("No previous state found, collecting last hour of logs")
                return None
            else:
                print(f"Error reading state: {e}")
                return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_last_run_time(timestamp):
        """Update the last successful run timestamp in S3"""
        try:
            state_data = {
                'last_run_time': timestamp,
                'updated_at': datetime.now(timezone.utc).isoformat()
            }
    
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state_data),
                ContentType='application/json'
            )
    
            print(f"Updated state with timestamp: {timestamp}")
    
        except Exception as e:
            print(f"Error updating state: {e}")
    
    def upload_logs_to_s3(logs_data, log_type, timestamp):
        """Upload logs to S3 bucket"""
        try:
            if not logs_data or 'data' not in logs_data or not logs_data['data']:
                print(f"No {log_type} data to upload")
                return
    
            # Create filename with timestamp
            dt = datetime.now(timezone.utc)
            filename = f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json"
    
            # Upload to S3
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=filename,
                Body=json.dumps(logs_data),
                ContentType='application/json'
            )
    
            print(f"Uploaded {len(logs_data['data'])} {log_type} records to s3://{S3_BUCKET}/{filename}")
    
        except Exception as e:
            print(f"Error uploading {log_type} to S3: {e}")
    
    def lambda_handler(event, context):
        """Main Lambda handler function"""
        print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}")
    
        try:
            # Get last run time
            last_run_time = get_last_run_time()
    
            # Initialize vManage API client
            vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD)
    
            # Authenticate
            if not vmanage.authenticate():
                return {
                    'statusCode': 500,
                    'body': json.dumps('Failed to authenticate with vManage')
                }
    
            print("Successfully authenticated with vManage")
    
            # Current timestamp for state tracking (store as epoch milliseconds)
            current_time = int(datetime.now(timezone.utc).timestamp() * 1000)
    
            # Collect different types of logs
            log_types = [
                ('audit_logs', vmanage.get_audit_logs),
                ('alarms', vmanage.get_alarms), 
                ('events', vmanage.get_events)
            ]
    
            total_records = 0
    
            for log_type, get_function in log_types:
                try:
                    print(f"Collecting {log_type}...")
                    logs_data = get_function(last_run_time)
    
                    if logs_data:
                        upload_logs_to_s3(logs_data, log_type, current_time)
                        if 'data' in logs_data:
                            total_records += len(logs_data['data'])
    
                except Exception as e:
                    print(f"Error processing {log_type}: {e}")
                    continue
    
            # Update state with current timestamp
            update_last_run_time(current_time)
    
            print(f"Collection completed. Total records processed: {total_records}")
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Log collection completed successfully',
                    'total_records': total_records,
                    'timestamp': datetime.now(timezone.utc).isoformat()
                })
            }
    
        except Exception as e:
            print(f"Lambda execution error: {e}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
  5. [構成] > [環境変数] に移動します。

  6. [編集>新しい環境変数を追加] をクリックします。

  7. 次の環境変数を入力し、実際の値に置き換えます。

    キー 値の例
    S3_BUCKET cisco-sdwan-logs-bucket
    S3_PREFIX cisco-sdwan/
    STATE_KEY cisco-sdwan/state.json
    VMANAGE_HOST https://your-vmanage-server:8443
    VMANAGE_USERNAME your-vmanage-username
    VMANAGE_PASSWORD your-vmanage-password
  8. 関数が作成されたら、そのページにとどまるか、[Lambda] > [Functions] > [cisco-sdwan-log-collector] を開きます。

  9. [CONFIGURATION] タブを選択します。

  10. [全般設定] パネルで、[編集] をクリックします。

  11. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour
    • ターゲット: Lambda 関数 cisco-sdwan-log-collector
    • 名前: cisco-sdwan-log-collector-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: secops-reader
    • アクセスタイプ: アクセスキー - プログラムによるアクセス
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON エディタで、次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["cisco-sdwan/*"]
            }
          }
        },
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*"
        }
      ]
    }
    
  7. ポリシーに secops-reader-policy という名前を付けます。

  8. [ポリシーを作成] をクリックします。

  9. ユーザーの作成に戻り、secops-reader-policy を検索して選択します。

  10. [Next: Tags] をクリックします。

  11. [次へ: 確認] をクリックします。

  12. [ユーザーを作成] をクリックします。

  13. CSV をダウンロードします(これらの値はフィードに入力されます)。

Cisco vManage SD-WAN のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Cisco SD-WAN logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Cisco vManage SD-WAN] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://cisco-sdwan-logs-bucket/cisco-sdwan/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • Asset namespace: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。