ポリシーの作成、パッチ適用、削除

このページでは、ポリシーの作成、パッチ適用、削除の方法を示すコードサンプルを提供します。

始める前に

Policy API の設定の手順が完了していることを確認してください。

一般的なユーティリティ

次のスニペットに含まれるインポート、定数、ヘルパー関数は、このページの作成、パッチ適用、削除の各スクリプトで再利用されます。

from collections.abc import Mapping, Sequence
import json
import pprint
from typing import Any, Optional
import urllib.error
import urllib.request
from absl import app
from absl import flags
import google.auth
from google.auth import iam
from google.auth.transport import requests
from google.oauth2 import service_account


# OAuth scope requested when loading the application default credentials.
AUTH_SCOPES = ['https://www.googleapis.com/auth/iam']
# OAuth scope requested for the delegated (domain-wide delegation) credentials.
POLICY_SCOPES = ['https://www.googleapis.com/auth/cloud-identity.policies']
# Root endpoint of the Cloud Identity API and the v1 prefix that the request
# builders prepend to resource paths.
BASE_URL = 'https://cloudidentity.googleapis.com/'
VERSIONED_BASE_URL = f'{BASE_URL}v1/'
# Token endpoint passed to the delegated service account credentials.
TOKEN_URI = 'https://accounts.google.com/o/oauth2/token'


# Administrator that the delegated credentials use as their subject.
_ADMIN_EMAIL = flags.DEFINE_string(
    name='admin_email',
    default=None,
    help='Administrator email to call as',
    required=True,
)

# Resource name of the policy that the patch and delete samples operate on.
# NOTE(review): this flag is declared required, but the create sample's main()
# never reads it -- confirm whether the create script should define it at all.
_POLICY_NAME = flags.DEFINE_string(
    name='policy_name',
    default=None,
    help='The resource name of the policy, e.g., policies/12345',
    required=True,
)

def create_delegated_credentials(
    admin_email: str
) -> google.auth.credentials.Credentials:
  """Builds domain-wide delegated credentials that act as an administrator.

  Application default credentials (ADC) are loaded with AUTH_SCOPES and
  refreshed, then wrapped in an IAM signer. That signer backs service account
  credentials whose subject is `admin_email`.

  Args:
    admin_email: The administrator email to call as.

  Returns:
    Service account credentials scoped to POLICY_SCOPES with `admin_email` as
    the subject.
  """
  # Load ADC, then refresh so the account information is populated.
  adc, _ = google.auth.default(scopes=AUTH_SCOPES)
  transport = requests.Request()
  adc.refresh(transport)

  sa_email = adc.service_account_email

  # The IAM signer signs on behalf of the ADC service account; the resulting
  # credentials are the domain-wide delegated (DWD) credentials.
  return service_account.Credentials(
      signer=iam.Signer(transport, adc, sa_email),
      service_account_email=sa_email,
      token_uri=TOKEN_URI,
      scopes=POLICY_SCOPES,
      subject=admin_email,
  )

def print_json_response(response: Mapping[str, Any]) -> None:
  """Pretty-prints a response mapping to stdout with a 4-space indent.

  Args:
    response: The decoded response to print.
  """
  pprint.pprint(response, indent=4)


def get_access_token(admin_email: str) -> str:
  """Returns an access token for delegated credentials acting as the admin.

  Args:
    admin_email: The administrator email to call as.

  Returns:
    The access token held by the refreshed delegated credentials.

  Raises:
    ValueError: If the credentials carry no token after being refreshed.
  """
  delegated = create_delegated_credentials(admin_email)
  delegated.refresh(requests.Request())

  token = delegated.token
  if token is not None:
    return token

  raise ValueError(
      'Failed to refresh credentials and obtain an access token.'
  )


