Atlassian Confluence のログを収集する

以下でサポートされています。

このドキュメントでは、Atlassian Confluence ログを Google Security Operations に取り込む方法について説明します。パーサーは、まず Atlassian Confluence ログ用に設計された正規表現(grok パターン)を使用して、未加工のログ メッセージからフィールドを抽出します。grok 解析が失敗した場合、またはログが JSON 形式の場合、コードはメッセージを JSON として解析しようとします。最後に、抽出されたフィールドが Google SecOps UDM スキーマにマッピングされ、追加のコンテキストで拡充されます。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • 監査ログへのアクセス権を持つ Atlassian Confluence Cloud アカウント、または管理者権限を持つ Confluence Data Center/Server
  • GCP ベースの方法の場合: GCP(GCS、IAM、Cloud Run、Pub/Sub、Cloud Scheduler)への特権アクセス
  • Bindplane メソッドの場合: Windows Server 2016 以降、または systemd を使用する Linux ホスト

統合オプションの概要

このガイドでは、2 つの統合パスについて説明します。

  • オプション 1: Bindplane + Syslog 経由の Confluence Data Center/Server
  • オプション 2: GCP Cloud Run 関数 + GCS を介した Confluence Cloud 監査ログ(JSON 形式)

Confluence のデプロイタイプとインフラストラクチャに最適なオプションを選択します。

オプション 1: Bindplane + Syslog 経由の Confluence Data Center/Server

このオプションは、syslog 経由でログを Bindplane エージェントに送信し、そのログを Google SecOps に転送するように Confluence Data Center または Server を構成します。

Google SecOps の取り込み認証ファイルを取得する

  1. Google SecOps コンソールにログインします。
  2. [SIEM 設定] > [収集エージェント] に移動します。
  3. [ダウンロード] をクリックして、取り込み認証ファイルをダウンロードします。
  4. Bindplane エージェントがインストールされるシステムにファイルを安全に保存します。

Google SecOps のお客様 ID を取得する

  1. Google SecOps コンソールにログインします。
  2. [SIEM 設定] > [プロファイル] に移動します。
  3. [組織の詳細情報] セクションから [お客様 ID] をコピーして保存します。

Bindplane エージェントをインストールする

次の手順に沿って、Windows または Linux オペレーティング システムに Bindplane エージェントをインストールします。

Windows のインストール

  1. 管理者としてコマンド プロンプトまたは PowerShell を開きます。
  2. 次のコマンドを実行します。

    msiexec /i "https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/observiq-otel-collector.msi" /quiet
    
  3. インストールが完了するまで待ちます。

  4. 次のコマンドを実行して、インストールの内容を確認します。

    sc query observiq-otel-collector
    

サービスは RUNNING と表示されます。

Linux のインストール

  1. root 権限または sudo 権限でターミナルを開きます。
  2. 次のコマンドを実行します。

    sudo sh -c "$(curl -fsSlL https://github.com/observIQ/bindplane-otel-collector/releases/latest/download/install_unix.sh)" install_unix.sh
    
  3. インストールが完了するまで待ちます。

  4. 次のコマンドを実行して、インストールの内容を確認します。

    sudo systemctl status observiq-otel-collector
    

サービスが [アクティブ(実行中)] と表示されます。

その他のインストール リソース

その他のインストール オプションとトラブルシューティングについては、Bindplane エージェントのインストール ガイドをご覧ください。

Syslog を取り込んで Google SecOps に送信するように Bindplane エージェントを構成する

構成ファイルを見つける

  • Linux:

    sudo nano /etc/bindplane-agent/config.yaml
    
  • Windows:

    notepad "C:\Program Files\observIQ OpenTelemetry Collector\config.yaml"
    

