收集 Zendesk CRM 記錄

支援的國家/地區:

本文說明如何使用 Google Cloud Storage,將 Zendesk 客戶關係管理 (CRM) 記錄檔擷取至 Google Security Operations。Zendesk CRM 提供客戶支援和支援單管理功能。平台會透過稽核記錄和支援單資料,追蹤顧客互動、支援單和管理活動。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用 Cloud Storage API 的 GCP 專案
  • 建立及管理 GCS 值區的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 建立 Cloud Run 函式、Pub/Sub 主題和 Cloud Scheduler 工作的權限
  • Zendesk 的特殊存取權 (必須具備管理員角色才能建立 API 權杖)
  • Zendesk Enterprise 方案 (存取 Audit Logs API 的必要條件)

取得 Zendesk 必要條件

確認方案和角色

您必須是 Zendesk 管理員,才能建立 API 權杖或 OAuth 用戶端。稽核記錄 API 僅適用於企業方案,且每頁最多可傳回 100 筆記錄。如果您的帳戶不是 Enterprise 帳戶,您還是可以收集增量票證資料。

啟用 API 權杖存取權 (一次性)

  1. 管理中心中,依序前往「應用程式和整合」>「API」>「Zendesk API」
  2. 在「Settings」分頁中,啟用「Token Access」

產生 API 權杖 (適用於基本驗證)

  1. 依序前往「應用程式和整合」>「API」>「Zendesk API」
  2. 按一下「新增 API 權杖」按鈕。
  3. 視需要新增 API 權杖說明
  4. 點選「建立」
  5. 立即複製並儲存 API 權杖 (之後就無法再查看)。
  6. 儲存將透過這個權杖驗證的管理員電子郵件地址。

(選用) 建立 OAuth 用戶端 (適用於 Bearer 驗證,而非 API 權杖)

  1. 依序前往「應用程式和整合」>「API」>「Zendesk API」
  2. 按一下「OAuth 用戶端」分頁標籤。
  3. 按一下「新增 OAuth 用戶端」
  4. 填寫「用戶端名稱」、「專屬 ID」 (自動)、「重新導向網址」 (如果只使用 API 鑄造權杖,可以填入預留位置)。
  5. 按一下 [儲存]
  6. 為整合服務建立存取權杖,並授予本指南規定的最低範圍:
    • tickets:read (適用於增量票券)
    • auditlogs:read (適用於稽核記錄;僅限企業版)
  7. 複製存取權杖 (貼到 ZENDESK_BEARER_TOKEN 環境變數中),並安全地記錄用戶端 ID/密鑰 (供日後權杖更新流程使用)。

記錄 Zendesk 基本網址

使用 https://<your_subdomain>.zendesk.com (貼到 ZENDESK_BASE_URL 環境變數中)。

可儲存的內容

  • 基準網址 (例如 https://acme.zendesk.com)
  • 管理員使用者的電子郵件地址 (適用於 API 權杖驗證)
  • API 權杖 (如果使用 AUTH_MODE=token) 或 OAuth 存取權杖 (如果使用 AUTH_MODE=bearer)
  • (選用) 用於生命週期管理的 OAuth 用戶端 ID/密鑰

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 zendesk-crm-logs)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

為 Cloud Run 函式建立服務帳戶

Cloud Run 函式需要具備 GCS bucket 寫入權限的服務帳戶,並由 Pub/Sub 叫用。

建立服務帳戶

  1. GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)
  2. 按一下 [Create Service Account] (建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 zendesk-crm-collector-sa
    • 服務帳戶說明:輸入 Service account for Cloud Run function to collect Zendesk CRM logs
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分,新增下列角色:
    1. 按一下「選擇角色」
    2. 搜尋並選取「Storage 物件管理員」
    3. 點選「+ 新增其他角色」
    4. 搜尋並選取「Cloud Run Invoker」
    5. 點選「+ 新增其他角色」
    6. 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)
  6. 按一下「繼續」
  7. 按一下 [完成]。

這些角色適用於:

  • Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
  • Cloud Run 叫用者:允許 Pub/Sub 叫用函式
  • Cloud Functions 叫用者:允許函式叫用

授予 GCS 值區的 IAM 權限

授予服務帳戶 GCS bucket 的寫入權限:

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 zendesk-crm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Pub/Sub 主題

