Zendesk CRM 로그 수집
이 문서에서는 Google Cloud Storage를 사용하여 Zendesk 고객 관계 관리 (CRM) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. Zendesk CRM은 고객 지원 및 티켓 관리 기능을 제공합니다. 이 플랫폼은 감사 로그와 티켓 데이터를 통해 고객 상호작용, 지원 티켓, 관리 활동을 추적합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Cloud Storage API가 사용 설정된 GCP 프로젝트
- GCS 버킷을 만들고 관리할 수 있는 권한
- GCS 버킷의 IAM 정책을 관리할 수 있는 권한
- Cloud Run 함수, Pub/Sub 주제, Cloud Scheduler 작업을 만들 수 있는 권한
- Zendesk에 대한 권한 있는 액세스 (API 토큰 생성에 관리자 역할 필요)
- Zendesk Enterprise 요금제 (감사 로그 API 액세스에 필요)
Zendesk 사전 요구사항 확인
요금제 및 역할 확인
API 토큰 또는 OAuth 클라이언트를 만들려면 Zendesk 관리자여야 합니다. 감사 로그 API는 Enterprise 요금제에서만 사용할 수 있으며 페이지당 최대 100개의 레코드를 반환합니다. 계정이 엔터프라이즈가 아닌 경우에도 증분 티켓 데이터를 수집할 수 있습니다.
API 토큰 액세스 사용 설정 (일회성)
- 관리 센터에서 앱 및 통합 > API > Zendesk API로 이동합니다.
- 설정 탭에서 토큰 액세스를 사용 설정합니다.
API 토큰 생성 (기본 인증용)
- 앱 및 통합 > API > Zendesk API로 이동합니다.
- API 토큰 추가 버튼을 클릭합니다.
- API 토큰 설명을 추가할 수 있습니다(선택사항).
- 만들기를 클릭합니다.
- 지금 API 토큰을 복사하여 저장하세요 (다시 볼 수 없음).
이 토큰으로 인증할 관리자 이메일을 저장합니다.
(선택사항) OAuth 클라이언트 만들기 (API 토큰 대신 Bearer 인증용)
- 앱 및 통합 > API > Zendesk API로 이동합니다.
- OAuth 클라이언트 탭을 클릭합니다.
- OAuth 클라이언트 추가를 클릭합니다.
- 클라이언트 이름, 고유 식별자 (자동), 리디렉션 URL을 입력합니다 (API로만 토큰을 생성하는 경우 자리표시자 사용 가능).
- 저장을 클릭합니다.
- 통합의 액세스 토큰을 만들고 이 가이드에 필요한 최소 범위를 부여합니다.
tickets:read(증분 티켓의 경우)auditlogs:read(감사 로그, 엔터프라이즈만 해당)
- 액세스 토큰을 복사하고 (
ZENDESK_BEARER_TOKEN환경 변수에 붙여넣기) 클라이언트 ID/보안 비밀번호를 안전하게 기록합니다 (향후 토큰 새로고침 흐름용).
Zendesk 기본 URL 기록
https://<your_subdomain>.zendesk.com 사용 (ZENDESK_BASE_URL 환경 변수에 붙여넣기)
나중에 볼 항목
- 기본 URL (예:
https://acme.zendesk.com) - 관리자 사용자의 이메일 주소 (API 토큰 인증용)
- API 토큰 (
AUTH_MODE=token사용 시) 또는 OAuth 액세스 토큰 (AUTH_MODE=bearer사용 시) - (선택사항): 수명 주기 관리를 위한 OAuth 클라이언트 ID/보안 비밀번호
Google Cloud Storage 버킷 만들기
- Google Cloud Console로 이동합니다.
- 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
- 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
- 버킷 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 버킷 이름 지정 전역적으로 고유한 이름 (예: zendesk-crm-logs)을 입력합니다.위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전) 위치 위치를 선택합니다 (예: us-central1).스토리지 클래스 Standard (자주 액세스하는 로그에 권장) 액세스 제어 균일 (권장) 보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정 만들기를 클릭합니다.
Cloud Run 함수의 서비스 계정 만들기
Cloud Run 함수에는 GCS 버킷에 쓸 수 있고 Pub/Sub에서 호출할 수 있는 권한이 있는 서비스 계정이 필요합니다.
서비스 계정 만들기
- GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
- 서비스 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 서비스 계정 이름:
zendesk-crm-collector-sa을 입력합니다. - 서비스 계정 설명:
Service account for Cloud Run function to collect Zendesk CRM logs을 입력합니다.
- 서비스 계정 이름:
- 만들고 계속하기를 클릭합니다.
- 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
- 역할 선택을 클릭합니다.
- 스토리지 객체 관리자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Run 호출자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Functions 호출자를 검색하여 선택합니다.
- 계속을 클릭합니다.
- 완료를 클릭합니다.
이러한 역할은 다음 작업에 필요합니다.
- 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
- Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
- Cloud Functions 호출자: 함수 호출 허용
GCS 버킷에 대한 IAM 권한 부여
GCS 버킷에 대한 쓰기 권한을 서비스 계정에 부여합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
zendesk-crm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다. - 역할 할당: 스토리지 객체 관리자를 선택합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
- 저장을 클릭합니다.
게시/구독 주제 만들기
Cloud Scheduler가 게시하고 Cloud Run 함수가 구독할 Pub/Sub 주제를 만듭니다.
- GCP Console에서 Pub/Sub > 주제로 이동합니다.
- 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주제 ID:
zendesk-crm-trigger를 입력합니다. - 다른 설정은 기본값으로 둡니다.
- 주제 ID:
- 만들기를 클릭합니다.
로그를 수집하는 Cloud Run 함수 만들기
Cloud Run 함수는 Cloud Scheduler의 Pub/Sub 메시지에 의해 트리거되어 Zendesk API에서 로그를 가져오고 GCS에 씁니다.
- GCP 콘솔에서 Cloud Run으로 이동합니다.
- 서비스 만들기를 클릭합니다.
- 함수를 선택합니다 (인라인 편집기를 사용하여 함수 만들기).
구성 섹션에서 다음 구성 세부정보를 제공합니다.
설정 값 서비스 이름 zendesk-crm-collector리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).런타임 Python 3.12 이상 선택 트리거 (선택사항) 섹션에서 다음을 수행합니다.
- + 트리거 추가를 클릭합니다.
- Cloud Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제 선택에서
zendesk-crm-trigger주제를 선택합니다. - 저장을 클릭합니다.
인증 섹션에서 다음을 구성합니다.
- 인증 필요를 선택합니다.
- ID 및 액세스 관리 (IAM)를 확인합니다.
아래로 스크롤하고 컨테이너, 네트워킹, 보안을 펼칩니다.
보안 탭으로 이동합니다.
- 서비스 계정: 서비스 계정
zendesk-crm-collector-sa를 선택합니다.
- 서비스 계정: 서비스 계정
컨테이너 탭으로 이동합니다.
- 변수 및 보안 비밀을 클릭합니다.
- 각 환경 변수에 대해 + 변수 추가를 클릭합니다.
변수 이름 예시 값 설명 GCS_BUCKETzendesk-crm-logsGCS 버킷 이름 GCS_PREFIXzendesk/crm/로그 파일의 접두사 STATE_KEYzendesk/crm/state.json상태 파일 경로 ZENDESK_BASE_URLhttps://your_subdomain.zendesk.comZendesk 기본 URL AUTH_MODEtoken인증 모드 ( token또는bearer)ZENDESK_EMAILanalyst@example.comAPI 토큰 인증을 위한 관리자 이메일 ZENDESK_API_TOKEN<api_token>인증을 위한 API 토큰 ZENDESK_BEARER_TOKEN<leave empty unless using OAuth bearer>OAuth 베어러 토큰 (선택사항) RESOURCESaudit_logs,incremental_tickets수집할 리소스 MAX_PAGES20실행당 최대 페이지 수 LOOKBACK_SECONDS3600초기 전환 확인 기간 HTTP_TIMEOUT60HTTP 요청 시간초과 HTTP_RETRIES3HTTP 재시도 횟수 변수 및 보안 비밀 섹션에서 요청까지 아래로 스크롤합니다.
- 요청 제한 시간:
600초 (10분)를 입력합니다.
- 요청 제한 시간:
설정 탭으로 이동합니다.
- 리소스 섹션에서 다음을 수행합니다.
- 메모리: 512MiB 이상을 선택합니다.
- CPU: 1을 선택합니다.
- 리소스 섹션에서 다음을 수행합니다.
버전 확장 섹션에서 다음을 수행합니다.
- 최소 인스턴스 수:
0를 입력합니다. - 최대 인스턴스 수:
100을 입력합니다 (또는 예상 부하에 따라 조정).
- 최소 인스턴스 수:
만들기를 클릭합니다.
서비스가 생성될 때까지 기다립니다 (1~2분).
서비스가 생성되면 인라인 코드 편집기가 자동으로 열립니다.
함수 코드 추가
- 함수 진입점에 main을 입력합니다.
인라인 코드 편집기에서 다음 두 파일을 만듭니다.
- 첫 번째 파일: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import base64 import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Zendesk API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'zendesk/crm/') state_key = os.environ.get('STATE_KEY', 'zendesk/crm/state.json') base_url = os.environ.get('ZENDESK_BASE_URL', '').rstrip('/') auth_mode = os.environ.get('AUTH_MODE', 'token').lower() email = os.environ.get('ZENDESK_EMAIL', '') api_token = os.environ.get('ZENDESK_API_TOKEN', '') bearer = os.environ.get('ZENDESK_BEARER_TOKEN', '') resources = [r.strip() for r in os.environ.get('RESOURCES', 'audit_logs,incremental_tickets').split(',') if r.strip()] max_pages = int(os.environ.get('MAX_PAGES', '20')) lookback = int(os.environ.get('LOOKBACK_SECONDS', '3600')) http_timeout = int(os.environ.get('HTTP_TIMEOUT', '60')) http_retries = int(os.environ.get('HTTP_RETRIES', '3')) if not all([bucket_name, base_url]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state state = load_state(bucket, state_key) print(f'Processing resources: {resources}') summary = [] if 'audit_logs' in resources: res = fetch_audit_logs( bucket, prefix, state.get('audit_logs', {}), base_url, auth_mode, email, api_token, bearer, max_pages, http_timeout, http_retries ) state['audit_logs'] = {'next_url': res.get('next_url')} summary.append(res) if 'incremental_tickets' in resources: res = fetch_incremental_tickets( bucket, prefix, state.get('incremental_tickets', {}), base_url, auth_mode, email, api_token, bearer, max_pages, lookback, http_timeout, http_retries ) state['incremental_tickets'] = {'cursor': res.get('cursor')} summary.append(res) # Save state save_state(bucket, state_key, state) print(f'Successfully processed logs: {summary}') except Exception as e: print(f'Error processing logs: {str(e)}') raise def get_headers(auth_mode, email, api_token, bearer): """Get authentication headers.""" if auth_mode == 'bearer' and bearer: return { 'Authorization': f'Bearer {bearer}', 'Accept': 'application/json' } if auth_mode == 'token' and email and api_token: auth_string = f'{email}/token:{api_token}' auth_bytes = auth_string.encode('utf-8') token = base64.b64encode(auth_bytes).decode('utf-8') return { 'Authorization': f'Basic {token}', 'Accept': 'application/json' } raise RuntimeError('Invalid auth settings: provide token (EMAIL + API_TOKEN) or BEARER') def http_get_json(url, headers, timeout, retries): """Make HTTP GET request with retries and exponential backoff.""" attempt = 0 backoff = 1.0 while True: try: response = http.request('GET', url, headers=headers, timeout=timeout) if response.status == 200: return json.loads(response.data.decode('utf-8')) elif response.status in (429, 500, 502, 503, 504) and attempt < retries: retry_after = int(response.headers.get('Retry-After', int(backoff))) print(f'HTTP {response.status}: Retrying after {retry_after}s (attempt {attempt + 1}/{retries})') time.sleep(max(1, retry_after)) backoff = min(backoff * 2, 30.0) attempt += 1 continue else: raise Exception(f'HTTP {response.status}: {response.data.decode("utf-8")}') except Exception as e: if attempt < retries: print(f'Request error: {e}. Retrying after {int(backoff)}s (attempt {attempt + 1}/{retries})') time.sleep(backoff) backoff = min(backoff * 2, 30.0) attempt += 1 continue raise def put_page(bucket, prefix, payload, resource): """Write page to GCS.""" ts = datetime.now(timezone.utc) key = f'{prefix}{ts.strftime("%Y/%m/%d/%H%M%S")}-zendesk-{resource}.json' blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload), content_type='application/json' ) return key def fetch_audit_logs(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, timeout, retries): """Fetch audit logs with pagination.""" headers = get_headers(auth_mode, email, api_token, bearer) next_url = state.get('next_url') or f'{base_url}/api/v2/audit_logs.json' pages = 0 written = 0 last_next = None while pages < max_pages and next_url: data = http_get_json(next_url, headers, timeout, retries) put_page(bucket, prefix, data, 'audit_logs') written += len(data.get('audit_logs', [])) # Use next_page for pagination last_next = data.get('next_page') next_url = last_next pages += 1 print(f'Audit logs page {pages}: Retrieved {len(data.get("audit_logs", []))} records') return { 'resource': 'audit_logs', 'pages': pages, 'written': written, 'next_url': last_next } def fetch_incremental_tickets(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, lookback, timeout, retries): """Fetch incremental tickets with cursor-based pagination.""" headers = get_headers(auth_mode, email, api_token, bearer) cursor = state.get('cursor') if not cursor: start = int(time.time()) - lookback next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?start_time={start}' else: next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={cursor}' pages = 0 written = 0 last_cursor = None while pages < max_pages and next_url: data = http_get_json(next_url, headers, timeout, retries) put_page(bucket, prefix, data, 'incremental_tickets') written += len(data.get('tickets', [])) # Extract cursor from after_cursor field last_cursor = data.get('after_cursor') if last_cursor: next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={last_cursor}' else: next_url = None pages += 1 print(f'Incremental tickets page {pages}: Retrieved {len(data.get("tickets", []))} records') return { 'resource': 'incremental_tickets', 'pages': pages, 'written': written, 'cursor': last_cursor } def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {'audit_logs': {}, 'incremental_tickets': {}} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}')- 두 번째 파일: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0배포를 클릭하여 함수를 저장하고 배포합니다.
배포가 완료될 때까지 기다립니다 (2~3분).
Cloud Scheduler 작업 만들기
Cloud Scheduler는 일정 간격으로 Pub/Sub 주제에 메시지를 게시하여 Cloud Run 함수를 트리거합니다.
- GCP Console에서 Cloud Scheduler로 이동합니다.
- 작업 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 zendesk-crm-collector-hourly리전 Cloud Run 함수와 동일한 리전 선택 주파수 0 * * * *(매시간 정각)시간대 시간대 선택 (UTC 권장) 타겟 유형 Pub/Sub 주제 zendesk-crm-trigger주제를 선택합니다.메일 본문 {}(빈 JSON 객체)만들기를 클릭합니다.
일정 빈도 옵션
로그 볼륨 및 지연 시간 요구사항에 따라 빈도를 선택합니다.
빈도 크론 표현식 사용 사례 5분마다 */5 * * * *대용량, 저지연 15분마다 */15 * * * *검색량 보통 1시간마다 0 * * * *일반(권장) 6시간마다 0 */6 * * *양이 적은 일괄 처리 매일 0 0 * * *이전 데이터 수집
통합 테스트
- Cloud Scheduler 콘솔에서 작업을 찾습니다.
- 강제 실행을 클릭하여 작업을 수동으로 트리거합니다.
- 몇 초 동안 기다립니다.
- Cloud Run > 서비스로 이동합니다.
- 함수 이름
zendesk-crm-collector을 클릭합니다. - 로그 탭을 클릭합니다.
함수가 성공적으로 실행되었는지 확인합니다. 다음을 확인하세요.
Processing resources: ['audit_logs', 'incremental_tickets'] Audit logs page 1: Retrieved X records Incremental tickets page 1: Retrieved X records Successfully processed logs: [...]Cloud Storage> 버킷으로 이동합니다.
버킷 이름을 클릭합니다.
접두사 폴더
zendesk/crm/로 이동합니다.새
.json파일이 현재 타임스탬프로 생성되었는지 확인합니다.
로그에 오류가 표시되면 다음 단계를 따르세요.
- HTTP 401: 환경 변수에서 API 사용자 인증 정보 확인
- HTTP 403: 계정에 필요한 권한 (감사 로그의 관리자 역할, Enterprise 요금제)이 있는지 확인합니다.
- HTTP 429: 비율 제한 - 함수가 지수 백오프로 자동 재시도됩니다.
- 환경 변수 누락: 필수 변수가 모두 설정되었는지 확인
Google SecOps 서비스 계정 가져오기
Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.
서비스 계정 이메일 가져오기
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Zendesk CRM logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Zendesk CRM을 선택합니다.
서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.
Google SecOps 서비스 계정에 IAM 권한 부여
Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
- 역할 할당: 스토리지 객체 뷰어를 선택합니다.
저장을 클릭합니다.
Zendesk CRM 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Zendesk CRM logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Zendesk CRM을 선택합니다.
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.
gs://zendesk-crm-logs/zendesk/crm/다음과 같이 바꿉니다.
zendesk-crm-logs: GCS 버킷 이름입니다.zendesk/crm/: 로그가 저장되는 접두사/폴더 경로입니다.
소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
- 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
애셋 네임스페이스: 애셋 네임스페이스입니다.
수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.