1Password のログを収集する
このドキュメントでは、Google Cloud Storage V2 を使用して 1Password ログを Google Security Operations に取り込む方法について説明します。
1Password は、チームが認証情報、シークレット、機密情報を安全に保存、共有、管理できるようにするパスワード管理プラットフォームです。1Password Events API を使用すると、1Password Business アカウントのログイン試行とアイテムの使用状況データにアクセスできます。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 1Password Business アカウント
- 1Password アカウントのオーナーまたは管理者ロール
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( onepassword-secops-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(アクセス頻度の高いログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
1Password の認証情報を収集する
イベント レポートの統合を設定する
- 1Password.com でアカウントにログインします。
- サイドバーで [統合] を選択します。
- 他の統合をすでに設定している場合は、[統合] ページで [ディレクトリ] を選択します。
- [イベント レポート] セクションで、[その他] を選択します。
- [名前] フィールドに、統合の名前を入力します(例:
Google SecOps Integration)。 - [統合を追加] をクリックします。
署名なしトークンを作成する
- 統合の詳細ページで、[ベアラートークンを追加] をクリックします。
- 次の構成の詳細を指定します。
- トークン名: わかりやすい名前を入力します(例:
SecOps GCS Collector - SignIn and ItemUsage)。 - Expires After: 必要に応じて有効期限を選択します。有効期限のないトークンには [なし] を選択し、有効期限のあるトークンには [30 日]、[90 日]、[180 日] のいずれかを選択します。
- [レポートするイベント]: 次のチェックボックスをオンにします。
- ログイン試行
- アイテム使用イベント
- トークン名: わかりやすい名前を入力します(例:
- [Issue Token] をクリックします。
[Save your token] ページで、[Save in 1Password] をクリックするか、トークンをコピーして安全な場所に保存します。
[統合の詳細を表示] をクリックして、統合が有効になっていることを確認します。
Events API のベース URL を確認する
ベース URL は、1Password アカウントをホストするサーバーによって異なります。
| アカウントが | Events API のベース URL は次のとおりです。 |
|---|---|
1password.com |
https://events.1password.com |
ent.1password.com |
https://events.ent.1password.com |
1password.ca |
https://events.1password.ca |
1password.eu |
https://events.1password.eu |
テスト API へのアクセス
統合に進む前に、認証情報をテストします。
# Replace with your actual bearer token and base URL BEARER_TOKEN="<your-bearer-token>" API_BASE="https://events.1password.com" # Test API access using the introspect endpoint curl -v \ -H "Authorization: Bearer $BEARER_TOKEN" \ "$API_BASE/api/v2/auth/introspect"
成功したレスポンスは、トークンがアクセスできるイベントタイプ(["signinattempts", "itemusages"] など)を一覧表示する features フィールドを含む JSON オブジェクトを返します。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
onepassword-logs-collector-sa」と入力します。 - サービス アカウントの説明:
Service account for Cloud Run function to collect 1Password logsと入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット(
onepassword-secops-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: 「
onepassword-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com」と入力します。 - ロールを割り当てる: [ストレージ オブジェクト管理者] を選択します。
- プリンシパルを追加: 「
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
onepassword-logs-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、1Password Events API からログイン試行ログとアイテム使用ログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 onepassword-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、[
onepassword-logs-trigger] を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント:
onepassword-logs-collector-saを選択します。
- サービス アカウント:
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETonepassword-secops-logsGCS バケット名 GCS_PREFIXonepasswordログファイルの接頭辞 STATE_KEYonepassword/state.json状態ファイルのパス OP_API_BASEhttps://events.1password.com1Password Events API のベース URL OP_BEARER_TOKEN<your-bearer-token>1Password Events API ベアラー トークン MAX_RECORDS10000実行あたりのエンドポイントあたりの最大レコード数 PAGE_SIZE1000ページあたりのレコード数(最大 1,000) LOOKBACK_HOURS24最初のルックバック期間(時間単位)
[変数とシークレット] セクションで、[リクエスト] までスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: 1 を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数]: 「
0」と入力します。 - インスタンスの最大数:
100と入力します(予想される負荷に基づいて調整します)。
- [インスタンスの最小数]: 「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'onepassword') STATE_KEY = os.environ.get('STATE_KEY', 'onepassword/state.json') API_BASE = os.environ.get('OP_API_BASE') BEARER_TOKEN = os.environ.get('OP_BEARER_TOKEN') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '10000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) # 1Password Events API v2 endpoints for sign-in and item usage ENDPOINTS = { 'signinattempts': '/api/v2/signinattempts', 'itemusages': '/api/v2/itemusages', } def parse_datetime(value: str) -> datetime: """Parse RFC 3339 datetime string to datetime object.""" if value.endswith('Z'): value = value[:-1] + '+00:00' return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch 1Password sign-in attempt and item usage logs and write them to GCS as NDJSON. """ if not all([GCS_BUCKET, API_BASE, BEARER_TOKEN]): print('Error: Missing required environment variables ' '(GCS_BUCKET, OP_API_BASE, OP_BEARER_TOKEN)') return try: bucket = storage_client.bucket(GCS_BUCKET) # Load state (stores cursors per endpoint) state = load_state(bucket, STATE_KEY) now = datetime.now(timezone.utc) total_records = 0 for event_type, path in ENDPOINTS.items(): print(f'--- Processing endpoint: {event_type} ---') saved_cursor = state.get(f'cursor_{event_type}') records, last_cursor = fetch_endpoint( api_base=API_BASE, path=path, bearer_token=BEARER_TOKEN, saved_cursor=saved_cursor, lookback_hours=LOOKBACK_HOURS, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if records: timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = (f'{GCS_PREFIX}/' f'{event_type}_{timestamp}.ndjson') blob = bucket.blob(object_key) ndjson = '\n'.join( json.dumps(r, ensure_ascii=False) for r in records ) + '\n' blob.upload_from_string( ndjson, content_type='application/x-ndjson' ) print(f'Wrote {len(records)} {event_type} records ' f'to gs://{GCS_BUCKET}/{object_key}') # Save cursor even if no records (for next poll) if last_cursor: state[f'cursor_{event_type}'] = last_cursor total_records += len(records) # Save state with all cursors state['last_run'] = now.isoformat() save_state(bucket, STATE_KEY, state) print(f'Successfully processed {total_records} total records') except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {e}') return {} def save_state(bucket, key, state: dict): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json', ) print(f'Saved state: {json.dumps(state)}') except Exception as e: print(f'Warning: Could not save state: {e}') def fetch_endpoint(api_base, path, bearer_token, saved_cursor, lookback_hours, page_size, max_records): """ Fetch events from a single 1Password Events API v2 endpoint. The 1Password Events API uses cursor-based pagination with POST requests. The first request sends a ResetCursor object with optional start_time and limit. Subsequent requests send the cursor returned from the previous response. Args: api_base: Events API base URL path: Endpoint path bearer_token: JWT bearer token saved_cursor: Cursor from previous run (or None) lookback_hours: Hours to look back on first run page_size: Max events per page (1-1000) max_records: Max total events per run Returns: Tuple of (records list, last_cursor string) """ url = f'{api_base.rstrip("/")}{path}' headers = { 'Authorization': f'Bearer {bearer_token}', 'Content-Type': 'application/json', 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-1PasswordCollector/1.0', } records = [] cursor = saved_cursor page_num = 0 backoff = 1.0 while True: page_num += 1 if len(records) >= max_records: print(f'Reached max_records limit ({max_records})') break # Build request body if cursor: # Continuing cursor: resume from last position body = json.dumps({'cursor': cursor}) else: # ResetCursor: first request or no saved state start_time = ( datetime.now(timezone.utc) - timedelta(hours=lookback_hours) ) body = json.dumps({ 'limit': page_size, 'start_time': start_time.strftime( '%Y-%m-%dT%H:%M:%SZ' ), }) try: response = http.request( 'POST', url, body=body, headers=headers, ) # Handle rate limiting (600 req/min, 30000 req/hour) if response.status == 429: retry_after = int( response.headers.get( 'Retry-After', str(int(backoff)) ) ) print(f'Rate limited (429). Retrying after ' f'{retry_after}s...') time.sleep(retry_after) backoff = min(backoff * 2, 60.0) continue backoff = 1.0 if response.status != 200: response_text = response.data.decode('utf-8') print(f'HTTP Error {response.status}: ' f'{response_text}') break data = json.loads(response.data.decode('utf-8')) page_items = data.get('items', []) cursor = data.get('cursor') has_more = data.get('has_more', False) if page_items: print(f'Page {page_num}: Retrieved ' f'{len(page_items)} events') records.extend(page_items) if not has_more: print(f'No more pages (has_more=false)') break if not cursor: print(f'No cursor returned, stopping') break except urllib3.exceptions.HTTPError as e: print(f'HTTP error: {str(e)}') break except Exception as e: print(f'Error fetching events: {str(e)}') break print(f'Retrieved {len(records)} total records ' f'from {page_num} pages') return records, cursor2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 onepassword-logs-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック onepassword-logs-triggerを選択メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索量が普通 |
| 1 時間ごと | 0 * * * * |
標準(推奨) |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
履歴データの収集 |
統合をテストする
- Cloud Scheduler コンソールで、
onepassword-logs-collector-hourlyを見つけます。 - [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- [
onepassword-logs-collector] をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。以下のものを探します。
--- Processing endpoint: signinattempts --- Page 1: Retrieved X events Wrote X signinattempts records to gs://onepassword-secops-logs/onepassword/signinattempts_YYYYMMDD_HHMMSS.ndjson --- Processing endpoint: itemusages --- Page 1: Retrieved X events Wrote X itemusages records to gs://onepassword-secops-logs/onepassword/itemusages_YYYYMMDD_HHMMSS.ndjson Successfully processed X total records[Cloud Storage] > [バケット] に移動します。
[
onepassword-secops-logs] をクリックします。onepassword/フォルダに移動します。新しい
.ndjsonファイルが現在のタイムスタンプで作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401:
OP_BEARER_TOKEN環境変数のベアラートークンを確認します。トークンの有効期限が切れている可能性があります。 - HTTP 429: レート制限。関数はバックオフで自動的に再試行します。1Password Events API では、1 分あたり 600 件のリクエストと 1 時間あたり 30,000 件のリクエストが許可されます。
- 環境変数が不足している: 必要な変数がすべて Cloud Run functions の構成で設定されていることを確認します。
- レコードが返されない: イントロスペクション エンドポイントを使用して、ベアラートークンがリクエストされたイベントタイプにアクセスできることを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
1Password Sign-In and Item Usage Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [1Password] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://onepassword-secops-logs/onepassword/
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間
Ingestion labels: このフィードのイベントに適用されるラベル
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- [
onepassword-secops-logs] をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
[保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| category | security_result.category_details | この値は、未加工ログの category フィールドから取得されます。 |
| client.app_name | principal.application | この値は、未加工ログの client.app_name フィールドから取得されます。 |
| client.app_version | metadata.product_version | この値は、未加工ログの client.app_version フィールドから取得されます。 |
| client.ip_address | principal.ip | この値は、未加工ログの client.ip_address フィールドから取得されます。 |
| client.os_name | principal.platform | 値は未加工ログの client.os_name フィールドから取得され、対応する UDM プラットフォーム値(LINUX、WINDOWS、MAC)にマッピングされます。 |
| client.os_version | principal.platform_version | この値は、未加工ログの client.os_version フィールドから取得されます。 |
| client.platform_name | principal.resource.attribute.labels | この値は、未加工ログの client.platform_name フィールドから取得されます。 |
| client.platform_version | principal.asset.platform_software.platform_version | この値は、未加工ログの client.platform_version フィールドから取得されます。 |
| country | principal.location.country_or_region | location.country が存在しない場合、この値は未加工ログの country フィールドから取得されます。 |
| item_uuid | security_result.about.resource.attribute.labels | この値は、未加工ログの item_uuid フィールドから取得されます。 |
| location.city | principal.location.city | この値は、未加工ログの location.city フィールドから取得されます。 |
| location.country | principal.location.country_or_region | この値は、未加工ログの location.country フィールドから取得されます。 |
| location.latitude | principal.location.region_latitude | この値は、未加工ログの location.latitude フィールドから取得されます。 |
| location.longitude | principal.location.region_longitude | この値は、未加工ログの location.longitude フィールドから取得されます。 |
| location.region | principal.location.name | この値は、未加工ログの location.region フィールドから取得されます。 |
| session.ip | principal.ip | この値は、未加工ログの session.ip フィールドから取得されます。 |
| session_uuid | network.session_id | この値は、未加工ログの session_uuid フィールドから取得されます。 |
| target_user.email | target.user.email_addresses | この値は、未加工ログの target_user.email フィールドから取得されます。 |
| target_user.uuid | target.user.userid | この値は、未加工ログの target_user.uuid フィールドから取得されます。 |
| timestamp | metadata.event_timestamp | 値は、未加工ログの timestamp フィールドから取得され、秒とナノ秒に変換されます。 |
| type | additional.fields | この値は、未加工ログの type フィールドから取得されます。 |
| user.email | principal.user.email_addresses | この値は、未加工ログの user.email フィールドから取得されます。 |
| user.name | principal.user.user_display_name | この値は、未加工ログの user.name フィールドから取得されます。 |
| used_version | additional.fields | この値は、未加工ログの used_version フィールドから取得されます。 |
| uuid | principal.resource.attribute.labels | この値は、未加工ログの uuid フィールドから取得されます。 |
| vault_uuid | security_result.about.resource.attribute.labels | この値は、未加工ログの vault_uuid フィールドから取得されます。 |
| なし | extensions.auth | このフィールドには空のオブジェクトが作成されます。 |
| なし | metadata.event_type | カテゴリが success または firewall_reported_success の場合は USER_LOGIN、ユーザー情報がない場合は STATUS_UPDATE、それ以外の場合は USER_UNCATEGORIZED に設定します。 |
| なし | metadata.log_type | ONEPASSWORD に設定します。 |
| なし | metadata.product_name | ONEPASSWORD に設定します。 |
| なし | metadata.vendor_name | ONEPASSWORD に設定します。 |
| なし | security_result.action | カテゴリが success または firewall_reported_success の場合は ALLOW に、カテゴリが credentials_failed、mfa_failed、modern_version_failed、firewall_failed の場合は BLOCK に設定します。それ以外の場合は空のままにします。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。