建立 Pub/Sub 主題,Cloud Scheduler 會將訊息發布至該主題,而 Cloud Run 函式會訂閱該主題。

  1. GCP Console 中,前往「Pub/Sub」>「Topics」(主題)
  2. 按一下「建立主題」
  3. 請提供下列設定詳細資料:
    • 主題 ID:輸入 zendesk-crm-trigger
    • 其他設定保留預設值。
  4. 點選「建立」

建立 Cloud Run 函式來收集記錄

Cloud Run 函式會由 Cloud Scheduler 的 Pub/Sub 訊息觸發,從 Zendesk API 擷取記錄並寫入 GCS。

  1. 前往 GCP Console 的「Cloud Run」
  2. 按一下「Create service」(建立服務)
  3. 選取「函式」 (使用內嵌編輯器建立函式)。
  4. 在「設定」部分,提供下列設定詳細資料:

    設定
    服務名稱 zendesk-crm-collector
    區域 選取與 GCS bucket 相符的區域 (例如 us-central1)
    執行階段 選取「Python 3.12」以上版本
  5. 在「Trigger (optional)」(觸發條件 (選用)) 專區:

    1. 按一下「+ 新增觸發條件」
    2. 選取「Cloud Pub/Sub」
    3. 在「選取 Cloud Pub/Sub 主題」中,選擇主題 zendesk-crm-trigger
    4. 按一下 [儲存]
  6. 在「Authentication」(驗證) 部分:

    1. 選取「需要驗證」
    2. 檢查 Identity and Access Management (IAM)
  7. 向下捲動並展開「Containers, Networking, Security」

  8. 前往「安全性」分頁:

    • 服務帳戶:選取服務帳戶 zendesk-crm-collector-sa
  9. 前往「容器」分頁:

    1. 按一下「變數與密鑰」
    2. 針對每個環境變數,按一下「+ 新增變數」
    變數名稱 範例值 說明
    GCS_BUCKET zendesk-crm-logs GCS bucket 名稱
    GCS_PREFIX zendesk/crm/ 記錄檔的前置字串
    STATE_KEY zendesk/crm/state.json 狀態檔案路徑
    ZENDESK_BASE_URL https://your_subdomain.zendesk.com Zendesk 基準網址
    AUTH_MODE token 驗證模式 (tokenbearer)
    ZENDESK_EMAIL analyst@example.com API 權杖驗證的管理員電子郵件地址
    ZENDESK_API_TOKEN <api_token> 用於驗證的 API 權杖
    ZENDESK_BEARER_TOKEN <leave empty unless using OAuth bearer> OAuth 不記名權杖 (選用)
    RESOURCES audit_logs,incremental_tickets 要收集的資源
    MAX_PAGES 20 每次執行的頁數上限
    LOOKBACK_SECONDS 3600 初始回溯期
    HTTP_TIMEOUT 60 HTTP 要求逾時
    HTTP_RETRIES 3 HTTP 重試次數
  10. 在「變數與密鑰」部分,向下捲動至「要求」

    • 要求逾時:輸入 600 秒 (10 分鐘)。
  11. 前往「設定」分頁:

    • 在「資源」部分:
      • 記憶體:選取 512 MiB 以上。
      • CPU:選取 1
  12. 在「修訂版本資源調度」部分:

    • 執行個體數量下限:輸入 0
    • 「Maximum number of instances」(執行個體數量上限):輸入 100 (或根據預期負載調整)。
  13. 點選「建立」

  14. 等待服務建立完成 (1 到 2 分鐘)。

  15. 服務建立完成後,系統會自動開啟內嵌程式碼編輯器