def execute_request(
    request: urllib.request.Request, message: str
) -> Optional[Mapping[str, Any]]:
  """Executes a request and prints the response.

  On success, prints `message` followed by the pretty-printed JSON response.
  On failure, prints the error details and returns None instead of raising.

  Args:
    request: The request to execute.
    message: The message to print before the response.

  Returns:
    The decoded JSON response (an empty dict when the response body is empty),
    or None if the request failed with an HTTP or URL error.
  """
  # Only the network call sits inside the try block, so that decoding errors
  # are not mistaken for transport failures.
  try:
    with urllib.request.urlopen(request) as response:
      content = response.read()
  except urllib.error.HTTPError as e:
    print(f'HTTPError: {e.code} {e.reason}')
    # The error body is informational only; never fail while decoding it.
    print(e.read().decode('utf-8', errors='replace'))
    return None
  except urllib.error.URLError as e:
    # HTTPError subclasses URLError, so this branch only sees failures that
    # produced no HTTP response (DNS failure, refused connection, ...).
    print(f'URLError: {e.reason}')
    return None

  # An empty body would make json.loads raise; treat it as an empty response.
  op = json.loads(content) if content.strip() else {}
  print(message)
  print_json_response(op)
  return op

ポリシーを作成する

次の例は、Python を使用して組織にポリシーを作成する方法を示しています。この例では、データ保護ルールとデータ保護検出機能を作成する方法を具体的に示します。

"""Script to interact with the Cloud Identity Policies API.

This script creates delegated credentials, then builds and sends requests to
the Create Policy API. It includes samples for creating a Data Protection rule
and a word list detector.
"""


def build_create_policy_request(
    access_token: str,
    policy_payload: Mapping[str, Any]
) -> urllib.request.Request:
  """Assembles the POST request that creates a policy.

  Args:
    access_token: The access token for the API.
    policy_payload: The dictionary representing the policy to create.

  Returns:
    A POST request to the versioned `policies` collection that carries the
    JSON-encoded payload and the bearer token.
  """
  endpoint = f'{VERSIONED_BASE_URL}policies'
  body = json.dumps(policy_payload).encode('utf-8')
  headers = {
      'Authorization': 'Bearer ' + access_token,
      'Content-Type': 'application/json',
  }
  return urllib.request.Request(
      endpoint, data=body, headers=headers, method='POST'
  )


def call_create_policy_endpoint(
    access_token: str, policy_payload: Mapping[str, Any]
) -> Optional[Mapping[str, Any]]:
  """Calls the Create Policy API.

  Args:
    access_token: The access token for the API.
    policy_payload: The policy payload to create.

  Returns:
    The response from the Create Policy API, or None if the request failed.
  """
  request = build_create_policy_request(access_token, policy_payload)
  # Hand the response back to the caller; without this return the function
  # would always yield None despite its declared return type.
  return execute_request(request, 'Create Policy operation started:')


def main(argv: Sequence[str]) -> None:
  """Creates a sample Data Protection rule and a sample word list detector.

  Obtains an access token for the administrator named by the admin_email flag,
  then sends one Create Policy request for each sample payload.

  Args:
    argv: Command-line arguments passed in by app.run.

  Raises:
    app.UsageError: If argv holds more than one element.
  """
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')

  access_token = get_access_token(_ADMIN_EMAIL.value)

  # Sample payload for a DLP rule on the Drive file share trigger whose action
  # is driveAction.warnUser. Replace <customer_id> and <org_unit_id> with real
  # values before running.
  dlp_rule_payload = {
      'customer': 'customers/<customer_id>',
      'policyQuery': {
          'orgUnit': 'orgUnits/<org_unit_id>',
      },
      'setting': {
          'type': 'settings/rule.dlp',
          'value': {
              'displayName': 'sample rule creation',
              'state': 'ACTIVE',
              'triggers': ['google.workspace.drive.file.v1.share'],
              'condition': {
                  'contentCondition': 'all_content.matches_dlp_detector(\'US_SOCIAL_SECURITY_NUMBER\', google.privacy.dlp.v2.Likelihood.LIKELY, {minimum_match_count: 1, minimum_unique_match_count: 1})'
              },
              'action': {
                  'driveAction': {
                      'warnUser': {},
                  },
              },
          },
      },
  }

  # Sample payload for a word list detector. Replace <customer_id> and
  # <org_unit_id> with real values before running.
  dlp_detector_payload = {
      'customer': 'customers/<customer_id>',
      'policyQuery': {
          'orgUnit': 'orgUnits/<org_unit_id>',
      },
      'setting': {
          'type': 'settings/detector.word_list',
          'value': {
              'displayName': 'Project Sensitive Terms',
              'description': (
                  'Detector for project-specific confidential keywords.'
              ),
              'wordList': {
                  'words': [
                      'confidential',
                      'internal-only',
                      'top-secret',
                      'project-x',
                  ]
              },
          },
      },
  }

  # Send both create requests; execute_request prints each response.
  rule_response = call_create_policy_endpoint(access_token, dlp_rule_payload)
  detector_response = call_create_policy_endpoint(access_token, dlp_detector_payload)


