Zendesk CRM ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Zendesk 顧客管理(CRM)ログを Google Security Operations に取り込む方法について説明します。Zendesk CRM は、カスタマー サポートとチケット管理機能を提供します。このプラットフォームは、監査ログとチケットデータを通じて、お客様とのやり取り、サポート チケット、管理アクティビティを追跡します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run 関数、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- Zendesk への特権アクセス(API トークンの作成には管理者ロールが必要)
- Zendesk Enterprise プラン(Audit Logs API へのアクセスに必要)
Zendesk の前提条件を取得する
プランとロールを確認する
API トークンまたは OAuth クライアントを作成するには、Zendesk 管理者である必要があります。監査ログ API は Enterprise プランでのみ利用可能で、1 ページあたり最大 100 件のレコードを返します。アカウントが Enterprise でなくても、増分チケット データを収集できます。
API トークン アクセスを有効にする(1 回限り)
- 管理センターで、[アプリとインテグレーション> API > Zendesk API] に移動します。
- [設定] タブで [トークン アクセス] を有効にします。
API トークンを生成する(基本認証の場合)
- [Apps and integrations> APIs> Zendesk API] に移動します。
- [Add API token] ボタンをクリックします。
- 必要に応じて、API トークンの説明を追加します。
- [作成] をクリックします。
- API トークンをコピーして保存します(再度表示することはできません)。
このトークンで認証される管理者メールを保存します。
(省略可)OAuth クライアントを作成する(API トークンではなく Bearer 認証の場合)
- [Apps and integrations> APIs> Zendesk API] に移動します。
- [OAuth クライアント] タブをクリックします。
- [OAuth クライアントを追加] をクリックします。
- [クライアント名]、[一意の識別子](自動)、[リダイレクト URL](API でのみトークンを生成する場合はプレースホルダにできます)を入力します。
- [保存] をクリックします。
- 統合のアクセス トークンを作成し、このガイドで必要な最小限のスコープを付与します。
tickets:read(増分チケットの場合)auditlogs:read(監査ログの場合。Enterprise のみ)
- アクセス トークンをコピーし(
ZENDESK_BEARER_TOKEN環境変数に貼り付け)、クライアント ID とシークレットを安全に記録します(今後のトークン更新フローで使用します)。
Zendesk のベース URL を記録する
https://<your_subdomain>.zendesk.com を使用します(ZENDESK_BASE_URL 環境変数に貼り付けます)。
後で使用するために保存する内容
- ベース URL(例:
https://acme.zendesk.com) - 管理者ユーザーのメールアドレス(API トークン認証の場合)
- API トークン(
AUTH_MODE=tokenを使用している場合)または OAuth アクセス トークン(AUTH_MODE=bearerを使用している場合) - (省略可): ライフサイクル管理用の OAuth クライアント ID/シークレット
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( zendesk-crm-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
zendesk-crm-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Zendesk CRM logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
zendesk-crm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
zendesk-crm-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Zendesk API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 zendesk-crm-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
zendesk-crm-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
zendesk-crm-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETzendesk-crm-logsGCS バケット名 GCS_PREFIXzendesk/crm/ログファイルの接頭辞 STATE_KEYzendesk/crm/state.json状態ファイルのパス ZENDESK_BASE_URLhttps://your_subdomain.zendesk.comZendesk ベース URL AUTH_MODEtoken認証モード( tokenまたはbearer)ZENDESK_EMAILanalyst@example.comAPI トークン認証の管理者メールアドレス ZENDESK_API_TOKEN<api_token>認証用の API トークン ZENDESK_BEARER_TOKEN<leave empty unless using OAuth bearer>OAuth 署名なしトークン(省略可) RESOURCESaudit_logs,incremental_tickets収集するリソース MAX_PAGES20実行あたりの最大ページ数 LOOKBACK_SECONDS3600最初のルックバック期間 HTTP_TIMEOUT60HTTP リクエスト タイムアウト HTTP_RETRIES3HTTP 再試行回数 [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import base64 import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Zendesk API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'zendesk/crm/') state_key = os.environ.get('STATE_KEY', 'zendesk/crm/state.json') base_url = os.environ.get('ZENDESK_BASE_URL', '').rstrip('/') auth_mode = os.environ.get('AUTH_MODE', 'token').lower() email = os.environ.get('ZENDESK_EMAIL', '') api_token = os.environ.get('ZENDESK_API_TOKEN', '') bearer = os.environ.get('ZENDESK_BEARER_TOKEN', '') resources = [r.strip() for r in os.environ.get('RESOURCES', 'audit_logs,incremental_tickets').split(',') if r.strip()] max_pages = int(os.environ.get('MAX_PAGES', '20')) lookback = int(os.environ.get('LOOKBACK_SECONDS', '3600')) http_timeout = int(os.environ.get('HTTP_TIMEOUT', '60')) http_retries = int(os.environ.get('HTTP_RETRIES', '3')) if not all([bucket_name, base_url]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state state = load_state(bucket, state_key) print(f'Processing resources: {resources}') summary = [] if 'audit_logs' in resources: res = fetch_audit_logs( bucket, prefix, state.get('audit_logs', {}), base_url, auth_mode, email, api_token, bearer, max_pages, http_timeout, http_retries ) state['audit_logs'] = {'next_url': res.get('next_url')} summary.append(res) if 'incremental_tickets' in resources: res = fetch_incremental_tickets( bucket, prefix, state.get('incremental_tickets', {}), base_url, auth_mode, email, api_token, bearer, max_pages, lookback, http_timeout, http_retries ) state['incremental_tickets'] = {'cursor': res.get('cursor')} summary.append(res) # Save state save_state(bucket, state_key, state) print(f'Successfully processed logs: {summary}') except Exception as e: print(f'Error processing logs: {str(e)}') raise def get_headers(auth_mode, email, api_token, bearer): """Get authentication headers.""" if auth_mode == 'bearer' and bearer: return { 'Authorization': f'Bearer {bearer}', 'Accept': 'application/json' } if auth_mode == 'token' and email and api_token: auth_string = f'{email}/token:{api_token}' auth_bytes = auth_string.encode('utf-8') token = base64.b64encode(auth_bytes).decode('utf-8') return { 'Authorization': f'Basic {token}', 'Accept': 'application/json' } raise RuntimeError('Invalid auth settings: provide token (EMAIL + API_TOKEN) or BEARER') def http_get_json(url, headers, timeout, retries): """Make HTTP GET request with retries and exponential backoff.""" attempt = 0 backoff = 1.0 while True: try: response = http.request('GET', url, headers=headers, timeout=timeout) if response.status == 200: return json.loads(response.data.decode('utf-8')) elif response.status in (429, 500, 502, 503, 504) and attempt < retries: retry_after = int(response.headers.get('Retry-After', int(backoff))) print(f'HTTP {response.status}: Retrying after {retry_after}s (attempt {attempt + 1}/{retries})') time.sleep(max(1, retry_after)) backoff = min(backoff * 2, 30.0) attempt += 1 continue else: raise Exception(f'HTTP {response.status}: {response.data.decode("utf-8")}') except Exception as e: if attempt < retries: print(f'Request error: {e}. Retrying after {int(backoff)}s (attempt {attempt + 1}/{retries})') time.sleep(backoff) backoff = min(backoff * 2, 30.0) attempt += 1 continue raise def put_page(bucket, prefix, payload, resource): """Write page to GCS.""" ts = datetime.now(timezone.utc) key = f'{prefix}{ts.strftime("%Y/%m/%d/%H%M%S")}-zendesk-{resource}.json' blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload), content_type='application/json' ) return key def fetch_audit_logs(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, timeout, retries): """Fetch audit logs with pagination.""" headers = get_headers(auth_mode, email, api_token, bearer) next_url = state.get('next_url') or f'{base_url}/api/v2/audit_logs.json' pages = 0 written = 0 last_next = None while pages < max_pages and next_url: data = http_get_json(next_url, headers, timeout, retries) put_page(bucket, prefix, data, 'audit_logs') written += len(data.get('audit_logs', [])) # Use next_page for pagination last_next = data.get('next_page') next_url = last_next pages += 1 print(f'Audit logs page {pages}: Retrieved {len(data.get("audit_logs", []))} records') return { 'resource': 'audit_logs', 'pages': pages, 'written': written, 'next_url': last_next } def fetch_incremental_tickets(bucket, prefix, state, base_url, auth_mode, email, api_token, bearer, max_pages, lookback, timeout, retries): """Fetch incremental tickets with cursor-based pagination.""" headers = get_headers(auth_mode, email, api_token, bearer) cursor = state.get('cursor') if not cursor: start = int(time.time()) - lookback next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?start_time={start}' else: next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={cursor}' pages = 0 written = 0 last_cursor = None while pages < max_pages and next_url: data = http_get_json(next_url, headers, timeout, retries) put_page(bucket, prefix, data, 'incremental_tickets') written += len(data.get('tickets', [])) # Extract cursor from after_cursor field last_cursor = data.get('after_cursor') if last_cursor: next_url = f'{base_url}/api/v2/incremental/tickets/cursor.json?cursor={last_cursor}' else: next_url = None pages += 1 print(f'Incremental tickets page {pages}: Retrieved {len(data.get("tickets", []))} records') return { 'resource': 'incremental_tickets', 'pages': pages, 'written': written, 'cursor': last_cursor } def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {'audit_logs': {}, 'incremental_tickets': {}} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}')- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 zendesk-crm-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピック zendesk-crm-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
統合をテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名
zendesk-crm-collectorをクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。次の内容を確認します。
Processing resources: ['audit_logs', 'incremental_tickets'] Audit logs page 1: Retrieved X records Incremental tickets page 1: Retrieved X records Successfully processed logs: [...][Cloud Storage] > [バケット] に移動します。
バケット名をクリックします。
プレフィックス フォルダ
zendesk/crm/に移動します。新しい
.jsonファイルが現在のタイムスタンプで作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で API 認証情報を確認する
- HTTP 403: アカウントに必要な権限(監査ログの管理者ロール、Enterprise プラン)があることを確認します。
- HTTP 429: レート制限 - 関数は指数バックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Zendesk CRM logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Zendesk CRM] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Zendesk CRM のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Zendesk CRM logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Zendesk CRM] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://zendesk-crm-logs/zendesk/crm/次のように置き換えます。
zendesk-crm-logs: GCS バケット名。zendesk/crm/: ログが保存される接頭辞/フォルダパス。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。