WP Engine のログを収集する

以下でサポートされています。

このドキュメントでは、Google Cloud Storage V2 を使用して WP Engine ログを Google Security Operations に取り込む方法について説明します。

WP Engine は、セキュリティ、パフォーマンスの最適化、CDN サービスが組み込まれたエンタープライズ グレードのホスティングを提供するマネージド WordPress ホスティング プラットフォームです。アクセスログ、エラーログ、CDN イベントログが生成され、WP Engine API を介して収集できます。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • API アクセス権限を持つ WP Engine ユーザー ポータルへの特権アクセス
  • API アクセスが有効になっている WP Engine アカウント

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前を入力します(例: wpengine-logs)。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(アクセス頻度の高いログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

WP Engine API 認証情報を収集する

API 認証情報を生成する

  1. WP Engine ユーザー ポータルにログインします。
  2. プロフィール名をクリックし、[Profile > API Access] に移動します。
  3. [認証情報を生成] をクリックします。
  4. 次の詳細をコピーして安全な場所に保存してください。

    • API Username: 生成された API ユーザー名
    • API パスワード: 生成された API パスワード(1 回のみ表示)

インストール名を取得する

  1. WP Engine ユーザー ポータルにログインします。
  2. ナビゲーション メニューで [サイト] に移動します。
  3. ログを収集するサイトをクリックします。
  4. サイトの概要ページに表示されている [インストール名] をメモします。各環境(本番環境、ステージング環境、開発環境)には、個別のインストール名があります。

テスト API へのアクセス

  • 統合に進む前に、認証情報をテストします。

    # Replace with your actual credentials
    WPE_USER="your-api-username"
    WPE_PASSWORD="your-api-password"
    
    # Test API access - list installs
    curl -v -u "${WPE_USER}:${WPE_PASSWORD}" "https://api.wpengineapi.com/v1/installs"
    

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を入力します。
    • サービス アカウント名: 「wpengine-logs-collector-sa」と入力します。
    • サービス アカウントの説明: Service account for Cloud Run function to collect WP Engine logs と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の操作に必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可します
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(例: wpengine-logs)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を入力します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: wpengine-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [ストレージ オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions の関数がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を入力します。
    • トピック ID: 「wpengine-logs-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、WP Engine API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 wpengine-logs-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、トピック wpengine-logs-trigger を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: サービス アカウント wpengine-logs-collector-sa を選択します。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例 説明
    GCS_BUCKET wpengine-logs GCS バケット名
    GCS_PREFIX wpengine ログファイルの接頭辞
    STATE_KEY wpengine/state.json 状態ファイルのパス
    WPE_API_USER your-api-username WP Engine API ユーザー名
    WPE_API_PASSWORD your-api-password WP Engine API パスワード
    WPE_INSTALL_ID myinstall WP Engine のインストール名
    MAX_RECORDS 5000 実行あたりの最大レコード数
    PAGE_SIZE 100 1 ページあたりのレコード数
    LOOKBACK_HOURS 24 最初のルックバック期間
  10. [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: 1 を選択します。
  12. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数]: 「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  13. [作成] をクリックします。

  14. サービスが作成されるまで待ちます(1 ~ 2 分)。

  15. サービスが作成されると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [エントリ ポイント] フィールドに「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
      timeout=urllib3.Timeout(connect=5.0, read=30.0),
      retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'wpengine')
    STATE_KEY = os.environ.get('STATE_KEY', 'wpengine/state.json')
    WPE_API_USER = os.environ.get('WPE_API_USER')
    WPE_API_PASSWORD = os.environ.get('WPE_API_PASSWORD')
    WPE_INSTALL_ID = os.environ.get('WPE_INSTALL_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    # WP Engine API base URL
    API_BASE = 'https://api.wpengineapi.com/v1'
    
    # Log types to fetch
    LOG_TYPES = ['access', 'error']
    
    def get_auth_header():
      """Generate HTTP Basic auth header for WP Engine API."""
      credentials = f"{WPE_API_USER}:{WPE_API_PASSWORD}"
      encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
      return f"Basic {encoded}"
    
    @functions_framework.cloud_event
    def main(cloud_event):
      """
      Cloud Run function triggered by Pub/Sub to fetch WP Engine
      logs and write to GCS.
    
      Args:
        cloud_event: CloudEvent object containing Pub/Sub message
      """
    
      if not all([GCS_BUCKET, WPE_API_USER, WPE_API_PASSWORD, WPE_INSTALL_ID]):
        print('Error: Missing required environment variables')
        return
    
      try:
        bucket = storage_client.bucket(GCS_BUCKET)
    
        # Load state
        state = load_state(bucket, STATE_KEY)
    
        # Determine time window
        now = datetime.now(timezone.utc)
        last_offsets = {}
    
        if isinstance(state, dict) and state.get("last_offsets"):
          last_offsets = state["last_offsets"]
    
        print(f"Fetching logs for install: {WPE_INSTALL_ID}")
    
        auth_header = get_auth_header()
        all_records = []
    
        # Fetch both access and error log types
        for log_type in LOG_TYPES:
          last_offset = last_offsets.get(log_type, 0)
    
          records = fetch_logs(
            auth_header=auth_header,
            install_id=WPE_INSTALL_ID,
            log_type=log_type,
            start_offset=last_offset,
            page_size=PAGE_SIZE,
            max_records=MAX_RECORDS,
          )
    
          # Tag records with log type
          for record in records:
            record['_wpe_log_type'] = log_type
    
          all_records.extend(records)
    
          # Update offset for this log type
          if records:
            last_offsets[log_type] = last_offset + len(records)
    
          print(f"Fetched {len(records)} {log_type} log records")
    
        if not all_records:
          print("No new log records found.")
          save_state(bucket, STATE_KEY, last_offsets)
          return
    
        # Write to GCS as NDJSON
        timestamp = now.strftime('%Y%m%d_%H%M%S')
        object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
        blob = bucket.blob(object_key)
    
        ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in all_records]) + '\n'
        blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
        print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}")
    
        # Update state
        save_state(bucket, STATE_KEY, last_offsets)
    
        print(f"Successfully processed {len(all_records)} records")
    
      except Exception as e:
        print(f'Error processing logs: {str(e)}')
        raise
    
    def load_state(bucket, key):
      """Load state from GCS."""
      try:
        blob = bucket.blob(key)
        if blob.exists():
          state_data = blob.download_as_text()
          return json.loads(state_data)
      except Exception as e:
        print(f"Warning: Could not load state: {e}")
    
      return {}
    
    def save_state(bucket, key, last_offsets: dict):
      """Save the last offsets to GCS state file."""
      try:
        state = {'last_offsets': last_offsets}
        blob = bucket.blob(key)
        blob.upload_from_string(
          json.dumps(state, indent=2),
          content_type='application/json'
        )
        print(f"Saved state: last_offsets={last_offsets}")
      except Exception as e:
        print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(auth_header: str, install_id: str, log_type: str, start_offset: int, page_size: int, max_records: int):
      """
      Fetch logs from WP Engine API with offset-based pagination
      and rate limiting.
    
      Args:
        auth_header: HTTP Basic auth header
        install_id: WP Engine install name
        log_type: Log type to fetch (access or error)
        start_offset: Starting offset for pagination
        page_size: Number of records per page
        max_records: Maximum total records to fetch
    
      Returns:
        List of log records
      """
      headers = {
        'Authorization': auth_header,
        'Accept': 'application/json',
        'User-Agent': 'GoogleSecOps-WPEngineCollector/1.0'
      }
    
      records = []
      offset = start_offset
      page_num = 0
      backoff = 1.0
    
      while True:
        page_num += 1
    
        if len(records) >= max_records:
          print(f"Reached max_records limit ({max_records}) for {log_type}")
          break
    
        limit = min(page_size, max_records - len(records))
        url = f"{API_BASE}/installs/{install_id}/logs?type={log_type}&limit={limit}&offset={offset}"
    
        try:
          response = http.request('GET', url, headers=headers)
    
          # Handle rate limiting with exponential backoff
          if response.status == 429:
            retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
            print(f"Rate limited (429). Retrying after {retry_after}s...")
            time.sleep(retry_after)
            backoff = min(backoff * 2, 30.0)
            continue
    
          backoff = 1.0
    
          if response.status != 200:
            print(f"HTTP Error: {response.status}")
            response_text = response.data.decode('utf-8')
            print(f"Response body: {response_text}")
            return []
    
          data = json.loads(response.data.decode('utf-8'))
    
          page_results = data.get('results', data.get('data', []))
    
          if not page_results:
            print(f"No more results (empty page) for {log_type}")
            break
    
          print(f"Page {page_num}: Retrieved {len(page_results)} {log_type} events")
          records.extend(page_results)
    
          offset += len(page_results)
    
          # If we got fewer results than requested, no more pages
          if len(page_results) < limit:
            print(f"Last page reached for {log_type}")
            break
    
        except Exception as e:
          print(f"Error fetching {log_type} logs: {e}")
          return []
    
      print(f"Retrieved {len(records)} total {log_type} records from {page_num} pages")
      return records
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 wpengine-logs-collector-hourly
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)
    ターゲット タイプ Pub/Sub
    トピック トピック wpengine-logs-trigger を選択します。
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

