使用 Google Cloud 身份验证库对工作负载进行身份验证

本文档介绍了如何将工作负载身份联合与 Google Cloud 身份验证库(简称“身份验证库”)搭配使用,以对来自第三方身份提供商(如 AWS、Microsoft Azure 以及支持 OpenID Connect (OIDC) 或 SAML 2.0 的提供商)的工作负载进行身份验证。

借助工作负载身份联合,在 Google Cloud外部运行的应用无需使用服务账号密钥即可访问 Google Cloud 资源。Google Auth 库通过将外部凭据交换为短期 Google Cloud 访问令牌来实现此目的。

对于身份验证,您可以使用以下方法获取外部凭据:

准备工作

  1. Enable the required APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  2. 配置使用身份提供方的工作负载身份联合。

使用标准凭据机制进行身份验证

对于常用的受支持的第三方身份提供方,您可以使用Google Cloud auth 库的内置功能,通过生成凭据配置文件来验证工作负载的身份。此文件为身份验证库提供了从外部提供方联合身份所需的必要信息。

凭据配置文件(通常使用 GOOGLE_APPLICATION_CREDENTIALS 环境变量加载)可以指示身份验证库使用以下方法之一获取第三方正文令牌:

  • 文件来源:库从本地文件读取主题令牌。 一个单独的进程必须确保此文件包含有效且未过期的令牌。
  • 网址来源:库通过向指定的本地网址端点发出请求来获取主题令牌。
  • 可执行文件溯源:库运行配置的可执行命令。可执行文件的标准输出应包含正文令牌。
  1. 为您的特定提供商生成凭据配置文件:

  2. 使用凭据配置文件进行身份验证。

    如需让 Google Cloud 客户端库自动查找并使用您的凭据配置文件,请将 GOOGLE_APPLICATION_CREDENTIALS 环境变量设置为生成的 JSON 文件的路径。

    在 shell 中导出环境变量: bash export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/config.json

    设置环境变量后,客户端库会处理身份验证流程。

以下代码示例展示了如何对 Google Cloud API 进行经过身份验证的调用:

Node.js

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Node.js API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

// Imports the Google Cloud client library.
const {Storage} = require('@google-cloud/storage');

// Instantiates a client. If you don't specify credentials when constructing
// the client, the client library will look for credentials in the
// environment.
const storage = new Storage();
// Makes an authenticated API request.
async function listBuckets() {
  try {
    const results = await storage.getBuckets();

    const [buckets] = results;

    console.log('Buckets:');
    buckets.forEach(bucket => {
      console.log(bucket.name);
    });
  } catch (err) {
    console.error('ERROR:', err);
  }
}
listBuckets();

Python

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Python API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

def implicit():
    from google.cloud import storage

    # If you don't specify credentials when constructing the client, the
    # client library will look for credentials in the environment.
    storage_client = storage.Client()

    # Make an authenticated API request
    buckets = list(storage_client.list_buckets())
    print(buckets)

Java

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Java API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

static void authImplicit() {
  // If you don't specify credentials when constructing the client, the client library will
  // look for credentials via the environment variable GOOGLE_APPLICATION_CREDENTIALS.
  Storage storage = StorageOptions.getDefaultInstance().getService();

  System.out.println("Buckets:");
  Page<Bucket> buckets = storage.list();
  for (Bucket bucket : buckets.iterateAll()) {
    System.out.println(bucket.toString());
  }
}

Go

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Go API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

// authenticateImplicitWithAdc uses Application Default Credentials
// to automatically find credentials and authenticate.
func authenticateImplicitWithAdc(w io.Writer, projectId string) error {
	// projectId := "your_project_id"

	ctx := context.Background()

	// NOTE: Replace the client created below with the client required for your application.
	// Note that the credentials are not specified when constructing the client.
	// The client library finds your credentials using ADC.
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("NewClient: %w", err)
	}
	defer client.Close()

	it := client.Buckets(ctx, projectId)
	for {
		bucketAttrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		fmt.Fprintf(w, "Bucket: %v\n", bucketAttrs.Name)
	}

	fmt.Fprintf(w, "Listed all storage buckets.\n")

	return nil
}

