Akamai SIEM Connector ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Akamai SIEM Connector ログを Google Security Operations に取り込む方法について説明します。Akamai SIEM 統合は、SIEM 統合 API を介して Akamai プラットフォームのセキュリティ イベントを JSON 形式で提供します。この統合では、AWS Lambda を使用して Akamai API からイベントを定期的に取得し、S3 に保存します。Google SecOps はこのイベントを取り込みます。

始める前に

  • Google SecOps インスタンス
  • Akamai Control Center への特権アクセス(Manage SIEM ユーザーロールを使用)
  • SIEM API サービスが有効になっている Akamai API 認証情報(読み取り / 書き込みアクセス レベル)
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス

Akamai Control Center で SIEM 統合を有効にする

  1. Akamai Control Center にログインします。
  2. [WEB & DATA CENTER SECURITY>Security Configuration] に移動します。
  3. SIEM データを収集するセキュリティ構成(適切なバージョン)を開きます。
  4. [詳細設定] をクリックし、[SIEM 統合のデータ収集] を開きます。
  5. [オン] をクリックして SIEM を有効にします。
  6. データをエクスポートするセキュリティ ポリシーを選択します。

    • すべてのセキュリティ ポリシー: セキュリティ構成内のセキュリティ ポリシーのいずれかまたはすべてに違反するイベントの SIEM データを送信します。
    • 特定のセキュリティ ポリシー: プルダウン リストから 1 つ以上の特定のセキュリティ ポリシーを選択します。
  7. 省略可: アカウント プロテクターを使用しており、暗号化されていないユーザー名を含める場合は、[ユーザー名を含める] チェックボックスをオンにします。

  8. 省略可: 特定の保護タイプとアクションに属するイベントを除外する場合は、[例外を追加] をクリックし、SIEM で収集しない保護と関連するアクションを選択します。

  9. [保存] をクリックします。

  10. [SIEM 統合] セクションから [セキュリティ構成 ID(configId)] をコピーして保存します。これは Lambda の構成に必要です。