構成ファイルを編集します。

  1. config.yaml の内容全体を次の構成に置き換えます。

    receivers:
      udplog:
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/confluence_logs:
        compression: gzip
        creds_file_path: '/etc/bindplane-agent/ingestion-auth.json'
        customer_id: 'YOUR_CUSTOMER_ID'
        endpoint: malachiteingestion-pa.googleapis.com
        log_type: ATLASSIAN_CONFLUENCE
        raw_log_field: body
        ingestion_labels:
          service: confluence
    
    service:
      pipelines:
        logs/confluence:
          receivers:
            - udplog
          exporters:
            - chronicle/confluence_logs
    

構成パラメータ

  • 各プレースホルダを次のように置き換えます。

    • listen_address: 自社のインフラストラクチャでの必要性に応じて、ポートと IP アドレスを置き換えます。0.0.0.0:514 を使用して、ポート 514 のすべてのインターフェースをリッスンします。
    • creds_file_path: 認証ファイルが保存されたパスに更新します。
      • Linux: /etc/bindplane-agent/ingestion-auth.json
      • Windows: C:\Program Files\observIQ OpenTelemetry Collector\ingestion-auth.json
    • customer_id: YOUR_CUSTOMER_ID は、前の手順の実際のお客様 ID に置き換えます。
    • endpoint: リージョン エンドポイント URL:
      • 米国: malachiteingestion-pa.googleapis.com
      • ヨーロッパ: europe-malachiteingestion-pa.googleapis.com
      • アジア: asia-southeast1-malachiteingestion-pa.googleapis.com

構成ファイルを保存する

編集が完了したら、ファイルを保存します。

  • Linux: Ctrl+OEnterCtrl+X の順に押します。
  • Windows: [ファイル>保存] をクリックします。

Bindplane エージェントを再起動して変更を適用する

Linux で Bindplane エージェントを再起動する

  1. Linux で Bindplane エージェントを再起動するには、次のコマンドを実行します。

    sudo systemctl restart observiq-otel-collector
    
  2. サービスが実行されていることを確認します。

    sudo systemctl status observiq-otel-collector
    
  3. ログでエラーを確認します。

    sudo journalctl -u observiq-otel-collector -f
    

Windows で Bindplane エージェントを再起動する

  1. Windows で Bindplane エージェントを再起動するには、次のいずれかのオプションを選択します。

    • 管理者としてコマンド プロンプトまたは PowerShell を使用します。

      net stop observiq-otel-collector && net start observiq-otel-collector
      
    • Services コンソールを使用する場合:

      1. Win+R キーを押して「services.msc」と入力し、Enter キーを押します。
      2. observIQ OpenTelemetry Collector を見つけます。
      3. 右クリックして [再起動] を選択します。
      4. サービスが実行されていることを確認します。

        sc query observiq-otel-collector
        
      5. ログでエラーを確認します。

        type "C:\Program Files\observIQ OpenTelemetry Collector\log\collector.log"
        

Confluence Data Center/Server で Syslog 転送を構成する

  1. ファイルをログに書き込むように Confluence を構成します(デフォルトの動作)。
  2. rsyslog が存在しない場合はインストールします。

    sudo apt-get install rsyslog  # Debian/Ubuntu
    sudo yum install rsyslog      # RHEL/CentOS
    
  3. rsyslog 構成ファイル /etc/rsyslog.d/confluence.conf を作成します。

    # Forward Confluence logs to Bindplane
    $ModLoad imfile
    
    # Application logs
    $InputFileName /opt/atlassian/confluence/logs/atlassian-confluence.log
    $InputFileTag confluence-app:
    $InputFileStateFile stat-confluence-app
    $InputFileSeverity info
    $InputFileFacility local0
    $InputRunFileMonitor
    
    # Audit logs (JSON format in DC/Server)
    $InputFileName <confluence-home-directory>/log/audit/audit.log
    $InputFileTag confluence-audit:
    $InputFileStateFile stat-confluence-audit
    $InputFileSeverity info
    $InputFileFacility local1
    $InputRunFileMonitor
    
    # Forward to Bindplane agent
    *.* @@BINDPLANE_AGENT_IP:514
    
    • BINDPLANE_AGENT_IP は、Bindplane エージェントの IP アドレス(192.168.1.100 など)に置き換えます。
    • Confluence のインストールに基づいてログファイルのパスを調整します。
      • 通常、アプリケーション ログは <confluence-install>/logs/ または <local-home>/logs/ です。
      • 監査ログ: <confluence-home-directory>/log/audit/(JSON 形式)
      • Confluence のホーム ディレクトリを確認するには、[Settings] > [General Configuration] > [System Information] に移動し、[Confluence Home] または [Local Home] を探します。
  4. rsyslog を再起動します。

    sudo systemctl restart rsyslog
    