使用自定义凭据提供程序进行身份验证

如果您的环境不支持 Google 身份验证库的内置功能,或者您想实现自定义逻辑来向 Google 身份验证库提供凭据,请使用自定义凭据提供程序来对工作负载进行身份验证。

从 AWS 访问资源

初始化身份验证客户端时,请提供凭据提供方的自定义实现。客户端实例会延迟到供应商检索 AWS 安全凭据,以换取 Google Cloud 访问令牌。当客户端调用供应商时,供应商必须返回有效且未过期的凭据。

身份验证客户端不会缓存返回的 AWS 安全凭据或区域,因此请在提供程序中实现缓存,以防止针对同一资源发出冗余请求。

以下代码示例展示了如何使用自定义凭据提供程序设置从 AWS 访问 Google Cloud资源的权限。

Node.js

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Node.js API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

const {AwsClient} = require('google-auth-library');
const {fromNodeProviderChain} = require('@aws-sdk/credential-providers');
const fs = require('fs');
const path = require('path');
const {STSClient} = require('@aws-sdk/client-sts');
const {Storage} = require('@google-cloud/storage');

/**
 * Custom AWS Security Credentials Supplier.
 *
 * This implementation resolves AWS credentials using the default Node provider
 * chain from the AWS SDK. This allows fetching credentials from environment
 * variables, shared credential files (~/.aws/credentials), or IAM roles
 * for service accounts (IRSA) in EKS, etc.
 */
class CustomAwsSupplier {
  constructor() {
    this.region = null;

    this.awsCredentialsProvider = fromNodeProviderChain();
  }

  /**
   * Returns the AWS region. This is required for signing the AWS request.
   * It resolves the region automatically by using the default AWS region
   * provider chain, which searches for the region in the standard locations
   * (environment variables, AWS config file, etc.).
   */
  async getAwsRegion(_context) {
    if (this.region) {
      return this.region;
    }

    const client = new STSClient({});
    this.region = await client.config.region();

    if (!this.region) {
      throw new Error(
        'CustomAwsSupplier: Unable to resolve AWS region. Please set the AWS_REGION environment variable or configure it in your ~/.aws/config file.'
      );
    }

    return this.region;
  }

  /**
   * Retrieves AWS security credentials using the AWS SDK's default provider chain.
   */
  async getAwsSecurityCredentials(_context) {
    const awsCredentials = await this.awsCredentialsProvider();

    if (!awsCredentials.accessKeyId || !awsCredentials.secretAccessKey) {
      throw new Error(
        'Unable to resolve AWS credentials from the node provider chain. ' +
          'Ensure your AWS CLI is configured, or AWS environment variables (like AWS_ACCESS_KEY_ID) are set.'
      );
    }

    return {
      accessKeyId: awsCredentials.accessKeyId,
      secretAccessKey: awsCredentials.secretAccessKey,
      token: awsCredentials.sessionToken,
    };
  }
}

/**
 * Authenticates with Google Cloud using AWS credentials and retrieves bucket metadata.
 *
 * @param {string} bucketName The name of the bucket to retrieve.
 * @param {string} audience The Workload Identity Pool audience.
 * @param {string} [impersonationUrl] Optional Service Account impersonation URL.
 */
async function authenticateWithAwsCredentials(
  bucketName,
  audience,
  impersonationUrl
) {
  const customSupplier = new CustomAwsSupplier();

  const clientOptions = {
    audience: audience,
    subject_token_type: 'urn:ietf:params:aws:token-type:aws4_request',
    service_account_impersonation_url: impersonationUrl,
    aws_security_credentials_supplier: customSupplier,
  };

  const authClient = new AwsClient(clientOptions);

  const storage = new Storage({
    authClient: authClient,
  });

  const [metadata] = await storage.bucket(bucketName).getMetadata();
  return metadata;
}

Python

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Python API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import json
import os
import sys