ログの量とレイテンシの要件に基づいて頻度を選択します。

頻度 CRON 式 ユースケース
5 分おき */5 * * * * 大容量、低レイテンシ
15 分ごと */15 * * * * 検索ボリュームが普通
1 時間ごと 0 * * * * 標準(推奨)
6 時間ごと 0 */6 * * * 少量、バッチ処理
毎日 0 0 * * * 過去のデータの収集

統合をテストする

  1. Cloud Scheduler コンソールで、ジョブを見つけます。
  2. [強制実行] をクリックして、ジョブを手動でトリガーします。
  3. 数秒待ちます。
  4. Cloud Run > サービスに移動します。
  5. wpengine-logs-collector をクリックします。
  6. [Logs] タブをクリックします。
  7. 関数が正常に実行されたことを確認します。以下のものを探します。

    Fetching logs for install: myinstall
    Page 1: Retrieved X access events
    Fetched X access log records
    Page 1: Retrieved X error events
    Fetched X error log records
    Wrote X records to gs://wpengine-logs/wpengine/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. [Cloud Storage] > [バケット] に移動します。

  9. バケット名(wpengine-logs)をクリックします。

  10. wpengine/ フォルダに移動します。

  11. 現在のタイムスタンプで新しい .ndjson ファイルが作成されたことを確認します。

