Rippling のアクティビティ ログを収集する
このドキュメントでは、Amazon S3 を使用して Rippling アクティビティ ログを Google Security Operations に取り込む方法について説明します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス。
- Rippling への特権アクセス(会社のアクティビティへのアクセス権を持つ API トークン)。
- AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。
Rippling の前提条件を確認する
- Rippling Admin にログインします。
- [検索] > [API トークン] を開きます。
別の方法: [設定] > [会社設定] > [API トークン]。 - [API トークンを作成] をクリックします。
- 次の構成の詳細を入力します。
- 名前: 一意で意味のある名前を指定します(例:
Google SecOps S3 Export
)。 - API バージョン: ベース API(v1)
- スコープ/権限:
company:activity:read
を有効にします(会社のアクティビティに必要)。
- 名前: 一意で意味のある名前を指定します(例:
- [作成] をクリックし、トークン値を安全な場所に保存します。(これはベアラートークンとして使用します)。
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
rippling-activity-logs
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
- 次のポリシーをコピーして貼り付けます。
ポリシー JSON(別のバケットまたは接頭辞を入力した場合は、値を置き換えます)。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::rippling-activity-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::rippling-activity-logs/rippling/activity/state.json" } ] } ````
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteRipplingToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 rippling_activity_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteRipplingToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
rippling_activity_to_s3.py
)を貼り付けます。#!/usr/bin/env python3 # Lambda: Pull Rippling Company Activity logs to S3 (raw JSON, no transforms) import os, json, time, urllib.parse from urllib.request import Request, urlopen from datetime import datetime, timezone, timedelta import boto3 API_TOKEN = os.environ["RIPPLING_API_TOKEN"] ACTIVITY_URL = os.environ.get("RIPPLING_ACTIVITY_URL", "https://api.rippling.com/platform/api/company_activity") S3_BUCKET = os.environ["S3_BUCKET"] S3_PREFIX = os.environ.get("S3_PREFIX", "rippling/activity/") STATE_KEY = os.environ.get("STATE_KEY", "rippling/activity/state.json") LIMIT = int(os.environ.get("LIMIT", "1000")) MAX_PAGES = int(os.environ.get("MAX_PAGES", "10")) LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60")) END_LAG_SECONDS = int(os.environ.get("END_LAG_SECONDS", "120")) s3 = boto3.client("s3") def _headers(): return {"Authorization": f"Bearer {API_TOKEN}", "Accept": "application/json"} def _get_state(): try: obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) j = json.loads(obj["Body"].read()) return {"since": j.get("since"), "next": j.get("next")} except Exception: return {"since": None, "next": None} def _put_state(since_iso, next_cursor): body = json.dumps({"since": since_iso, "next": next_cursor}, separators=(",", ":")).encode("utf-8") s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body) def _get(url): req = Request(url, method="GET") for k, v in _headers().items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _build_url(base, params): qs = urllib.parse.urlencode(params) return f"{base}?{qs}" if qs else base def _parse_iso(ts): if ts.endswith("Z"): ts = ts[:-1] + "+00:00" return datetime.fromisoformat(ts) def _iso_from_epoch(sec): return datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _write(payload, run_ts_iso, page_index, source="company_activity"): day_path = _parse_iso(run_ts_iso).strftime("%Y/%m/%d") key = f"{S3_PREFIX.strip('/')}/{day_path}/{run_ts_iso.replace(':','').replace('-','')}-page{page_index:05d}-{source}.json" s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8")) return key def lambda_handler(event=None, context=None): state = _get_state() run_end = datetime.now(timezone.utc) - timedelta(seconds=END_LAG_SECONDS) end_iso = run_end.replace(microsecond=0).isoformat().replace("+00:00", "Z") since_iso = state["since"] next_cursor = state["next"] if since_iso is None: since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60) else: try: since_iso = (_parse_iso(since_iso) + timedelta(seconds=1)).replace(microsecond=0).isoformat().replace("+00:00", "Z") except Exception: since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60) run_ts_iso = end_iso pages = 0 total = 0 newest_ts = None pending_next = None while pages < MAX_PAGES: params = {"limit": str(LIMIT)} if next_cursor: params["next"] = next_cursor else: params["startDate"] = since_iso params["endDate"] = end_iso url = _build_url(ACTIVITY_URL, params) data = _get(url) _write(data, run_ts_iso, pages) events = data.get("events") or data.get("items") or data.get("data") or [] total += len(events) if isinstance(events, list) else 0 if isinstance(events, list): for ev in events: t = ev.get("timestamp") or ev.get("time") or ev.get("event_time") if isinstance(t, str): try: dt_ts = _parse_iso(t) if newest_ts is None or dt_ts > newest_ts: newest_ts = dt_ts except Exception: pass nxt = data.get("next") or data.get("next_cursor") or None pages += 1 if nxt: next_cursor = nxt pending_next = nxt continue else: pending_next = None break new_since_iso = (newest_ts or run_end).replace(microsecond=0).isoformat().replace("+00:00", "Z") _put_state(new_since_iso, pending_next) return {"ok": True, "pages": pages, "events": total, "since": new_since_iso, "next": pending_next}
[構成] > [環境変数] に移動します。
[編集>新しい環境変数を追加] をクリックします。
次の表に示す環境変数を入力し、サンプル値を実際の値に置き換えます。
環境変数
キー 値の例 S3_BUCKET
rippling-activity-logs
S3_PREFIX
rippling/activity/
STATE_KEY
rippling/activity/state.json
RIPPLING_API_TOKEN
your-api-token
RIPPLING_ACTIVITY_URL
https://api.rippling.com/platform/api/company_activity
LIMIT
1000
MAX_PAGES
10
LOOKBACK_MINUTES
60
END_LAG_SECONDS
120
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数
rippling_activity_to_s3
。 - 名前:
rippling-activity-logs-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] > [Add users] に移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::rippling-activity-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::rippling-activity-logs" } ] }
名前 =
secops-reader-policy
。[ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。
secops-reader
のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。[アクセスキーを作成] をクリックします。
.CSV
をダウンロードします。(これらの値はフィードに貼り付けます)。
Rippling アクティビティ ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Rippling Activity Logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Rippling アクティビティ ログ] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://rippling-activity-logs/rippling/activity/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの Namespace:
rippling.activity
- 省略可: 取り込みラベル: 取り込みラベルを追加します。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。