import boto3
from google.auth import aws
from google.auth import exceptions
from google.cloud import storage


class CustomAwsSupplier(aws.AwsSecurityCredentialsSupplier):
    """Custom AWS Security Credentials Supplier using Boto3."""

    def __init__(self):
        """Initializes the Boto3 session, prioritizing environment variables for region."""
        # Explicitly read the region from the environment first.
        region = os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION")

        # If region is None, Boto3's discovery chain will be used when needed.
        self.session = boto3.Session(region_name=region)
        self._cached_region = None

    def get_aws_region(self, context, request) -> str:
        """Returns the AWS region using Boto3's default provider chain."""
        if self._cached_region:
            return self._cached_region

        self._cached_region = self.session.region_name

        if not self._cached_region:
            raise exceptions.GoogleAuthError(
                "Boto3 was unable to resolve an AWS region."
            )

        return self._cached_region

    def get_aws_security_credentials(
        self, context, request=None
    ) -> aws.AwsSecurityCredentials:
        """Retrieves AWS security credentials using Boto3's default provider chain."""
        creds = self.session.get_credentials()
        if not creds:
            raise exceptions.GoogleAuthError(
                "Unable to resolve AWS credentials from Boto3."
            )

        return aws.AwsSecurityCredentials(
            access_key_id=creds.access_key,
            secret_access_key=creds.secret_key,
            session_token=creds.token,
        )


def authenticate_with_aws_credentials(bucket_name, audience, impersonation_url=None):
    """Authenticates using the custom AWS supplier and gets bucket metadata.

    Returns:
        dict: The bucket metadata response from the Google Cloud Storage API.
    """

    custom_supplier = CustomAwsSupplier()

    credentials = aws.Credentials(
        audience=audience,
        subject_token_type="urn:ietf:params:aws:token-type:aws4_request",
        service_account_impersonation_url=impersonation_url,
        aws_security_credentials_supplier=custom_supplier,
        scopes=["https://www.googleapis.com/auth/devstorage.read_only"],
    )

    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.get_bucket(bucket_name)

    return bucket._properties

Java

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Java API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.auth.oauth2.AwsCredentials;
import com.google.auth.oauth2.AwsSecurityCredentials;
import com.google.auth.oauth2.AwsSecurityCredentialsSupplier;
import com.google.auth.oauth2.ExternalAccountSupplierContext;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;

  public static Bucket authenticateWithAwsCredentials(
      String gcpWorkloadAudience, String saImpersonationUrl, String gcsBucketName)
      throws IOException {

    CustomAwsSupplier customSupplier = new CustomAwsSupplier();

    AwsCredentials.Builder credentialsBuilder =
        AwsCredentials.newBuilder()
            .setAudience(gcpWorkloadAudience)
            // This token type indicates that the subject token is an AWS Signature Version 4 signed
            // request. This is required for AWS Workload Identity Federation.
            .setSubjectTokenType("urn:ietf:params:aws:token-type:aws4_request")
            .setAwsSecurityCredentialsSupplier(customSupplier);

    if (saImpersonationUrl != null) {
      credentialsBuilder.setServiceAccountImpersonationUrl(saImpersonationUrl);
    }

    GoogleCredentials credentials = credentialsBuilder.build();

    Storage storage = StorageOptions.newBuilder().setCredentials(credentials).build().getService();

    return storage.get(gcsBucketName);
  }

  /**
   * Custom AWS Security Credentials Supplier.
   *
   * <p>This implementation resolves AWS credentials and regions using the default provider chains
   * from the AWS SDK (v2). This supports environment variables, ~/.aws/credentials, and EC2/EKS
   * metadata.
   */
  private static class CustomAwsSupplier implements AwsSecurityCredentialsSupplier {
    private final AwsCredentialsProvider awsCredentialsProvider;
    private String region;

    public CustomAwsSupplier() {
      // The AWS SDK handles caching internally.
      this.awsCredentialsProvider = DefaultCredentialsProvider.create();
    }

    @Override
    public String getRegion(ExternalAccountSupplierContext context) {
      if (this.region == null) {
        Region awsRegion = new DefaultAwsRegionProviderChain().getRegion();
        if (awsRegion == null) {
          throw new IllegalStateException(
              "Unable to resolve AWS region. Ensure AWS_REGION is set or configured.");
        }
        this.region = awsRegion.id();
      }
      return this.region;
    }

    @Override
    public AwsSecurityCredentials getCredentials(ExternalAccountSupplierContext context) {
      software.amazon.awssdk.auth.credentials.AwsCredentials credentials =
          this.awsCredentialsProvider.resolveCredentials();

      if (credentials == null) {
        throw new IllegalStateException("Unable to resolve AWS credentials.");
      }

      String sessionToken = null;
      if (credentials instanceof AwsSessionCredentials) {
        sessionToken = ((AwsSessionCredentials) credentials).sessionToken();
      }

      return new AwsSecurityCredentials(
          credentials.accessKeyId(), credentials.secretAccessKey(), sessionToken);
    }
  }

