Akamai Cloud Monitor ログを収集する

以下でサポートされています。

このドキュメントでは、Google Cloud Storage を使用して Akamai Cloud Monitor(ロードバランサ、トラフィック シェイパー、ADC)ログを Google Security Operations に取り込む方法について説明します。Akamai は JSON イベントを HTTPS エンドポイントに push します。API Gateway と Cloud Functions のレシーバは、イベントを GCS(JSONL、gz)に書き込みます。パーサーは JSON ログを UDM に変換します。JSON ペイロードからフィールドを抽出し、データ型の変換を行い、UDM スキーマに合わせてフィールドの名前を変更し、カスタム フィールドと URL 構築の特定のロジックを処理します。また、フィールドの有無に基づいてエラー処理と条件付きロジックが組み込まれています。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Functions、Pub/Sub トピック、API Gateway を作成する権限
  • Akamai Control Center と Property Manager への特権アクセス

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(akamai-cloud-monitor など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Akamai Cloud Monitor の構成の詳細を収集する

Akamai Control Center から次の情報が必要です。

  • Property Manager のプロパティ名
  • 収集する必要がある Cloud Monitor データセット
  • Webhook 認証用の共有シークレット トークン(省略可)

Cloud Functions のサービス アカウントを作成する

Cloud Functions には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「akamai-cloud-monitor-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Function to collect Akamai Cloud Monitor logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Akamai ログを受信する Cloud Functions を作成する

Cloud Functions は Akamai Cloud Monitor から HTTP POST リクエストを受信し、ログを GCS に書き込みます。

  1. GCP Console で、[Cloud Functions] に移動します。
  2. [関数を作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    環境 [第 2 世代] を選択します。
    関数名 akamai-cloud-monitor-receiver
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
  4. [トリガー] セクションで次の操作を行います。

    • トリガーのタイプ: [HTTPS] を選択します。
    • 認証: [未認証の呼び出しを許可] を選択します(Akamai は未認証のリクエストを送信します)。
  5. [保存] をクリックして、トリガーの設定を保存します。

  6. [ランタイム、ビルド、接続、セキュリティの設定] を開きます。

  7. [ランタイム] セクションで、次の操作を行います。

    • 割り当てられたメモリ: [512 MiB] を選択します。
    • タイムアウト: 600 秒(10 分)を入力します。
    • ランタイム サービス アカウント: サービス アカウント(akamai-cloud-monitor-sa)を選択します。
  8. [ランタイム環境変数] セクションで、次の項目ごとに [+ 変数を追加] をクリックします。

    変数名 値の例
    GCS_BUCKET akamai-cloud-monitor
    GCS_PREFIX akamai/cloud-monitor/json
    INGEST_TOKEN random-shared-secret(任意)
  9. [次へ] をクリックして、コードエディタに進みます。

  10. [ランタイム] プルダウンで [Python 3.12] を選択します。

関数コードを追加する

  1. [関数のエントリ ポイント] に「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import os
    import json
    import gzip
    import io
    import uuid
    import datetime as dt
    from google.cloud import storage
    import functions_framework
    
    GCS_BUCKET = os.environ.get("GCS_BUCKET")
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret
    
    storage_client = storage.Client()
    
    def _write_jsonl_gz(objs: list) -> str:
        """Write JSON objects to GCS as gzipped JSONL."""
        timestamp = dt.datetime.utcnow()
        key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
    
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode())
        buf.seek(0)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}{key}")
        blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip")
    
        return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}"
    
    def _parse_records_from_request(request) -> list:
        """Parse JSON records from HTTP request body."""
        body = request.get_data(as_text=True)
    
        if not body:
            return []
    
        try:
            data = json.loads(body)
        except Exception:
            # Accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
    
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    @functions_framework.http
    def main(request):
        """
        Cloud Function HTTP handler for Akamai Cloud Monitor logs.
    
        Args:
            request: Flask request object
    
        Returns:
            Tuple of (response_body, status_code, headers)
        """
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            token = request.args.get("token")
            if token != INGEST_TOKEN:
                return ("Forbidden", 403)
    
        records = _parse_records_from_request(request)
    
        if not records:
            return ("No content", 204)
    
        try:
            gcs_key = _write_jsonl_gz(records)
    
            response = {
                "ok": True,
                "gcs_key": gcs_key,
                "count": len(records)
            }
    
            return (json.dumps(response), 200, {"Content-Type": "application/json"})
    
        except Exception as e:
            print(f"Error writing to GCS: {str(e)}")
            return (f"Internal server error: {str(e)}", 500)
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. [デプロイ] をクリックして Cloud Functions の関数をデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

  5. デプロイ後、[トリガー] タブに移動して、[トリガー URL] をコピーします。この URL は Akamai の構成で使用します。