新增函式程式碼

  1. 在「Function entry point」(函式進入點) 中輸入 main
  2. 在內嵌程式碼編輯器中建立兩個檔案:

    • 第一個檔案:main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone
    import base64
    import time
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch logs from Zendesk API and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'zendesk/crm/')
        state_key = os.environ.get('STATE_KEY', 'zendesk/crm/state.json')
    
        base_url = os.environ.get('ZENDESK_BASE_URL', '').rstrip('/')
        auth_mode = os.environ.get('AUTH_MODE', 'token').lower()
        email = os.environ.get('ZENDESK_EMAIL', '')
        api_token = os.environ.get('ZENDESK_API_TOKEN', '')
        bearer = os.environ.get('ZENDESK_BEARER_TOKEN', '')
    
        resources = [r.strip() for r in os.environ.get('RESOURCES', 'audit_logs,incremental_tickets').split(',') if r.strip()]
        max_pages = int(os.environ.get('MAX_PAGES', '20'))
        lookback = int(os.environ.get('LOOKBACK_SECONDS', '3600'))
        http_timeout = int(os.environ.get('HTTP_TIMEOUT', '60'))
        http_retries = int(os.environ.get('HTTP_RETRIES', '3'))
    
        if not all([bucket_name, base_url]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            # Load state
            state = load_state(bucket, state_key)
    
            print(f'Processing resources: {resources}')
    
            summary = []
    
            if 'audit_logs' in resources:
                res = fetch_audit_logs(
                    bucket, prefix, state.get('audit_logs', {}),
                    base_url, auth_mode, email, api_token, bearer,
                    max_pages, http_timeout, http_retries
                )
                state['audit_logs'] = {'next_url': res.get('next_url')}
                summary.append(res)
    
            if 'incremental_tickets' in resources:
                res = fetch_incremental_tickets(
                    bucket, prefix, state.get('incremental_tickets', {}),
                    base_url, auth_mode, email, api_token, bearer,
                    max_pages, lookback, http_timeout, http_retries
                )
                state['incremental_tickets'] = {'cursor': res.get('cursor')}
                summary.append(res)
    
            # Save state
            save_state(bucket, state_key, state)
    
            print(f'Successfully processed logs: {summary}')
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def get_headers(auth_mode, email, api_token, bearer):
        """Get authentication headers."""
        if auth_mode == 'bearer' and bearer:
            return {
                'Authorization': f'Bearer {bearer}',
                'Accept': 'application/json'
            }
        if auth_mode == 'token' and email and api_token:
            auth_string = f'{email}/token:{api_token}'
            auth_bytes = auth_string.encode('utf-8')
            token = base64.b64encode(auth_bytes).decode('utf-8')
            return {
                'Authorization': f'Basic {token}',
                'Accept': 'application/json'
            }
        raise RuntimeError('Invalid auth settings: provide token (EMAIL + API_TOKEN) or BEARER')
    
    def http_get_json(url, headers, timeout, retries):
        """Make HTTP GET request with retries and exponential backoff."""
        attempt = 0
        backoff = 1.0
        while True:
            try:
                response = http.request('GET', url, headers=headers, timeout=timeout)
                if response.status == 200:
                    return json.loads(response.data.decode('utf-8'))
                elif response.status in (429, 500, 502, 503, 504) and attempt < retries:
                    retry_after = int(response.headers.get('Retry-After', int(backoff)))
                    print(f'HTTP {response.status}: Retrying after {retry_after}s (attempt {attempt + 1}/{retries})')
                    time.sleep(max(1, retry_after))
                    backoff = min(backoff * 2, 30.0)
                    attempt += 1
                    continue
                else:
                    raise Exception(f'HTTP {response.status}: {response.data.decode("utf-8")}')
            except Exception as e:
                if attempt < retries:
                    print(f'Request error: {e}. Retrying after {int(backoff)}s (attempt {attempt + 1}/{retries})')
                    time.sleep(backoff)
                    backoff = min(backoff * 2, 30.0)
                    attempt += 1
                    continue
                raise
    
    def put_page(bucket, prefix, payload, resource):
        """Write page to GCS."""
        ts = datetime.now(timezone.utc)
        key = f'{prefix}{ts.strftime("%Y/%m/%d/%H%M%S")}-zendesk-{resource}.json'
        blob = bucket.blob(key)
        blob.upload_from_string(
            json.dumps(payload),
            content_type='application/json'
        )
        return key
    
    def fetch_audit_logs(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, timeout, retries):
        """Fetch audit logs with pagination."""
        headers = get_headers(auth_mode, email, api_token, bearer)
        next_url = state.get('next_url') or f'{base_url}/api/v2/audit_logs.json'
    
        pages = 0
        written = 0
        last_next = None
    
        while pages < max_pages and next_url:
            data = http_get_json(next_url, headers, timeout, retries)
            put_page(bucket, prefix, data, 'audit_logs')
            written += len(data.get('audit_logs', []))
    
            # Use next_page for pagination
            last_next = data.get('next_page')
            next_url = last_next
            pages += 1
    
            print(f'Audit logs page {pages}: Retrieved {len(data.get("audit_logs", []))} records')
    
        return {
            'resource': 'audit_logs',
            'pages': pages,
            'written': written,
            'next_url': last_next
        }
    
    def fetch_incremental_tickets(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, lookback, timeout, retries):
        """Fetch incremental tickets with cursor-based pagination."""
        headers = get_headers(auth_mode, email, api_token, bearer)
        cursor = state.get('cursor')
    
        if not cursor:
            start = int(time.time()) - lookback
            next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?start_time={start}'
        else:
            next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={cursor}'
    
        pages = 0
        written = 0
        last_cursor = None
    
        while pages < max_pages and next_url:
            data = http_get_json(next_url, headers, timeout, retries)
            put_page(bucket, prefix, data, 'incremental_tickets')
            written += len(data.get('tickets', []))
    
            # Extract cursor from after_cursor field
            last_cursor = data.get('after_cursor')
            if last_cursor:
                next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={last_cursor}'
            else:
                next_url = None
    
            pages += 1
    
            print(f'Incremental tickets page {pages}: Retrieved {len(data.get("tickets", []))} records')
    
        return {
            'resource': 'incremental_tickets',
            'pages': pages,
            'written': written,
            'cursor': last_cursor
        }
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {'audit_logs': {}, 'incremental_tickets': {}}
    
    def save_state(bucket, key, state):
        """Save state to GCS."""
        try:
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state),
                content_type='application/json'
            )
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    • 第二個檔案:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 點選「部署」來儲存並部署函式。

  4. 等待部署作業完成 (2 到 3 分鐘)。