通过 OIDC 和 SAML 访问资源

初始化身份验证客户端时,请提供自定义令牌提供程序,以提供用于交换 Google Cloud 访问令牌的主题令牌。当客户端调用供应商时,供应商必须返回有效且未过期的正文令牌。

身份验证客户端不会缓存返回的令牌,因此请在提供程序中实现缓存,以防止针对同一主题令牌发出冗余请求。

以下代码示例展示了如何通过自定义凭据提供方设置对支持 OpenID Connect (OIDC) 或 SAML 2.0 的提供方提供的 Google Cloud资源的访问权限。

Node.js

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Node.js API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

const {IdentityPoolClient} = require('google-auth-library');
const {Storage} = require('@google-cloud/storage');
const {Gaxios} = require('gaxios');
const fs = require('fs');
const path = require('path');

/**
 * A custom SubjectTokenSupplier that authenticates with Okta using the
 * Client Credentials grant flow.
 */
class OktaClientCredentialsSupplier {
  constructor(domain, clientId, clientSecret) {
    const cleanDomain = domain.endsWith('/') ? domain.slice(0, -1) : domain;
    this.oktaTokenUrl = `${cleanDomain}/oauth2/default/v1/token`;

    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.accessToken = null;
    this.expiryTime = 0;
    this.gaxios = new Gaxios();
  }

  /**
   * Main method called by the auth library. It will fetch a new token if one
   * is not already cached.
   * @returns {Promise<string>} A promise that resolves with the Okta Access token.
   */
  async getSubjectToken() {
    const isTokenValid =
      this.accessToken && Date.now() < this.expiryTime - 60 * 1000;

    if (isTokenValid) {
      return this.accessToken;
    }

    const {accessToken, expiresIn} = await this.fetchOktaAccessToken();
    this.accessToken = accessToken;
    this.expiryTime = Date.now() + expiresIn * 1000;
    return this.accessToken;
  }