ログを push するように Akamai Cloud Monitor を構成する

  1. Akamai Control Center にログインします。
  2. プロパティ マネージャーでプロパティを開きます。
  3. [ルールを追加> クラウド管理を選択] をクリックします。
  4. Cloud Monitor Instrumentation を追加し、必要なデータセットを選択します。
  5. Cloud Monitor Data Delivery を追加します。
  6. 次の構成情報を提供してください。

    • 配信ホスト名: Cloud Functions のトリガー URL からホスト名を入力します(例: us-central1-your-project.cloudfunctions.net)。
    • 配信 URL パス: Cloud Functions のトリガー URL からのパスと、必要に応じてクエリ トークンを入力します。

      • トークンなし: /akamai-cloud-monitor-receiver
      • トークンあり: /akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>

      • <INGEST_TOKEN> は、Cloud Functions 環境変数で設定した値に置き換えます。

  7. [保存] をクリックします。

  8. [有効化] をクリックして、プロパティ バージョンを有効にします。

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Akamai Cloud Monitor - GCS)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Akamai Cloud Monitor] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

Akamai Cloud Monitor のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Akamai Cloud Monitor - GCS)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Akamai Cloud Monitor] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://akamai-cloud-monitor/akamai/cloud-monitor/json/
      
      • 次のように置き換えます。

        • akamai-cloud-monitor: GCS バケット名。
        • akamai/cloud-monitor/json: ログが保存される接頭辞パス(Cloud Functions の GCS_PREFIX と一致する必要があります)。
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。

    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの Namespace: akamai.cloud_monitor

    • 取り込みラベル: このフィードのすべてのイベントにラベルが追加されます(source=akamai_cloud_monitorformat=json など)。

  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

サポートされている Akamai Cloud Monitor のサンプルログ

  • JSON:

    {
      "UA": "-",
      "accLang": "-",
      "bytes": "3929",
      "cacheStatus": "1",
      "cliIP": "0.0.0.0",
      "cookie": "-",
      "cp": "848064",
      "customField": "-",
      "dnsLookupTimeMSec": "-",
      "errorCode": "-",
      "maxAgeSec": "31536000",
      "objSize": "3929",
      "overheadBytes": "240",
      "proto": "HTTPS",
      "queryStr": "-",
      "range": "-",
      "referer": "-",
      "reqEndTimeMSec": "4",
      "reqHost": "www.example.com",
      "reqId": "1ce83c03",
      "reqMethod": "GET",
      "reqPath": "assets/images/placeholder-tagline.png",
      "reqPort": "443",
      "reqTimeSec": "1622470405.760",
      "rspContentLen": "3929",
      "rspContentType": "image/png",
      "statusCode": "200",
      "tlsOverheadTimeMSec": "0",
      "tlsVersion": "TLSv1.2",
      "totalBytes": "4599",
      "transferTimeMSec": "0",
      "turnAroundTimeMSec": "0",
      "uncompressedSize": "-",
      "version": "1",
      "xForwardedFor": "-"
    }
    

UDM マッピング テーブル