# Run main through absl when the file is executed as a script.
if __name__ == '__main__':
  app.run(main)


ポリシーにパッチを適用する

次の例は、Python を使用して組織のポリシーにパッチを適用する方法を示しています。この例では、データ保護ルールを更新して、単語リスト検出器を条件として追加する方法を示します。

"""Script to interact with the Cloud Identity Policies API.

This script builds and sends requests to the Patch Policy API. It includes a
sample for updating a Data Protection rule.
"""

from collections.abc import Mapping, Sequence
import json
import pprint
from typing import Any
import urllib.error
import urllib.request
from absl import app
from absl import flags
import google.auth
from google.auth import iam
from google.auth.transport import requests
from google.oauth2 import service_account


# OAuth scope requested when loading the application default credentials.
AUTH_SCOPES = ['https://www.googleapis.com/auth/iam']
# OAuth scope requested for the delegated (domain-wide delegation) credentials.
POLICY_SCOPES = ['https://www.googleapis.com/auth/cloud-identity.policies']
# Root endpoint of the Cloud Identity API and the v1 prefix that the request
# builder prepends to the policy resource name.
BASE_URL = 'https://cloudidentity.googleapis.com/'
VERSIONED_BASE_URL = f'{BASE_URL}v1/'
# Token endpoint passed to the delegated service account credentials.
TOKEN_URI = 'https://accounts.google.com/o/oauth2/token'


# Administrator that the delegated credentials use as their subject.
_ADMIN_EMAIL = flags.DEFINE_string(
    name='admin_email',
    default=None,
    help='Administrator email to call as',
    required=True,
)
# Resource name of the policy to patch; main() appends it to the versioned
# base URL through build_patch_policy_request.
_POLICY_NAME = flags.DEFINE_string(
    name='policy_name',
    default=None,
    help='The resource name of the policy to patch, e.g., policies/12345',
    required=True,
)


def create_delegated_credentials(
    admin_email: str
) -> google.auth.credentials.Credentials:
  """Builds domain-wide delegated credentials that act as an administrator.

  Application default credentials (ADC) are loaded with AUTH_SCOPES and
  refreshed, then wrapped in an IAM signer. That signer backs service account
  credentials whose subject is `admin_email`.

  Args:
    admin_email: The administrator email to call as.

  Returns:
    Service account credentials scoped to POLICY_SCOPES with `admin_email` as
    the subject.
  """
  # Load ADC, then refresh so the account information is populated.
  adc, _ = google.auth.default(scopes=AUTH_SCOPES)
  transport = requests.Request()
  adc.refresh(transport)

  sa_email = adc.service_account_email

  # The IAM signer signs on behalf of the ADC service account; the resulting
  # credentials are the domain-wide delegated (DWD) credentials.
  return service_account.Credentials(
      signer=iam.Signer(transport, adc, sa_email),
      service_account_email=sa_email,
      token_uri=TOKEN_URI,
      scopes=POLICY_SCOPES,
      subject=admin_email,
  )


def build_patch_policy_request(
    access_token: str,
    policy_name: str,
    policy_payload: Mapping[str, Any]
) -> urllib.request.Request:
  """Assembles the PATCH request that updates a policy.

  Args:
    access_token: The access token for the API.
    policy_name: The resource name of the policy to patch.
    policy_payload: The dictionary representing the policy fields to patch.

  Returns:
    A PATCH request addressed to the policy's versioned resource URL that
    carries the JSON-encoded payload and the bearer token.
  """
  endpoint = f'{VERSIONED_BASE_URL}{policy_name}'
  body = json.dumps(policy_payload).encode('utf-8')
  headers = {
      'Authorization': 'Bearer ' + access_token,
      'Content-Type': 'application/json',
  }
  return urllib.request.Request(
      endpoint, data=body, headers=headers, method='PATCH'
  )