ログにエラーが表示された場合:

  • HTTP 401: 環境変数で API 認証情報を確認する
  • HTTP 403: WP Engine ユーザー ポータルで API アクセスが有効になっていることを確認する
  • HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
  • 環境変数が不足している: 必要な変数がすべて設定されていることを確認します

WP Engine のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: WP Engine Logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [WPEngine] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウントのメールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーします。

  9. [次へ] をクリックします。

  10. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://wpengine-logs/wpengine/
      
      • 次のように置き換えます。
        • wpengine-logs: GCS バケット名。
        • wpengine: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。

    • ファイルの最大経過日数: 過去の日数内に変更されたファイルを含めます(デフォルトは 180 日)。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル

  11. [次へ] をクリックします。

  12. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を入力します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
request、sig、blog_id、kind、name、slug、ver additional.fields リクエストのラベル(request_label)、sig(sig_label)、blog_id(blog_id_label)、kind(kind_label)、name(name_label)、slug(slug_label)、ver(ver_label)の各ラベルが空でない場合は、これらのラベルと統合されます。
msg metadata.description 値を直接コピー
metadata.event_type has_principal が true の場合は「STATUS_UPDATE」、それ以外の場合は「GENERIC_EVENT」に設定します。
プロトコル network.application_protocol 値を直接コピー
version network.application_protocol_version 文字列に変換しました
method network.http.method 値を直接コピー
user_agent network.http.parsed_user_agent parseduseragent に変換されました
secure_url network.http.referral_url 値を直接コピー
response_code network.http.response_code 文字列に変換してから整数に変換
user_agent network.http.user_agent 値を直接コピー
received_bytes network.received_bytes 文字列に変換してから uinteger に変換
ホスト名 principal.asset.hostname 値を直接コピー
client_ip principal.asset.ip 値を直接コピー
ホスト名 principal.hostname 値を直接コピー
client_ip principal.ip 値を直接コピー
ポート principal.port 文字列に変換してから整数に変換
pid principal.process.pid 文字列に変換しました
scan_type、scan_value security_result.description scan_value が空でない場合は scan_value の値、それ以外の場合は scan_type の値(scan_type が空でない場合)

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。