Zendesk CRM ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Zendesk 顧客管理(CRM)ログを Google Security Operations に取り込む方法について説明します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス。
  • Zendesk への特権アクセス。
  • AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。

Zendesk の前提条件を取得する

  1. プランと役割を確認する
    1. API トークン / OAuth クライアントを作成するには、Zendesk 管理者である必要があります。監査ログ API は、Enterprise プランでのみ利用できます。(アカウントが Enterprise ではない場合は、RESOURCESaudit_logs をスキップします)。
  2. API トークン アクセスを有効にする(1 回限り)
    1. 管理センターで、[アプリとインテグレーション> API > API 構成] に移動します。
    2. [API トークン アクセスを許可する] を有効にします。
  3. API トークンを生成する(基本認証用)
    1. [Apps and integrations] > [APIs] > [API tokens] に移動します。
    2. [API トークンを追加] をクリック > (必要に応じて)説明を追加 > [保存] をクリックします。
    3. API トークンをコピーして保存します(再度表示することはできません)。
    4. このトークンで認証される管理者メールアドレスを保存します。
      • Lambda で使用される基本認証の形式: email_address/token:api_token
  4. (省略可)OAuth クライアントを作成する(API トークンではなく Bearer 認証の場合)
    1. [アプリとインテグレーション> API > OAuth クライアント > OAuth クライアントを追加] に移動します。
    2. [名前]、[一意の識別子](自動)、[リダイレクト URL](API でのみトークンを生成する場合はプレースホルダにできます)を入力し、[保存] をクリックします。
    3. 統合のアクセス トークンを作成し、このガイドで必要な最小限のスコープを付与します。
      • tickets:read(増分チケットの場合)
      • auditlogs:read(監査ログの場合。Enterprise のみ)
      • 不明な場合は、read も読み取り専用アクセスに使用できます。
    4. アクセス トークンをコピーし(ZENDESK_BEARER_TOKEN に貼り付け)、クライアント ID/シークレットを安全に記録します(今後のトークン更新フローで使用します)。
  5. Zendesk のベース URL を記録する

    • https://<your_subdomain>.zendesk.com を使用します(ZENDESK_BASE_URL 環境変数に貼り付けます)。

    後で使うためにコピーして保存する内容

    • ベース URL(例: https://acme.zendesk.com
    • 管理者ユーザーのメールアドレス(API トークン認証の場合)
    • API トークンAUTH_MODE=token を使用している場合)
    • または OAuth アクセス トークンAUTH_MODE=bearer を使用している場合)
    • (省略可): ライフサイクル管理用の OAuth クライアント ID/シークレット

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: zendesk-crm-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションの [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] に移動します。
  2. [ポリシーを作成> [JSON] タブ] をクリックします。
  3. 次のポリシーをコピーして貼り付けます。
  4. ポリシー JSON(別のバケット名を入力した場合は zendesk-crm-logs を置き換えます):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::zendesk-crm-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::zendesk-crm-logs/zendesk/crm/state.json"
        }
      ]
    }
    
  5. [次へ] > [ポリシーを作成] をクリックします。

  6. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  7. 新しく作成したポリシーを関連付けます。

  8. ロールに「ZendeskCRMToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 zendesk_crm_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール ZendeskCRMToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(zendesk_crm_to_s3.py)を貼り付けます。

    #!/usr/bin/env python3
    
    import os, json, time, base64
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    S3_BUCKET    = os.environ["S3_BUCKET"]
    S3_PREFIX    = os.environ.get("S3_PREFIX", "zendesk/crm/")
    STATE_KEY    = os.environ.get("STATE_KEY", "zendesk/crm/state.json")
    BASE_URL     = os.environ["ZENDESK_BASE_URL"].rstrip("/")  # e.g. https://your_subdomain.zendesk.com
    AUTH_MODE    = os.environ.get("AUTH_MODE", "token").lower()  # token|bearer
    EMAIL        = os.environ.get("ZENDESK_EMAIL", "")
    API_TOKEN    = os.environ.get("ZENDESK_API_TOKEN", "")
    BEARER       = os.environ.get("ZENDESK_BEARER_TOKEN", "")
    RESOURCES    = [r.strip() for r in os.environ.get("RESOURCES", "audit_logs,incremental_tickets").split(",") if r.strip()]
    MAX_PAGES    = int(os.environ.get("MAX_PAGES", "20"))
    LOOKBACK     = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # 1h default
    HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
    
    s3 = boto3.client("s3")
    
    def _headers() -> dict:
        if AUTH_MODE == "bearer" and BEARER:
            return {"Authorization": f"Bearer {BEARER}", "Accept": "application/json"}
        if AUTH_MODE == "token" and EMAIL and API_TOKEN:
            token = base64.b64encode(f"{EMAIL}/token:{API_TOKEN}".encode()).decode()
            return {"Authorization": f"Basic {token}", "Accept": "application/json"}
        raise RuntimeError("Invalid auth settings: provide token (EMAIL + API_TOKEN) or BEARER")
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            b = obj["Body"].read()
            return json.loads(b) if b else {"audit_logs": {}, "incremental_tickets": {}}
        except Exception:
            return {"audit_logs": {}, "incremental_tickets": {}}
    
    def _put_state(st: dict) -> None:
        s3.put_object(
            Bucket=S3_BUCKET, Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _http_get_json(url: str) -> dict:
        attempt = 0
        while True:
            try:
                req = Request(url, method="GET")
                for k, v in _headers().items():
                    req.add_header(k, v)
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    return json.loads(r.read().decode("utf-8"))
            except HTTPError as e:
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    ra = 1 + attempt
                    try:
                        ra = int(e.headers.get("Retry-After", ra))
                    except Exception:
                        pass
                    time.sleep(max(1, ra))
                    attempt += 1
                    continue
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(1 + attempt)
                    attempt += 1
                    continue
                raise
    
    def _put_page(payload: dict, resource: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-zendesk-{resource}.json"
        s3.put_object(
            Bucket=S3_BUCKET, Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def fetch_audit_logs(state: dict):
        """GET /api/v2/audit_logs.json with pagination via `next_page` (Zendesk)."""
        next_url = state.get("next_url") or f"{BASE_URL}/api/v2/audit_logs.json?page=1"
        pages = 0
        written = 0
        last_next = None
        while pages < MAX_PAGES and next_url:
            data = _http_get_json(next_url)
            _put_page(data, "audit_logs")
            written += len(data.get("audit_logs", []))
            last_next = data.get("next_page")
            next_url = last_next
            pages += 1
        return {"resource": "audit_logs", "pages": pages, "written": written, "next_url": last_next}
    
    def fetch_incremental_tickets(state: dict):
        """Cursor-based incremental export: /api/v2/incremental/tickets/cursor.json (pagination via `links.next`)."""
        next_link = state.get("next")
        if not next_link:
            start = int(time.time()) - LOOKBACK
            next_link = f"{BASE_URL}/api/v2/incremental/tickets/cursor.json?start_time={start}"
        pages = 0
        written = 0
        last_next = None
        while pages < MAX_PAGES and next_link:
            data = _http_get_json(next_link)
            _put_page(data, "incremental_tickets")
            written += len(data.get("tickets", []))
            links = data.get("links") or {}
            next_link = links.get("next")
            last_next = next_link
            pages += 1
        return {"resource": "incremental_tickets", "pages": pages, "written": written, "next": last_next}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        summary = []
    
        if "audit_logs" in RESOURCES:
            res = fetch_audit_logs(state.get("audit_logs", {}))
            state["audit_logs"] = {"next_url": res.get("next_url")}
            summary.append(res)
    
        if "incremental_tickets" in RESOURCES:
            res = fetch_incremental_tickets(state.get("incremental_tickets", {}))
            state["incremental_tickets"] = {"next": res.get("next")}
            summary.append(res)
    
        _put_state(state)
        return {"ok": True, "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. [構成] > [環境変数] に移動します。

  6. [編集>新しい環境変数を追加] をクリックします。

  7. 次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。

    環境変数

    キー 値の例
    S3_BUCKET zendesk-crm-logs
    S3_PREFIX zendesk/crm/
    STATE_KEY zendesk/crm/state.json
    ZENDESK_BASE_URL https://your_subdomain.zendesk.com
    AUTH_MODE token
    ZENDESK_EMAIL analyst@example.com
    ZENDESK_API_TOKEN <api_token>
    ZENDESK_BEARER_TOKEN <leave empty unless using OAuth bearer>
    RESOURCES audit_logs,incremental_tickets
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    HTTP_TIMEOUT 60
  8. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  9. [CONFIGURATION] タブを選択します。

  10. [全般設定] パネルで、[編集] をクリックします。

  11. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数 zendesk_crm_to_s3
    • 名前: zendesk_crm_to_s3-1h
  3. [スケジュールを作成] をクリックします。

(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zendesk-crm-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zendesk-crm-logs"
        }
      ]
    }
    
  7. 名前 = secops-reader-policy

  8. [ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。

  9. secops-reader のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。

  10. [アクセスキーを作成] をクリックします。

  11. .CSV をダウンロードします。(これらの値はフィードに貼り付けます)。

Zendesk CRM のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Zendesk CRM logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Zendesk CRM] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://zendesk-crm-logs/zendesk/crm/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。