建立 Cloud Scheduler 工作

Cloud Scheduler 會定期將訊息發布至 Pub/Sub 主題,觸發 Cloud Run 函式。

  1. 前往 GCP 主控台的「Cloud Scheduler」
  2. 點選「建立工作」
  3. 請提供下列設定詳細資料:

    設定
    名稱 zendesk-crm-collector-hourly
    區域 選取與 Cloud Run 函式相同的區域
    頻率 0 * * * * (每小時整點)
    時區 選取時區 (建議使用世界標準時間)
    目標類型 Pub/Sub
    主題 選取主題 zendesk-crm-trigger
    郵件內文 {} (空白 JSON 物件)
  4. 點選「建立」

排程頻率選項

  • 根據記錄檔量和延遲時間要求選擇頻率:

    頻率 Cron 運算式 用途
    每 5 分鐘 */5 * * * * 高容量、低延遲
    每 15 分鐘檢查一次 */15 * * * * 普通量
    每小時 0 * * * * 標準 (建議採用)
    每 6 小時 0 */6 * * * 少量、批次處理
    每日 0 0 * * * 歷來資料集合

測試整合項目

  1. Cloud Scheduler 控制台中找出您的工作。
  2. 按一下「強制執行」,手動觸發工作。
  3. 稍等幾秒鐘。
  4. 前往「Cloud Run」>「Services」
  5. 按一下函式名稱 zendesk-crm-collector
  6. 按一下 [Logs] (記錄) 分頁標籤。
  7. 確認函式是否已順利執行。請找出以下項目:

    Processing resources: ['audit_logs', 'incremental_tickets']
    Audit logs page 1: Retrieved X records
    Incremental tickets page 1: Retrieved X records
    Successfully processed logs: [...]
    
  8. 依序前往「Cloud Storage」>「Buckets」

  9. 按一下 bucket 名稱。

  10. 前往前置字元資料夾 zendesk/crm/

  11. 確認是否已建立含有目前時間戳記的新 .json 檔案。

如果在記錄中發現錯誤:

  • HTTP 401:檢查環境變數中的 API 憑證
  • HTTP 403:確認帳戶具備必要權限 (管理員角色、稽核記錄的企業方案)
  • HTTP 429:頻率限制 - 函式會自動重試,並採用指數輪詢間隔
  • 缺少環境變數:檢查是否已設定所有必要變數

擷取 Google SecOps 服務帳戶

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Zendesk CRM logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Zendesk CRM」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示專屬的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

在 Google SecOps 中設定動態饋給,擷取 Zendesk CRM 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Zendesk CRM logs)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Zendesk CRM」做為「記錄類型」
  7. 點選 [下一步]。
  8. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://zendesk-crm-logs/zendesk/crm/
      
      • 取代:

        • zendesk-crm-logs:您的 GCS bucket 名稱。
        • zendesk/crm/:儲存記錄的前置字元/資料夾路徑。
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。

    • 資產命名空間資產命名空間

    • 擷取標籤:要套用至這個動態饋給事件的標籤。

  9. 點選 [下一步]。

  10. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。