  /**
   * Performs the Client Credentials grant flow with Okta.
   */
  async fetchOktaAccessToken() {
    const params = new URLSearchParams();
    params.append('grant_type', 'client_credentials');
    params.append('scope', 'gcp.test.read');

    const authHeader =
      'Basic ' +
      Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64');

    try {
      const response = await this.gaxios.request({
        url: this.oktaTokenUrl,
        method: 'POST',
        headers: {
          Authorization: authHeader,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        data: params.toString(),
      });

      const {access_token, expires_in} = response.data;
      if (access_token && expires_in) {
        return {accessToken: access_token, expiresIn: expires_in};
      } else {
        throw new Error(
          'Access token or expires_in not found in Okta response.'
        );
      }
    } catch (error) {
      throw new Error(
        `Failed to authenticate with Okta: ${error.response?.data || error.message}`
      );
    }
  }
}

/**
 * Authenticates with Google Cloud using Okta credentials and retrieves bucket metadata.
 *
 * @param {string} bucketName The name of the bucket to retrieve.
 * @param {string} audience The Workload Identity Pool audience.
 * @param {string} domain The Okta domain.
 * @param {string} clientId The Okta client ID.
 * @param {string} clientSecret The Okta client secret.
 * @param {string} [impersonationUrl] Optional Service Account impersonation URL.
 */
async function authenticateWithOktaCredentials(
  bucketName,
  audience,
  domain,
  clientId,
  clientSecret,
  impersonationUrl
) {
  const oktaSupplier = new OktaClientCredentialsSupplier(
    domain,
    clientId,
    clientSecret
  );

  const authClient = new IdentityPoolClient({
    audience: audience,
    subject_token_type: 'urn:ietf:params:oauth:token-type:jwt',
    token_url: 'https://sts.googleapis.com/v1/token',
    subject_token_supplier: oktaSupplier,
    service_account_impersonation_url: impersonationUrl,
  });

  const storage = new Storage({
    authClient: authClient,
  });

  const [metadata] = await storage.bucket(bucketName).getMetadata();
  return metadata;
}

Python

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Python API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import json
import time
import urllib.parse

from google.auth import identity_pool
from google.cloud import storage
import requests


class OktaClientCredentialsSupplier:
    """A custom SubjectTokenSupplier that authenticates with Okta.

    This supplier uses the Client Credentials grant flow for machine-to-machine
    (M2M) authentication with Okta.
    """

    def __init__(self, domain, client_id, client_secret):
        self.okta_token_url = f"{domain.rstrip('/')}/oauth2/default/v1/token"
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.expiry_time = 0

    def get_subject_token(self, context, request=None) -> str:
        """Fetches a new token if the current one is expired or missing."""
        if self.access_token and time.time() < self.expiry_time - 60:
            return self.access_token
        self._fetch_okta_access_token()
        return self.access_token

    def _fetch_okta_access_token(self):
        """Performs the Client Credentials grant flow with Okta."""
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        }
        data = {
            "grant_type": "client_credentials",
            "scope": "gcp.test.read",  # Set scope as per Okta app config.
        }

        response = requests.post(
            self.okta_token_url,
            headers=headers,
            data=urllib.parse.urlencode(data),
            auth=(self.client_id, self.client_secret),
        )
        response.raise_for_status()

        token_data = response.json()
        self.access_token = token_data["access_token"]
        self.expiry_time = time.time() + token_data["expires_in"]


def authenticate_with_okta_credentials(
    bucket_name, audience, domain, client_id, client_secret, impersonation_url=None
):
    """Authenticates using the custom Okta supplier and gets bucket metadata.

    Returns:
        dict: The bucket metadata response from the Google Cloud Storage API.
    """

    okta_supplier = OktaClientCredentialsSupplier(domain, client_id, client_secret)

    credentials = identity_pool.Credentials(
        audience=audience,
        subject_token_type="urn:ietf:params:oauth:token-type:jwt",
        token_url="https://sts.googleapis.com/v1/token",
        subject_token_supplier=okta_supplier,
        default_scopes=["https://www.googleapis.com/auth/devstorage.read_only"],
        service_account_impersonation_url=impersonation_url,
    )

    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.get_bucket(bucket_name)

    return bucket._properties

Java

如需了解如何安装和使用 IAM 客户端库,请参阅 IAM 客户端库。 如需了解详情,请参阅 IAM Java API 参考文档