SIEM 統合用の Akamai API 認証情報を作成する

  1. Akamai Control Center にログインします。
  2. [ACCOUNT ADMIN > Identity & access > API clients] に移動します。
  3. [API クライアントを作成] をクリックします。
  4. 次の構成情報を提供してください。

    • API クライアント名: わかりやすい名前を入力します(例: Google SecOps Poller)。
    • API サービス: [SIEM] を選択し、アクセスレベルを [読み取り / 書き込み] に設定します。
  5. [API クライアントを作成] をクリックします。

  6. 次の認証情報をコピーして安全に保存します。

    • クライアント トークン
    • Client Secret
    • アクセス トークン
    • ホスト(例: example.luna.akamaiapis.net

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: akamai-siem-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] で [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [.csv ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] > [ポリシーの作成] > [JSON] タブに移動します。
  2. 次のポリシーをコピーして貼り付け、akamai-siem-logs をバケット名に置き換えます。

    ポリシー JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/akamai-siem/state.json"
        }
      ]
    }
    
  3. [次へ] をクリックします。

  4. ポリシー名 AkamaiSIEMtoS3Policy を入力し、[ポリシーを作成] をクリックします。

  5. [IAM]> [ロール]> [ロールを作成] に移動します。

  6. [AWS service] を選択します。

  7. ユースケースとして [Lambda] を選択します。

  8. [次へ] をクリックします。

  9. 作成したポリシー AkamaiSIEMtoS3Policy を検索して選択します。

  10. [次へ] をクリックします。

  11. ロール名「AkamaiSIEMtoS3Role」を入力し、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 AkamaiSIEMtoS3Function
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール 既存のロールを使用する
    既存のロール AkamaiSIEMtoS3Role
  4. [関数を作成] をクリックします。

  5. 関数が作成されたら、[コード] タブを開き、スタブを削除して次のコードを貼り付けます。

    import json
    import boto3
    import os
    import urllib3
    import hmac
    import hashlib
    import base64
    from datetime import datetime
    from urllib.parse import urlparse, urljoin
    
    # Configuration from environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ.get('S3_PREFIX', 'akamai-siem/')
    STATE_KEY = os.environ.get('STATE_KEY', 'akamai-siem/state.json')
    
    AKAMAI_HOST = os.environ['AKAMAI_HOST']
    AKAMAI_CLIENT_TOKEN = os.environ['AKAMAI_CLIENT_TOKEN']
    AKAMAI_CLIENT_SECRET = os.environ['AKAMAI_CLIENT_SECRET']
    AKAMAI_ACCESS_TOKEN = os.environ['AKAMAI_ACCESS_TOKEN']
    AKAMAI_CONFIG_IDS = os.environ['AKAMAI_CONFIG_IDS'].split(',')
    
    LIMIT = int(os.environ.get('LIMIT', '10000'))
    
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def load_state():
        """Load offset state from S3"""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read().decode('utf-8'))
        except s3_client.exceptions.NoSuchKey:
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(state):
        """Save offset state to S3"""
        try:
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state, indent=2).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
    def make_edgegrid_auth_header(url, method='GET'):
        """Create EdgeGrid authentication header"""
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H:%M:%S+0000')
        nonce = base64.b64encode(os.urandom(16)).decode('utf-8')
    
        parsed_url = urlparse(url)
        relative_url = parsed_url.path
        if parsed_url.query:
            relative_url += '?' + parsed_url.query
    
        auth_header = f'EG1-HMAC-SHA256 ' \
                    f'client_token={AKAMAI_CLIENT_TOKEN};' \
                    f'access_token={AKAMAI_ACCESS_TOKEN};' \
                    f'timestamp={timestamp};' \
                    f'nonce={nonce};'
    
        data_to_sign = '\t'.join([
            method,
            parsed_url.scheme,
            parsed_url.netloc,
            relative_url,
            '',  # Request body for GET
            '',  # No additional headers
        ])
    
        signing_key = hmac.new(
            AKAMAI_CLIENT_SECRET.encode('utf-8'),
            timestamp.encode('utf-8'),
            hashlib.sha256
        ).digest()
    
        auth_signature = base64.b64encode(
            hmac.new(
                signing_key,
                (data_to_sign + auth_header).encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')
    
        return auth_header + f'signature={auth_signature}'
    
    def fetch_akamai_events(config_id, offset=None):
        """Fetch events from Akamai SIEM API"""
        base_url = f'https://{AKAMAI_HOST}'
        endpoint = f'/siem/v1/configs/{config_id}'
    
        params = f'limit={LIMIT}'
        if offset:
            params += f'&offset={offset}'
    
        url = f'{base_url}{endpoint}?{params}'
    
        try:
            headers = {
                'Authorization': make_edgegrid_auth_header(url)
            }
    
            response = http.request('GET', url, headers=headers, timeout=120)
    
            if response.status != 200:
                print(f"Error response {response.status}: {response.data.decode('utf-8')}")
                return [], offset
    
            # Parse multi-JSON response (newline-delimited JSON)
            lines = response.data.decode('utf-8').strip().split('\n')
            events = []
            new_offset = offset
    
            for line in lines:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
    
                    # Check if this is offset context (metadata object with offset)
                    if 'offset' in obj and ('total' in obj or 'responseContext' in obj):
                        new_offset = obj.get('offset')
                        continue
    
                    # This is an event
                    events.append(obj)
                except json.JSONDecodeError as e:
                    print(f"Warning: Failed to parse line: {e}")
                    continue
    
            return events, new_offset
    
        except Exception as e:
            print(f"Error fetching events for config {config_id}: {e}")
            return [], offset
    
    def lambda_handler(event, context):
        """Lambda handler - fetches Akamai events and writes to S3"""
        print(f"Starting Akamai SIEM fetch at {datetime.utcnow().isoformat()}Z")
    
        state = load_state()
        total_events = 0
    
        for config_id in AKAMAI_CONFIG_IDS:
            config_id = config_id.strip()
            if not config_id:
                continue
    
            print(f"Fetching events for config: {config_id}")
    
            current_offset = state.get(config_id)
            events, new_offset = fetch_akamai_events(config_id, current_offset)
    
            if events:
                print(f"Fetched {len(events)} events for config {config_id}")
    
                # Write events to S3 as newline-delimited JSON
                timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f'{S3_PREFIX}{config_id}/{timestamp}.json'
    
                payload = '\n'.join(json.dumps(event) for event in events)
    
                try:
                    s3_client.put_object(
                        Bucket=S3_BUCKET,
                        Key=s3_key,
                        Body=payload.encode('utf-8'),
                        ContentType='application/json'
                    )
                    print(f"Wrote {len(events)} events to s3://{S3_BUCKET}/{s3_key}")
    
                    # Update offset only after successful write
                    if new_offset:
                        state[config_id] = new_offset
                        total_events += len(events)
                except Exception as e:
                    print(f"Error writing to S3: {e}")
            else:
                print(f"No new events for config {config_id}")
    
        # Save updated state
        save_state(state)
    
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully processed {total_events} events',
                'configs_processed': len(AKAMAI_CONFIG_IDS)
            })
        }
    
  6. [Deploy] をクリックしてコードを保存します。

  7. [構成] > [環境変数] に移動します。

  8. [編集] をクリックします。

  9. 次のそれぞれについて、[環境変数を追加] をクリックします。

    環境変数

    キー 値の例
    S3_BUCKET akamai-siem-logs
    S3_PREFIX akamai-siem/
    STATE_KEY akamai-siem/state.json
    AKAMAI_HOST example.luna.akamaiapis.net
    AKAMAI_CLIENT_TOKEN your-client-token
    AKAMAI_CLIENT_SECRET your-client-secret
    AKAMAI_ACCESS_TOKEN your-access-token
    AKAMAI_CONFIG_IDS 12345,67890
    LIMIT 10000
  10. [保存] をクリックします。

  11. [構成] > [全般設定] に移動します。

  12. [編集] をクリックします。

  13. [Timeout] を [5 minutes (300 seconds)] に変更します。

  14. [保存] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成情報を提供してください。

    • スケジュール名: 「AkamaiSIEMtoS3-5min」と入力します。
    • スケジュール パターン: [繰り返しスケジュール] を選択します。
    • スケジュール タイプ: [レートベースのスケジュール] を選択します。
    • レート式: 「5」と入力して [] を選択します。
  3. [次へ] をクリックします。

  4. 次の構成情報を提供してください。

    • ターゲット: [AWS Lambda 呼び出し] を選択します。
    • Lambda 関数: AkamaiSIEMtoS3Function を選択します。
  5. [次へ] をクリックします。

  6. [次へ] をクリックします(オプションの設定はスキップします)。

  7. 確認して [スケジュールを作成] をクリックします。

Akamai SIEM Connector のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Akamai SIEM Connector)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Akamai SIEM Connector] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。

    • S3 URI: s3://akamai-siem-logs/akamai-siem/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。

  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。