オプション B: Log4j2 Syslog 転送を構成する

このオプションを使用するには、Log4j2 構成を変更する必要があります。シンプルにするには、オプション A(rsyslog)をおすすめします。

  1. SSH または RDP 経由で Confluence サーバーにログインします。
  2. Log4j2 構成ファイルは次の場所にあります。

    <confluence-install>/confluence/WEB-INF/classes/log4j2.xml
    
  3. 構成ファイルを編集して Syslog アペンダーを追加します。

    <Configuration>
      <Appenders>
        <!-- Existing appenders -->
        <Syslog name="SyslogAppender" 
                host="BINDPLANE_AGENT_IP" 
                port="514" 
                protocol="UDP"
                format="RFC5424"
                facility="LOCAL0">
          <PatternLayout pattern="%d{ISO8601} %p [%t] [%c{1}] %m%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <Root level="info">
          <AppenderRef ref="SyslogAppender"/>
          <!-- Other appender refs -->
        </Root>
    
        <!-- Audit logger -->
        <Logger name="com.atlassian.confluence.event.events.security.AuditEvent" 
                level="info" 
                additivity="false">
          <AppenderRef ref="SyslogAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
    
    • BINDPLANE_AGENT_IP は、Bindplane エージェントの IP アドレス(192.168.1.100 など)に置き換えます。
  4. Confluence を再起動して変更を適用します。

    sudo systemctl restart confluence
    

オプション 2: GCP Cloud Run 関数と GCS を介した Confluence Cloud 監査ログ

このメソッドは、GCP Cloud Run functions を使用して、Confluence Audit REST API 経由で監査ログを定期的に取得し、Google SecOps の取り込み用に GCS に保存します。