如需向 IAM 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.client.json.GenericJson;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.oauth2.ExternalAccountSupplierContext;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.IdentityPoolCredentials;
import com.google.auth.oauth2.IdentityPoolSubjectTokenSupplier;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Base64;
import java.util.Map;

  public static Bucket authenticateWithOktaCredentials(
      String gcpWorkloadAudience,
      String saImpersonationUrl,
      String gcsBucketName,
      String oktaDomain,
      String oktaClientId,
      String oktaClientSecret)
      throws IOException {

    OktaClientCredentialsSupplier oktaSupplier =
        new OktaClientCredentialsSupplier(oktaDomain, oktaClientId, oktaClientSecret);

    IdentityPoolCredentials.Builder credentialsBuilder =
        IdentityPoolCredentials.newBuilder()
            .setAudience(gcpWorkloadAudience)
            // This token type indicates that the subject token is a JSON Web Token (JWT).
            // This is required for Workload Identity Federation with an OIDC provider like Okta.
            .setSubjectTokenType("urn:ietf:params:oauth:token-type:jwt")
            .setTokenUrl("https://sts.googleapis.com/v1/token")
            .setSubjectTokenSupplier(oktaSupplier);

    if (saImpersonationUrl != null) {
      credentialsBuilder.setServiceAccountImpersonationUrl(saImpersonationUrl);
    }

    GoogleCredentials credentials = credentialsBuilder.build();

    Storage storage = StorageOptions.newBuilder().setCredentials(credentials).build().getService();

    return storage.get(gcsBucketName);
  }

  /**
   * A custom SubjectTokenSupplier that authenticates with Okta using the Client Credentials grant
   * flow.
   */
  private static class OktaClientCredentialsSupplier implements IdentityPoolSubjectTokenSupplier {

    private static final long TOKEN_REFRESH_BUFFER_SECONDS = 60;

    private final String oktaTokenUrl;
    private final String clientId;
    private final String clientSecret;
    private String accessToken;
    private Instant expiryTime;

    public OktaClientCredentialsSupplier(String domain, String clientId, String clientSecret) {
      // Ensure domain doesn't have a trailing slash for cleaner URL construction
      String cleanedDomain =
          domain.endsWith("/") ? domain.substring(0, domain.length() - 1) : domain;
      this.oktaTokenUrl = cleanedDomain + "/oauth2/default/v1/token";
      this.clientId = clientId;
      this.clientSecret = clientSecret;
    }

    /**
     * Main method called by the auth library. It will fetch a new token if one is not already
     * cached.
     */
    @Override
    public String getSubjectToken(ExternalAccountSupplierContext context) throws IOException {
      // Check if the current token is still valid (with a 60-second buffer).
      boolean isTokenValid =
          this.accessToken != null
              && this.expiryTime != null
              && Instant.now().isBefore(this.expiryTime.minusSeconds(TOKEN_REFRESH_BUFFER_SECONDS));

      if (isTokenValid) {
        return this.accessToken;
      }

      fetchOktaAccessToken();
      return this.accessToken;
    }

    /**
     * Performs the Client Credentials grant flow by making a POST request to Okta's token endpoint.
     */
    private void fetchOktaAccessToken() throws IOException {
      URL url = new URL(this.oktaTokenUrl);
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestMethod("POST");
      conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
      conn.setRequestProperty("Accept", "application/json");

      // The client_id and client_secret are sent in a Basic Auth header.
      String auth = this.clientId + ":" + this.clientSecret;
      String encodedAuth =
          Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
      conn.setRequestProperty("Authorization", "Basic " + encodedAuth);

      conn.setDoOutput(true);
      try (java.io.OutputStream out = conn.getOutputStream()) {
        // Scopes define the permissions the access token will have.
        // Update "gcp.test.read" to match your Okta configuration.
        String params = "grant_type=client_credentials&scope=gcp.test.read";
        out.write(params.getBytes(StandardCharsets.UTF_8));
        out.flush();
      }

      int responseCode = conn.getResponseCode();
      if (responseCode == HttpURLConnection.HTTP_OK) {
        try (BufferedReader in =
            new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {

          GenericJson jsonObject =
              GsonFactory.getDefaultInstance().createJsonParser(in).parse(GenericJson.class);

          if (jsonObject.containsKey("access_token") && jsonObject.containsKey("expires_in")) {
            this.accessToken = (String) jsonObject.get("access_token");
            Number expiresInNumber = (Number) jsonObject.get("expires_in");
            this.expiryTime = Instant.now().plusSeconds(expiresInNumber.longValue());
          } else {
            throw new IOException("Access token or expires_in not found in Okta response.");
          }
        }
      } else {
        throw new IOException("Failed to authenticate with Okta. Response code: " + responseCode);
      }
    }
  }

后续步骤