ログフィールド UDM マッピング 論理
accLang network.http.user_agent 「-」または空の文字列でない場合、直接マッピングされます。
city principal.location.city 「-」または空の文字列でない場合、直接マッピングされます。
cliIP principal.ip 空の文字列でない場合、直接マッピングされます。
country principal.location.country_or_region 「-」または空の文字列でない場合、直接マッピングされます。
cp additional.fields キー「cp」で Key-Value ペアとしてマッピングされます。
customField about.ip、about.labels、src.ip Key-Value ペアとして解析されます。「eIp」と「pIp」をそれぞれ src.ip と about.ip にマッピングするための特別な処理。その他のキーは、about 内のラベルとしてマッピングされます。
errorCode security_result.summary、security_result.severity 存在する場合は、security_result.severity を「ERROR」に設定し、値を security_result.summary にマッピングします。
geo.city principal.location.city 都市が「-」または空の文字列の場合、直接マッピングされます。
geo.country principal.location.country_or_region 国が「-」または空の文字列の場合、直接マッピングされます。
geo.lat principal.location.region_latitude 直接マッピングされ、浮動小数点数に変換されます。
geo.long principal.location.region_longitude 直接マッピングされ、浮動小数点数に変換されます。
geo.region principal.location.state 直接マッピングされます。
id metadata.product_log_id 空の文字列でない場合、直接マッピングされます。
message.cliIP principal.ip cliIP が空の文字列の場合、直接マッピングされます。
message.fwdHost principal.hostname 直接マッピングされます。
message.reqHost target.hostname、target.url target.url の構築と target.hostname の抽出に使用されます。
message.reqLen network.sent_bytes 直接マッピングされます。totalBytes が空または「-」の場合、符号なし整数に変換されます。
message.reqMethod network.http.method reqMethod が空の文字列の場合、直接マッピングされます。
message.reqPath target.url target.url に追加されます。
message.reqPort target.port 直接マッピングされ、reqPort が空の文字列の場合は整数に変換されます。
message.respLen network.received_bytes 直接マッピングされ、符号なし整数に変換されます。
message.sslVer network.tls.version 直接マッピングされます。
message.status network.http.response_code 直接マッピングされます。statusCode が空または「-」の場合、整数に変換されます。
message.UA network.http.user_agent UA が「-」または空の文字列の場合、直接マッピングされます。
network.asnum additional.fields キー「asnum」を持つ Key-Value ペアとしてマッピングされます。
network.edgeIP intermediary.ip 直接マッピングされます。
network.network additional.fields キー「network」で Key-Value ペアとしてマッピングされます。
network.networkType additional.fields キー「networkType」で Key-Value ペアとしてマッピングされます。
proto network.application_protocol network.application_protocol の決定に使用されます。
queryStr target.url 「-」または空の文字列でない場合、target.url に追加されます。
リファラー network.http.referral_url, about.hostname 「-」でない場合、直接マッピングされます。抽出されたホスト名は about.hostname にマッピングされます。
reqHost target.hostname、target.url target.url の構築と target.hostname の抽出に使用されます。
reqId metadata.product_log_id、network.session_id ID が空の文字列の場合、直接マッピングされます。network.session_id にもマッピングされます。
reqMethod network.http.method 空の文字列でない場合、直接マッピングされます。
reqPath target.url 「-」でない場合は target.url に追加されます。
reqPort target.port 直接マッピングされ、整数に変換されます。
reqTimeSec metadata.event_timestamp, timestamp イベント タイムスタンプの設定に使用されます。
start metadata.event_timestamp, timestamp reqTimeSec が空の文字列の場合に、イベント タイムスタンプを設定するために使用されます。
statusCode network.http.response_code 直接マッピングされます。「-」または空の文字列でない場合、整数に変換されます。
tlsVersion network.tls.version 直接マッピングされます。
totalBytes network.sent_bytes 空でないか「-」でない場合、直接マッピングされ、符号なし整数に変換されます。
type metadata.product_event_type 直接マッピングされます。
UA network.http.user_agent 「-」または空の文字列でない場合、直接マッピングされます。
version metadata.product_version 直接マッピングされます。
xForwardedFor principal.ip 「-」または空の文字列でない場合、直接マッピングされます。
(パーサー ロジック) metadata.vendor_name 「Akamai」に設定します。
(パーサー ロジック) metadata.product_name 「Cloud Monitor」に設定します。
(パーサー ロジック) metadata.event_type 「NETWORK_HTTP」に設定します。
(パーサー ロジック) metadata.product_version バージョンが空の文字列の場合は「2」に設定します。
(パーサー ロジック) metadata.log_type 「AKAMAI_CLOUD_MONITOR」に設定します。
(パーサー ロジック) network.application_protocol proto または message.proto から決定されます。いずれかに「HTTPS」が含まれている場合は「HTTPS」に設定し、それ以外の場合は「HTTP」に設定します(大文字と小文字を区別しない)。
(パーサー ロジック) security_result.severity errorCode が「-」または空の文字列の場合は、「INFORMATIONAL」に設定します。
(パーサー ロジック) target.url protocol、reqHost(または message.reqHost)、reqPath(または message.reqPath)、queryStr から構築されます。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。