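Optimistic concurrency control (OCC) is a strategy for managing shared resources. It prevents "lost updates" and race conditions when multiple users or processes try to modify the same resource at the same time.
For example, in systems such as Google Cloud IAM, the shared resource is the IAM policy attached to a resource (such as a project, a Cloud Storage bucket, or a service). To implement OCC, these systems typically put a version number or an etag (entity tag) field on the resource object.
Introduction to OCC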
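Suppose two processes, A and B, try to update a shared resource at the same time:
Process A reads the current state of the resource.
Process B reads the same current state.
Process A modifies its copy and writes it back to the server.
Process B modifies its copy and writes it back to the server.
Process B overwrote the resource without knowing that process A had already changed it, so process A's update is lost.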
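The sketch below reproduces this race in memory. It assumes a hypothetical `NaiveStore` class that stands in for a server with no version check; the class is not part of any Google Cloud library. It replays the four steps above in order. The last writer wins, so A's member disappears.

```java
import java.util.ArrayList;
import java.util.List;

public class LostUpdateDemo {
  // Hypothetical stand-in for a server that accepts every write unconditionally.
  static class NaiveStore {
    private List<String> members = new ArrayList<>();

    List<String> read() {
      return new ArrayList<>(members); // each caller gets a private copy
    }

    void write(List<String> newMembers) {
      members = new ArrayList<>(newMembers); // last writer wins, no check
    }
  }

  public static List<String> run() {
    NaiveStore store = new NaiveStore();
    List<String> copyA = store.read(); // step 1: A reads
    List<String> copyB = store.read(); // step 2: B reads the same state
    copyA.add("user:a@example.com");
    store.write(copyA); // step 3: A writes its change
    copyB.add("user:b@example.com");
    store.write(copyB); // step 4: B overwrites A's change
    return store.read();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [user:b@example.com]
  }
}
```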
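OCC solves this problem with a unique fingerprint that changes every time the entity is modified. In many systems, including IAM, this fingerprint is an etag. The server checks it on every write:
When you read a resource, the server returns its etag (a unique fingerprint).
When you send the modified resource back, you must include the original etag.
If the stored etag does not match the etag you sent, the write fails with an ABORTED or FAILED_PRECONDITION error. A mismatch means someone else modified the resource after you read it.
This failure forces the client to retry the whole flow. It re-reads the new state, reapplies the change, and attempts the write again with the new etag.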
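The server-side check can be modeled as a compare-and-set. This is a minimal in-memory sketch, not how IAM implements it. The hypothetical `VersionedStore` derives its etag from a version counter. Its `write` method rejects a stale etag by throwing a hypothetical `EtagMismatchException`, which plays the role of the ABORTED error.

```java
import java.util.ArrayList;
import java.util.List;

public class VersionedStore {
  // Hypothetical; stands in for the ABORTED / FAILED_PRECONDITION error on an etag mismatch.
  public static class EtagMismatchException extends RuntimeException {
    public EtagMismatchException(String message) {
      super(message);
    }
  }

  // The data together with the etag it was read at, returned as one unit.
  public static final class Snapshot {
    private final List<String> members;
    private final String etag;

    Snapshot(List<String> members, String etag) {
      this.members = List.copyOf(members);
      this.etag = etag;
    }

    public List<String> members() { return members; }
    public String etag() { return etag; }
  }

  private List<String> members = new ArrayList<>();
  private long version = 0;

  // Any value that changes on every write works as a fingerprint; a counter is simplest.
  private String currentEtag() {
    return "v" + version;
  }

  public synchronized Snapshot read() {
    return new Snapshot(members, currentEtag());
  }

  // Conditional write: accepted only if the caller's etag matches the stored one.
  public synchronized Snapshot write(List<String> newMembers, String etag) {
    if (!currentEtag().equals(etag)) {
      throw new EtagMismatchException(
          "etag " + etag + " is stale; current etag is " + currentEtag());
    }
    members = new ArrayList<>(newMembers);
    version++; // the fingerprint changes, so every earlier read is now stale
    return new Snapshot(members, currentEtag());
  }
}
```

Replay the two-process race against this store. B's write carries the etag it read before A's write, so the server rejects it instead of silently discarding A's member. B must then read again to get the new etag.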
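Implementing the OCC loop
At the core of an OCC implementation is a while loop that handles the retry logic. Set a reasonable cap on the number of retries so that the loop cannot run forever under high contention.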
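The retry loop itself does not depend on IAM. Below is a minimal generic sketch. It assumes the caller wraps one complete read-modify-write attempt in a hypothetical `Attempt` interface that throws a hypothetical `ConflictException` on an etag mismatch. The attempt cap bounds the loop. The delay doubles after each conflict up to a fixed ceiling; that is a common choice, not a requirement of OCC.

```java
import java.util.Optional;

public class OccRetry {
  // Hypothetical conflict signal; stands in for ABORTED or FAILED_PRECONDITION.
  public static class ConflictException extends RuntimeException {
    public ConflictException(String message) {
      super(message);
    }
  }

  // Hypothetical: one complete Read-Modify-Write attempt; must return non-null on success.
  @FunctionalInterface
  public interface Attempt<T> {
    T run();
  }

  private static final long MAX_DELAY_MS = 5_000L;

  // Runs attempt until it succeeds, a non-conflict exception escapes, or maxAttempts is used up.
  public static <T> Optional<T> retryOnConflict(
      Attempt<T> attempt, int maxAttempts, long baseDelayMs) {
    for (int i = 1; i <= maxAttempts; i++) {
      try {
        return Optional.of(attempt.run());
      } catch (ConflictException e) {
        if (i == maxAttempts) {
          break; // out of attempts; skip a pointless final sleep
        }
        // Exponential backoff: base, 2x base, 4x base, ... capped at MAX_DELAY_MS.
        long delay = Math.min(baseDelayMs << Math.min(i - 1, 20), MAX_DELAY_MS);
        try {
          Thread.sleep(delay);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // keep the interrupt status and stop retrying
          return Optional.empty();
        }
      }
    }
    return Optional.empty(); // every attempt hit a conflict
  }
}
```

The whole read-modify-write must go inside `attempt`. If only the write were retried, it would resend the same stale etag and fail again.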
Steps in the loop
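| Step | Action | Example implementation |
|---|---|---|
| Read | Fetch the current resource state, including the etag. | `Policy policy = client.getIamPolicy(resourceName);` |
| Modify | Apply the changes to the local object. | `policy = policy.toBuilder().addBindings(newBinding).build();` |
| Write/Check | Try to save the modified resource with the old etag. This call must be inside a try block. | `try { return client.setIamPolicy(resourceName, policy); } catch (AbortedException e) { /* retry */ }` |
| Success/Retry | If the write succeeds, exit the loop. If it fails with a concurrency error, increment the retry counter and continue the loop (back to the Read step). | `retries++; continue;` |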
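The four steps in the table can be exercised end to end without a Google Cloud project. This is a sketch under stated assumptions, not the IAM client API. The hypothetical `Store` returns the members and their etag together and rejects stale writes. The hypothetical `racer` hook simulates another process writing between our Read and Write steps, on the first attempt only.

```java
import java.util.ArrayList;
import java.util.List;

public class OccLoopDemo {
  // Hypothetical: raised when the etag sent with a write no longer matches the stored one.
  static class ConflictException extends RuntimeException {}

  // Members plus the etag they were read at, returned together by read().
  static final class Snapshot {
    final List<String> members;
    final long etag;

    Snapshot(List<String> members, long etag) {
      this.members = List.copyOf(members);
      this.etag = etag;
    }
  }

  // Hypothetical server: the etag is a version counter bumped on every successful write.
  static final class Store {
    private List<String> members = new ArrayList<>();
    private long version = 0;

    synchronized Snapshot read() {
      return new Snapshot(members, version);
    }

    synchronized void write(List<String> newMembers, long etag) {
      if (etag != version) {
        throw new ConflictException();
      }
      members = new ArrayList<>(newMembers);
      version++;
    }
  }

  // Adds member using Read, Modify, Write/Check, Success/Retry; returns attempts used, or -1.
  static int addMember(Store store, String member, int maxAttempts, Runnable racer) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Snapshot snap = store.read(); // READ: state plus etag
      if (snap.members.contains(member)) {
        return attempt; // already present: nothing to write
      }
      List<String> updated = new ArrayList<>(snap.members); // MODIFY a local copy
      updated.add(member);
      if (attempt == 1) {
        racer.run(); // simulate a concurrent writer (first attempt only)
      }
      try {
        store.write(updated, snap.etag); // WRITE/CHECK with the etag we read
        return attempt; // SUCCESS: leave the loop
      } catch (ConflictException e) {
        // RETRY: loop back to READ to pick up the new etag
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    Store store = new Store();
    Runnable otherProcess = () -> {
      Snapshot s = store.read();
      List<String> m = new ArrayList<>(s.members);
      m.add("user:a@example.com");
      store.write(m, s.etag);
    };
    int attempts = addMember(store, "user:b@example.com", 3, otherProcess);
    System.out.println(attempts + " " + store.read().members);
    // prints 2 [user:a@example.com, user:b@example.com]
  }
}
```

The conflict on the first attempt is expected. The second pass re-reads the etag written by the other process, so both members survive. The early return when the member is already present keeps the Modify step idempotent, which matters because that step may run several times.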
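The following sample is a runnable example. It implements an OCC loop that targets the IAM policy on a project resource.
Installation
To use this sample, add the following dependency to your pom.xml: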
```xml
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-resourcemanager</artifactId>
  <version>1.45.0</version>
</dependency>
```
Sample
```java
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.cloud.resourcemanager.v3.ProjectName;
import com.google.cloud.resourcemanager.v3.ProjectsClient;
import com.google.iam.v1.Binding;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.Policy;
import com.google.iam.v1.SetIamPolicyRequest;
import java.util.ArrayList;
import java.util.List;

public class IamOccExample {
  /**
   * Executes an Optimistic Concurrency Control (OCC) loop to safely update a resource.
   *
   * <p>This method demonstrates the core Read-Modify-Write-Retry pattern.
   *
   * @param projectId The Google Cloud Project ID (e.g., "my-project-123").
   * @param role The IAM role to grant (e.g., "roles/storage.objectAdmin").
   * @param member The member to add (e.g., "user:user@example.com").
   * @param maxRetries The maximum number of retries after the first attempt.
   * @return The updated IAM policy, the current policy if the member already has the role,
   *     or null if every attempt failed with a concurrency conflict.
   */
  public static Policy updateIamPolicyWithOcc(
      String projectId, String role, String member, int maxRetries) throws Exception {
    // Setup client
    try (ProjectsClient projectsClient = ProjectsClient.create()) {
      String projectName = ProjectName.of(projectId).toString();
      int retries = 0;
      // START OCC LOOP (Read-Modify-Write-Retry): one first attempt plus up to maxRetries retries.
      while (retries <= maxRetries) {
        try {
          // READ: Get the current policy. This includes the current etag.
          System.out.printf(
              "Attempt %d: Reading current IAM policy for %s...%n", retries + 1, projectName);
          GetIamPolicyRequest getIamPolicyRequest =
              GetIamPolicyRequest.newBuilder().setResource(projectName).build();
          Policy policy = projectsClient.getIamPolicy(getIamPolicyRequest);

          // MODIFY: Apply the desired changes to a local copy of the bindings.
          List<Binding> bindings = new ArrayList<>(policy.getBindingsList());
          Binding targetBinding = null;
          int bindingIndex = -1;
          for (int i = 0; i < bindings.size(); i++) {
            if (bindings.get(i).getRole().equals(role)) {
              targetBinding = bindings.get(i);
              bindingIndex = i;
              break;
            }
          }
          if (targetBinding != null) {
            if (targetBinding.getMembersList().contains(member)) {
              System.out.printf("Policy for role %s and member %s exists already!%n", role, member);
              return policy;
            }
            // Create a new binding based on the existing one to add the member.
            Binding updatedBinding = targetBinding.toBuilder().addMembers(member).build();
            bindings.set(bindingIndex, updatedBinding);
          } else {
            // Role not found: create a new binding.
            Binding newBinding = Binding.newBuilder().setRole(role).addMembers(member).build();
            bindings.add(newBinding);
          }
          // The policy builder now contains the modified bindings AND the original etag.
          Policy updatedPolicy =
              policy.toBuilder().clearBindings().addAllBindings(bindings).build();

          // WRITE/CHECK: Attempt to write the modified policy.
          System.out.printf("Attempt %d: Setting modified IAM policy...%n", retries + 1);
          SetIamPolicyRequest setIamPolicyRequest =
              SetIamPolicyRequest.newBuilder()
                  .setResource(projectName)
                  .setPolicy(updatedPolicy)
                  .build();
          Policy resultPolicy = projectsClient.setIamPolicy(setIamPolicyRequest);

          // SUCCESS: If the call succeeds, return the new policy and exit the loop.
          System.out.printf("Successfully updated IAM policy in attempt %d.%n", retries + 1);
          return resultPolicy;
        } catch (AbortedException | FailedPreconditionException e) {
          // The etag was stale: another writer changed the policy after our READ.
          retries++;
          if (retries > maxRetries) {
            break; // out of retries; do not sleep again
          }
          System.out.printf(
              "Concurrency conflict detected (etag mismatch). Retrying... (%d/%d)%n",
              retries, maxRetries);
          // Exponential backoff: 100 ms, 200 ms, 400 ms, ... (exponent capped at 6).
          Thread.sleep(100L << Math.min(retries - 1, 6));
        }
      }
      // END OCC LOOP
    }
    System.out.printf(
        "Failed to update IAM policy after %d attempts due to persistent concurrency conflicts.%n",
        maxRetries + 1);
    return null;
  }
}
```