Confluence Cloud API 認証情報を収集する

  1. Atlassian アカウントにログインします。
  2. https://id.atlassian.com/manage-profile/security/api-tokens にアクセスします。
  3. [Create API Token] をクリックします。
  4. トークンのラベルを入力します(例: Google Security Operations Integration)。
  5. [作成] をクリックします。
  6. API トークンをコピーして安全に保存します。
  7. Confluence Cloud サイトの URL(例: https://yoursite.atlassian.net)をメモします。
  8. Atlassian アカウントのメールアドレス(認証に使用)をメモします。

権限を確認する

アカウントに必要な権限があることを確認するには:

  1. Confluence Cloud にログインします。
  2. 右上の設定アイコン(⚙️)をクリックします。
  3. 左側のナビゲーションに [モニタリング] > [監査ログ] が表示されている場合は、必要な権限があります。
  4. このオプションが表示されない場合は、管理者にお問い合わせのうえ、Confluence 管理者権限を付与してもらってください。

テスト API アクセス

  • 統合に進む前に、認証情報をテストします。

    # Replace with your actual credentials
    CONFLUENCE_EMAIL="your-email@example.com"
    CONFLUENCE_API_TOKEN="your-api-token"
    CONFLUENCE_URL="https://yoursite.atlassian.net"
    
    # Test API access
    curl -u "${CONFLUENCE_EMAIL}:${CONFLUENCE_API_TOKEN}" \
      -H "Accept: application/json" \
      "${CONFLUENCE_URL}/wiki/rest/api/audit"
    

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(confluence-audit-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「confluence-audit-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect Confluence Cloud audit logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: confluence-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「confluence-audit-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Confluence Cloud Audit API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 confluence-audit-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、[confluence-audit-trigger] を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: confluence-audit-collector-sa を選択します。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例 説明
    GCS_BUCKET confluence-audit-logs GCS バケット名
    GCS_PREFIX confluence-audit ログファイルの接頭辞
    STATE_KEY confluence-audit/state.json 状態ファイルのパス
    CONFLUENCE_URL https://yoursite.atlassian.net Confluence サイトの URL
    CONFLUENCE_EMAIL your-email@example.com Atlassian アカウントのメールアドレス
    CONFLUENCE_API_TOKEN your-api-token-here API トークン
    MAX_RECORDS 1000 実行あたりの最大レコード数
  10. [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
  12. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  13. [作成] をクリックします。

  14. サービスが作成されるまで待ちます(1 ~ 2 分)。

  15. サービスを作成すると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [関数のエントリ ポイント] に「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'confluence-audit/')
    STATE_KEY = os.environ.get('STATE_KEY', 'confluence-audit/state.json')
    CONFLUENCE_URL = os.environ.get('CONFLUENCE_URL')
    CONFLUENCE_EMAIL = os.environ.get('CONFLUENCE_EMAIL')
    CONFLUENCE_API_TOKEN = os.environ.get('CONFLUENCE_API_TOKEN')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Confluence Cloud audit logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, CONFLUENCE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=24)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_base=CONFLUENCE_URL,
                email=CONFLUENCE_EMAIL,
                api_token=CONFLUENCE_API_TOKEN,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_base: str, email: str, api_token: str, start_time_ms: int, end_time_ms: int, max_records: int):
        """
        Fetch logs from Confluence Cloud Audit API with pagination and rate limiting.
    
        Args:
            api_base: Confluence site URL
            email: Atlassian account email
            api_token: API token
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up URL
        base_url = api_base.rstrip('/')
    
        # Build authentication header
        auth_string = f"{email}:{api_token}"
        auth_bytes = auth_string.encode('utf-8')
        auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
        headers = {
            'Authorization': f'Basic {auth_b64}',
            'Accept': 'application/json',
            'User-Agent': 'GoogleSecOps-ConfluenceCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request URL
            url = f"{base_url}/wiki/rest/api/audit?startDate={start_time_ms}&endDate={end_time_ms}&start={start_index}&limit=100"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                page_results = data.get('results', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # creationDate is in Unix milliseconds
                        event_time_ms = event.get('creationDate')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < 100:
                    print(f"Reached last page (size={current_size} < limit=100)")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records[:max_records], newest_time
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 confluence-audit-collector-hourly
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック confluence-audit-trigger を選択
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

  • ログの量とレイテンシの要件に基づいて頻度を選択します。

    頻度 CRON 式 ユースケース
    5 分毎 */5 * * * * 大容量、低レイテンシ
    15 分ごと */15 * * * * 検索量が普通
    1 時間ごと 0 * * * * 標準(推奨)
    6 時間ごと 0 */6 * * * 少量、バッチ処理
    毎日 0 0 * * * 履歴データの収集

統合をテストする

  1. Cloud Scheduler コンソールで、ジョブを見つけます。
  2. [強制実行] をクリックして、ジョブを手動でトリガーします。
  3. 数秒待ちます。
  4. Cloud Run > サービスに移動します。
  5. confluence-audit-collector をクリックします。
  6. [Logs] タブをクリックします。
  7. 関数が正常に実行されたことを確認します。次の内容を確認します。

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. [Cloud Storage] > [バケット] に移動します。

  9. バケット名をクリックします。

  10. confluence-audit/ フォルダに移動します。

  11. 現在のタイムスタンプで新しい .ndjson ファイルが作成されたことを確認します。

ログにエラーが表示された場合:

  • HTTP 401: 環境変数で API 認証情報を確認する
  • HTTP 403: アカウントに Confluence 管理者権限があることを確認する
  • HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
  • 環境変数が不足している: 必要な変数がすべて設定されていることを確認します

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Confluence Cloud Audit Logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [Log type] で [Atlassian Confluence] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

Confluence のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Confluence Cloud Audit Logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [Log type] で [Atlassian Confluence] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://confluence-audit-logs/confluence-audit/
      
      • 次のように置き換えます。

        • confluence-audit-logs: GCS バケット名。
        • confluence-audit: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
      • 例:

        • ルートバケット: gs://company-logs/
        • 接頭辞あり: gs://company-logs/confluence-audit/
        • サブフォルダあり: gs://company-logs/confluence/audit/
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。

    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル。

  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング 論理
エージェント read_only_udm.network.http.user_agent 「agent」フィールドから取得された値。
app_protocol read_only_udm.network.application_protocol 「app_protocol」フィールドから派生します。「app_protocol」に「HTTPS」、「HTTP」、「SSH」、「RDP」が含まれている場合は、対応するプロトコルが使用されます。それ以外の場合は、デフォルトで「UNKNOWN_APPLICATION_PROTOCOL」になります。
app_protocol read_only_udm.network.application_protocol_version 「app_protocol」フィールドから取得された値。
auditType.action read_only_udm.security_result.action 「auditType.action」フィールドから派生します。「auditType.action」に「successful」が含まれている場合、値は「ALLOW」に設定されます。「restricted」が含まれている場合、値は「BLOCK」に設定されます。
auditType.action read_only_udm.security_result.summary 「auditType」が空でなく、「auditType_area」が「SECURITY」の場合、「auditType.action」フィールドから取得された値。
auditType.actionI18nKey read_only_udm.metadata.product_event_type 「auditType」が空でない場合、「auditType.actionI18nKey」フィールドから取得された値。
auditType.area read_only_udm.security_result.detection_fields.value 「auditType.area」フィールドから取得された値が、検出フィールドの「value」フィールドに割り当てられます。この検出フィールドの「key」フィールドは「auditType area」に設定されています。このマッピングは、「auditType」が空でない場合に実行されます。
auditType.category read_only_udm.security_result.category_details 「auditType」が空でない場合、「auditType.category」フィールドから取得された値。
auditType.categoryI18nKey read_only_udm.security_result.detection_fields.value 「auditType.categoryI18nKey」フィールドから取得された値が、検出フィールドの「value」フィールドに割り当てられます。この検出フィールドの「key」フィールドは「auditType categoryI18nKey」に設定されています。このマッピングは、「auditType」が空でない場合に実行されます。
auditType.level read_only_udm.security_result.detection_fields.value 「auditType.level」フィールドから取得された値が、検出フィールドの「value」フィールドに割り当てられます。この検出フィールドの「key」フィールドは「auditType level」に設定されています。このマッピングは、「auditType」が空でない場合に実行されます。
author.displayName read_only_udm.principal.user.user_display_name 「author.displayName」フィールドから取得された値。
author.externalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 「author.externalCollaborator」フィールドから取得された値が、「key」フィールドが「externalCollaborator」に設定されたラベルの「value」フィールドに割り当てられます。
author.id read_only_udm.principal.user.userid 「author.type」が「user」で、「principal_user_present」が「false」の場合、「author.id」フィールドから取得された値。
author.isExternalCollaborator read_only_udm.security_result.about.resource.attribute.labels.value 「author.isExternalCollaborator」フィールドから取得された値が、ラベルの「value」フィールドに割り当てられます。このラベルの「key」フィールドは「isExternalCollaborator」に設定されています。
author.name read_only_udm.principal.user.user_display_name 「author.type」が「user」で、「principal_user_present」が「false」の場合、「author.name」フィールドから取得された値。
bytes_in read_only_udm.network.received_bytes 「bytes_in」フィールドに数字が含まれている場合は、その値が使用されます。それ以外の場合は、デフォルトの 0 になります。
category read_only_udm.security_result.category_details 「category」フィールドから取得された値。
changedValues read_only_udm.principal.resource.attribute.labels 「changedValues」の各要素を反復処理し、「changedValue [index] [key]」のようなキーと、「changedValues」配列の対応する値から値を持つラベルを作成します。
作成日 read_only_udm.metadata.event_timestamp 「creationDate」フィールドから取得された値。UNIX または UNIX_MS タイムスタンプとして解析されます。
extraAttributes read_only_udm.principal.resource.attribute.labels 「extraAttributes」の各要素を反復処理し、「name」フィールドと「nameI18nKey」フィールドに基づいてキーを持つラベルと、対応する「value」フィールドの値を作成します。
http_verb read_only_udm.network.http.method 「http_verb」フィールドから取得された値。
ip read_only_udm.target.ip 「ip」フィールドから取得された値。
principal_host read_only_udm.principal.hostname 「principal_host」フィールドから取得された値。
referral_url read_only_udm.network.http.referral_url 「referral_url」フィールドから取得された値。
remoteAddress read_only_udm.principal.ip 「remoteAddress」フィールドから取得された値。IP アドレスとして解析されます。
response_code read_only_udm.network.http.response_code 「response_code」フィールドから取得された値。
session_duration read_only_udm.additional.fields.value.string_value 「session_duration」フィールドから取得した値が、ラベルの「string_value」フィールドに割り当てられます。このラベルの「key」フィールドは「Session Duration」に設定されています。
source read_only_udm.principal.ip 「source」フィールドから取得された値。IP アドレスとして解析されます。
src_ip read_only_udm.principal.ip 「remoteAddress」が空の場合、「src_ip」フィールドから取得された値。
概要 read_only_udm.security_result.summary 「summary」フィールドから取得された値。
sysAdmin read_only_udm.security_result.about.resource.attribute.labels.value 「sysAdmin」フィールドから取得された値が、ラベルの「value」フィールドに割り当てられます。このラベルの「key」フィールドは「sysAdmin」に設定されています。
superAdmin read_only_udm.security_result.about.resource.attribute.labels.value 「superAdmin」フィールドから取得された値が、ラベルの「value」フィールドに割り当てられます。このラベルの「key」フィールドは「superAdmin」に設定されています。
target_url read_only_udm.target.url 「target_url」フィールドから取得された値。
timestamp read_only_udm.metadata.event_timestamp 「timestamp」フィールドから取得された値。日付と時刻の文字列として解析されます。
user_id read_only_udm.principal.user.userid 「user_id」フィールドから取得された値。
read_only_udm.metadata.event_type このフィールドの値は一連のチェックによって決定され、デフォルトは「GENERIC_EVENT」です。「principal_host」、「user_id」、「has_principal」、「author.type」などの他のフィールドの有無と内容に基づいて、「NETWORK_HTTP」、「USER_UNCATEGORIZED」、「STATUS_UPDATE」などの特定の値に設定されます。
read_only_udm.metadata.vendor_name 「ATLASSIAN」に設定します。
read_only_udm.metadata.product_name 「CONFLUENCE」に設定します。
read_only_udm.metadata.log_type 「ATLASSIAN_CONFLUENCE」に設定します。
read_only_udm.principal.user.user_display_name このフィールドの値は、コンテキストに応じて「author.displayName」または「affectedObject.name」のいずれかから取得できます。
read_only_udm.target.process.pid このフィールドの値は、コンテキストに応じて「principal_host」または「pid」のいずれかから取得できます。
read_only_udm.principal.resource.attribute.labels このフィールドには、「affectedObjects」、「changedValues」、「extraAttributes」などのフィールドから派生したさまざまなラベルが入力されます。これらのラベルのキーと値は、これらのフィールドの特定のコンテンツに基づいて動的に生成されます。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。