def call_patch_policy_endpoint(
    access_token: str,
    policy_name: str,
    policy_payload: Mapping[str, Any]
) -> None:
  """Sends a Patch Policy request and prints the outcome.

  Args:
    access_token: The access token for the API.
    policy_name: The policy name to patch.
    policy_payload: The policy payload for patch.
  """
  execute_request(
      build_patch_policy_request(access_token, policy_name, policy_payload),
      'Patch Policy operation started:',
  )


def main(argv: Sequence[str]) -> None:
  """Patches a Data Protection rule so that it uses a word list detector.

  Obtains an access token for the administrator named by the admin_email flag
  and sends a Patch Policy request for the policy named by the policy_name
  flag.

  Args:
    argv: Command-line arguments passed in by app.run.

  Raises:
    app.UsageError: If any positional argument is supplied.
  """
  # All inputs arrive through flags, so argv should hold only the program
  # name; anything beyond that is an unexpected positional argument.
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')

  access_token = get_access_token(_ADMIN_EMAIL.value)

  # Sample for patching a rule policy to add a condition for a word list
  # detector. Replace <detector_response.name> with the name of a detector
  # policy created in the previous script, and <org_unit_id> with a real
  # organizational unit ID.
  patch_policy_payload = {
      'policyQuery': {
          'orgUnit': 'orgUnits/<org_unit_id>'
      },
      'setting': {
          'type': 'settings/rule.dlp',
          'value': {
              'displayName': 'Warn users for sharing of custom sensitive data',
              'description': 'Rule triggered by custom word list detector',
              'triggers': [
                  'google.workspace.drive.file.v1.share'
              ],
              'condition': {
                  'contentCondition': (
                      'all_content.matches_word_list(\'<detector_response.name>\')'
                  )
              },
              'action': {
                  'driveAction': {
                      'warnUser': {}
                  }
              },
              'state': 'ACTIVE'
          }
      }
  }

  call_patch_policy_endpoint(
      access_token, _POLICY_NAME.value, patch_policy_payload
  )


# Run main through absl when the file is executed as a script.
if __name__ == '__main__':
  app.run(main)

ポリシーを削除する

次の例は、Python を使用して組織のポリシーを削除する方法を示しています。

"""Script to interact with the Cloud Identity Policies API.

This script builds and sends a request to the Delete Policy API to delete a
policy.
"""


def build_delete_policy_request(
    access_token: str,
    policy_name: str
) -> urllib.request.Request:
  """Assembles the DELETE request that removes a policy.

  Args:
    access_token: The access token for the API.
    policy_name: The resource name of the policy to delete.

  Returns:
    A body-less DELETE request addressed to the policy's versioned resource
    URL that carries the bearer token.
  """
  endpoint = f'{VERSIONED_BASE_URL}{policy_name}'
  auth_header = {'Authorization': 'Bearer ' + access_token}
  return urllib.request.Request(
      endpoint, headers=auth_header, method='DELETE'
  )


def call_delete_policy_api(
    access_token: str,
    policy_name: str
) -> None:
  """Sends a Delete Policy request and prints the outcome.

  Args:
    access_token: The access token for the API.
    policy_name: The policy name to delete.
  """
  execute_request(
      build_delete_policy_request(access_token, policy_name),
      'Delete Policy operation started:',
  )


def main(argv: Sequence[str]) -> None:
  """Deletes the policy named by the policy_name flag.

  Obtains an access token for the administrator named by the admin_email flag
  and sends a Delete Policy request.

  Args:
    argv: Command-line arguments passed in by app.run.

  Raises:
    app.UsageError: If any positional argument is supplied.
  """
  # All inputs arrive through flags, so argv should hold only the program
  # name; anything beyond that is an unexpected positional argument.
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')

  access_token = get_access_token(_ADMIN_EMAIL.value)

  call_delete_policy_api(access_token, _POLICY_NAME.value)


# Run main through absl when the file is executed as a script.
if __name__ == '__main__':
  app.run(main)

割り当て

Cloud Identity Policy API は、Google Cloud プロジェクトごとに 1 QPS(1 秒あたりのクエリ数)をサポートしています。また、お客様が複数の Google Cloud プロジェクトを作成した場合でも、お客様ごとの合計は 1 QPS までです。

割り当ての増加はサポートされていません。