Rippling のアクティビティ ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Rippling アクティビティ ログを Google Security Operations に取り込む方法について説明します。Rippling は、給与計算、福利厚生、従業員のオンボーディング、デバイス管理、アプリケーション プロビジョニングなどの人事、IT、財務ソリューションを提供する従業員管理プラットフォームです。Company Activity API は、Rippling プラットフォーム全体での管理アクションとユーザー アクションの監査ログを提供します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- Rippling への特権アクセス(会社のアクティビティにアクセスできる API トークン)
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( rippling-activity-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Rippling API 認証情報を収集する
- Rippling Admin にログインします。
- [Search>API Tokens] に移動します。
- 別のパス: [設定] > [会社の設定] > [API トークン]。
- [Create API Token] をクリックします。
- 次の構成の詳細を指定します。
- 名前: 一意でわかりやすい名前を入力します(例:
Google SecOps GCS Export)。 - API バージョン: [Base API(v1)] を選択します。
- スコープ/権限: company:activity:read を有効にします(会社のアクティビティに必要)。
- 名前: 一意でわかりやすい名前を入力します(例:
- [作成] をクリックします。
トークン値をコピーして安全な場所に保存します。これは署名なしトークンとして使用します。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
rippling-logs-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Rippling activity logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
rippling-logs-collector-sa@your-project.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
rippling-activity-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Rippling Company Activity API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 rippling-activity-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
rippling-activity-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
rippling-logs-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETrippling-activity-logsGCS_PREFIXrippling/activity/STATE_KEYrippling/activity/state.jsonRIPPLING_API_TOKENyour-api-tokenRIPPLING_ACTIVITY_URLhttps://api.rippling.com/platform/api/company_activityLIMIT1000MAX_PAGES10LOOKBACK_MINUTES60END_LAG_SECONDS120[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time # Initialize HTTP client http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=5.0, read=60.0), retries=False) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Rippling Company Activity API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'rippling/activity/') state_key = os.environ.get('STATE_KEY', 'rippling/activity/state.json') # Rippling API configuration api_token = os.environ.get('RIPPLING_API_TOKEN') activity_url = os.environ.get('RIPPLING_ACTIVITY_URL', 'https://api.rippling.com/platform/api/company_activity') limit = int(os.environ.get('LIMIT', '1000')) max_pages = int(os.environ.get('MAX_PAGES', '10')) lookback_minutes = int(os.environ.get('LOOKBACK_MINUTES', '60')) end_lag_seconds = int(os.environ.get('END_LAG_SECONDS', '120')) if not all([bucket_name, api_token]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state (last processed timestamp and cursor) state = load_state(bucket, state_key) since_iso = state.get('since') next_cursor = state.get('next') # Calculate time window run_end = datetime.now(timezone.utc) - timedelta(seconds=end_lag_seconds) end_iso = run_end.replace(microsecond=0).isoformat().replace('+00:00', 'Z') if since_iso is None: since_iso = iso_from_epoch(time.time() - lookback_minutes * 60) else: try: since_iso = (parse_iso(since_iso) + timedelta(seconds=1)).replace(microsecond=0).isoformat().replace('+00:00', 'Z') except Exception: since_iso = iso_from_epoch(time.time() - lookback_minutes * 60) print(f'Processing logs from {since_iso} to {end_iso}') run_ts_iso = end_iso pages = 0 total = 0 newest_ts = None pending_next = None # Fetch logs with pagination while pages < max_pages: params = {'limit': str(limit)} if next_cursor: params['next'] = next_cursor else: params['startDate'] = since_iso params['endDate'] = end_iso # Build URL with query parameters url = build_url(activity_url, params) # Fetch data from Rippling API headers = { 'Authorization': f'Bearer {api_token}', 'Accept': 'application/json' } # Implement exponential backoff for rate limiting backoff = 1.0 max_retries = 3 retry_count = 0 while retry_count < max_retries: response = http.request('GET', url, headers=headers, timeout=60.0) if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f'Rate limited (429). Retrying after {retry_after}s...') time.sleep(retry_after) backoff = min(backoff * 2, 30.0) retry_count += 1 continue break if response.status != 200: print(f'Error: API returned status {response.status}') break data = json.loads(response.data.decode('utf-8')) # Write page to GCS write_to_gcs(bucket, prefix, data, run_ts_iso, pages) # Extract events events = data.get('events') or [] total += len(events) if isinstance(events, list) else 0 # Track newest timestamp if isinstance(events, list): for ev in events: t = ev.get('timestamp') or ev.get('time') or ev.get('event_time') if isinstance(t, str): try: dt_ts = parse_iso(t) if newest_ts is None or dt_ts > newest_ts: newest_ts = dt_ts except Exception: pass # Check for next page nxt = data.get('next') pages += 1 if nxt: next_cursor = nxt pending_next = nxt continue else: pending_next = None break # Update state new_since_iso = (newest_ts or run_end).replace(microsecond=0).isoformat().replace('+00:00', 'Z') save_state(bucket, state_key, {'since': new_since_iso, 'next': pending_next}) print(f'Successfully processed {total} events across {pages} pages') print(f'Updated state: since={new_since_iso}, next={pending_next}') except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, separators=(',', ':')), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}') def write_to_gcs(bucket, prefix, payload, run_ts_iso, page_index): """Write payload to GCS.""" try: day_path = parse_iso(run_ts_iso).strftime('%Y/%m/%d') key = f"{prefix.strip('/')}/{day_path}/{run_ts_iso.replace(':', '').replace('-', '')}-page{page_index:05d}-company_activity.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload, separators=(',', ':')), content_type='application/json' ) print(f'Wrote page {page_index} to {key}') except Exception as e: print(f'Error writing to GCS: {str(e)}') raise def parse_iso(ts): """Parse ISO 8601 timestamp.""" if ts.endswith('Z'): ts = ts[:-1] + '+00:00' return datetime.fromisoformat(ts) def iso_from_epoch(sec): """Convert epoch seconds to ISO 8601 timestamp.""" return datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=0).isoformat().replace('+00:00', 'Z') def build_url(base, params): """Build URL with query parameters.""" if not params: return base query_string = '&'.join([f'{k}={v}' for k, v in params.items()]) return f'{base}?{query_string}'- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 rippling-activity-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピック rippling-activity-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run> サービス> rippling-activity-collector > ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Rippling Activity Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Rippling アクティビティ ログ] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Rippling アクティビティ ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Rippling Activity Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Rippling アクティビティ ログ] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://rippling-activity-logs/rippling/activity/次のように置き換えます。
rippling-activity-logs: GCS バケット名。rippling/activity/: ログが保存される接頭辞/フォルダパス(GCS_PREFIX環境変数と一致する必要があります)。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間(
rippling.activityなど)。Ingestion labels: このフィードのイベントに適用される省略可能なラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。