使用 Eventarc 触发 Pub/Sub 中的函数

本教程演示了如何使用 Pub/Sub 触发器编写和触发事件驱动型 Cloud Run functions 函数。

通过为 Eventarc 触发器指定过滤条件,您可以配置事件的路由,包括事件来源和事件目标。对于本教程中的示例,向 Pub/Sub 主题发布消息会触发事件,然后系统会以 HTTP 请求的形式向您的函数发送请求。

如果您刚接触 Pub/Sub,并且希望了解详情,请参阅 Pub/Sub 文档,以查看快速入门和重要参考信息。

所需的角色

您或您的管理员必须为部署者账号、触发器身份以及可选的 Pub/Sub 服务代理和 Cloud Storage 服务代理授予以下 IAM 角色。

部署者账号所需的角色

  1. 如果您是项目创建者,则会被授予基本 Owner 角色 (roles/owner)。默认情况下,此 Identity and Access Management (IAM) 角色可提供完全访问大多数 Google Cloud资源所需的权限,您可以跳过此步骤。

    如果您不是项目创建者,则必须向主账号授予项目的必需权限。例如,主账号可以是 Google 账号(针对最终用户)或服务账号(针对应用和计算工作负载)。如需了解详情,请参阅事件目标位置的角色和权限页面。

    如需获得完成本教程所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

    如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

    您也可以通过自定义角色或其他预定义角色来获取所需的权限。

    请注意,默认情况下,Cloud Build 权限包含上传和下载 Artifact Registry 制品的权限

触发器身份所需的角色

  1. 记下 Compute Engine 默认服务账号,因为您将把它附加到 Eventarc 触发器以代表触发器的身份信息进行测试。启用或使用包含 Compute Engine 的 Google Cloud 服务后,系统会自动创建此服务账号,其电子邮件地址格式如下:

    PROJECT_NUMBER-compute@developer.gserviceaccount.com

    PROJECT_NUMBER 替换为您的 Google Cloud项目编号。您可以在 Google Cloud 控制台的欢迎页面上,或通过运行以下命令来查找项目编号:

    gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    对于生产环境,我们强烈建议创建新的服务账号,并为其授予一个或多个 IAM 角色,这些角色包含所需的最小权限并遵循最小权限原则。

  2. 默认情况下,只有 Project Owner、Project Editor 以及 Cloud Run Admin 和 Invoker 才能调用 Cloud Run 服务。您可以按服务控制访问权限;但是,出于测试目的,请向 Compute Engine 服务账号授予 Google Cloud 项目的 Cloud Run Invoker 角色 (run.invoker)。此操作会授予项目中所有 Cloud Run 服务和作业的角色。
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/run.invoker

    请注意,如果您在未授予 Cloud Run Invoker 角色的情况下为经过身份验证的 Cloud Run 服务创建触发器,则触发器会成功创建且处于活动状态。但是,触发器将无法按预期运行,并且日志中会显示类似于以下内容的消息:

    The request was not authenticated. Either allow unauthenticated invocations or set the proper Authorization header.
  3. 将项目的 Eventarc Event Receiver 角色 (roles/eventarc.eventReceiver) 授予 Compute Engine 默认服务账号,以便 Eventarc 触发器可以接收来自事件提供程序的事件。
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=roles/eventarc.eventReceiver

Cloud Storage 服务代理的可选角色

  • 在为来自 Cloud Storage 的直接事件创建触发器之前,请向 Cloud Storage 服务代理授予 Pub/Sub Publisher 角色 (roles/pubsub.publisher):

    SERVICE_ACCOUNT="$(gcloud storage service-agent --project=PROJECT_ID)"
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:${SERVICE_ACCOUNT}" \
        --role='roles/pubsub.publisher'

Pub/Sub 服务代理的可选角色

  • 如果您在 2021 年 4 月 8 日或之前启用了 Cloud Pub/Sub 服务代理,以支持经过身份验证的 Pub/Sub 推送请求,请向该服务代理授予 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。否则,系统会默认授予此角色:
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
        --role=roles/iam.serviceAccountTokenCreator

创建 Pub/Sub 主题

在 Cloud Run 中,部署函数时系统不会自动创建 Pub/Sub 主题。在部署函数之前,请向此 Pub/Sub 主题发布消息以触发函数:

gcloud pubsub topics create YOUR_TOPIC_NAME

准备应用

  1. 将示例应用代码库克隆到本地机器:

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
    

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git
    

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    

    .NET

    git clone https://github.com/GoogleCloudPlatform/dotnet-docs-samples.git
    

    Ruby

    git clone https://github.com/GoogleCloudPlatform/ruby-docs-samples.git
    

    PHP

    git clone https://github.com/GoogleCloudPlatform/php-docs-samples.git
    
  2. 切换到包含访问 Pub/Sub 示例代码的目录:

    Node.js

    cd nodejs-docs-samples/functions/v2/helloPubSub/
    

    Python

    cd python-docs-samples/functions/v2/pubsub/
    

    Go

    cd golang-samples/functions/functionsv2/hellopubsub/
    

    Java

    cd java-docs-samples/functions/v2/pubsub/
    

    .NET

    cd dotnet-docs-samples/functions/helloworld/HelloPubSub/
    

    Ruby

    cd ruby-docs-samples/functions/helloworld/pubsub/
    

    PHP

    cd php-docs-samples/functions/helloworld_pubsub/
    
  3. 查看示例代码:

    Node.js

    const functions = require('@google-cloud/functions-framework');
    
    // Register a CloudEvent callback with the Functions Framework that will
    // be executed when the Pub/Sub trigger topic receives a message.
    functions.cloudEvent('helloPubSub', cloudEvent => {
      // The Pub/Sub message is passed as the CloudEvent's data payload.
      const base64name = cloudEvent.data.message.data;
    
      const name = base64name
        ? Buffer.from(base64name, 'base64').toString()
        : 'World';
    
      console.log(`Hello, ${name}!`);
    });

    Python

    import base64
    
    from cloudevents.http import CloudEvent
    import functions_framework
    
    
    # Triggered from a message on a Cloud Pub/Sub topic.
    @functions_framework.cloud_event
    def subscribe(cloud_event: CloudEvent) -> None:
        # Print out the data from Pub/Sub, to prove that it worked
        print(
            "Hello, " + base64.b64decode(cloud_event.data["message"]["data"]).decode() + "!"
        )
    
    

    Go

    
    // Package helloworld provides a set of Cloud Functions samples.
    package helloworld
    
    import (
    	"context"
    	"fmt"
    	"log"
    
    	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
    	"github.com/cloudevents/sdk-go/v2/event"
    )
    
    func init() {
    	functions.CloudEvent("HelloPubSub", helloPubSub)
    }
    
    // MessagePublishedData contains the full Pub/Sub message
    // See the documentation for more details:
    // https://cloud.google.com/eventarc/docs/cloudevents#pubsub
    type MessagePublishedData struct {
    	Message PubSubMessage
    }
    
    // PubSubMessage is the payload of a Pub/Sub event.
    // See the documentation for more details:
    // https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
    type PubSubMessage struct {
    	Data []byte `json:"data"`
    }
    
    // helloPubSub consumes a CloudEvent message and extracts the Pub/Sub message.
    func helloPubSub(ctx context.Context, e event.Event) error {
    	var msg MessagePublishedData
    	if err := e.DataAs(&msg); err != nil {
    		return fmt.Errorf("event.DataAs: %w", err)
    	}
    
    	name := string(msg.Message.Data) // Automatically decoded from base64.
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello, %s!", name)
    	return nil
    }
    

    Java

    import com.google.cloud.functions.CloudEventsFunction;
    import com.google.gson.Gson;
    import functions.eventpojos.PubSubBody;
    import io.cloudevents.CloudEvent;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.logging.Logger;
    
    public class SubscribeToTopic implements CloudEventsFunction {
      private static final Logger logger = Logger.getLogger(SubscribeToTopic.class.getName());
    
      @Override
      public void accept(CloudEvent event) {
        // The Pub/Sub message is passed as the CloudEvent's data payload.
        if (event.getData() != null) {
          // Extract Cloud Event data and convert to PubSubBody
          String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
          Gson gson = new Gson();
          PubSubBody body = gson.fromJson(cloudEventData, PubSubBody.class);
          // Retrieve and decode PubSub message data
          String encodedData = body.getMessage().getData();
          String decodedData =
              new String(Base64.getDecoder().decode(encodedData), StandardCharsets.UTF_8);
          logger.info("Hello, " + decodedData + "!");
        }
      }
    }

    .NET

    using CloudNative.CloudEvents;
    using Google.Cloud.Functions.Framework;
    using Google.Events.Protobuf.Cloud.PubSub.V1;
    using Microsoft.Extensions.Logging;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace HelloPubSub;
    
    public class Function : ICloudEventFunction<MessagePublishedData>
    {
        private readonly ILogger _logger;
    
        public Function(ILogger<Function> logger) =>
            _logger = logger;
    
        public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
        {
            string nameFromMessage = data.Message?.TextData;
            string name = string.IsNullOrEmpty(nameFromMessage) ? "world" : nameFromMessage;
            _logger.LogInformation("Hello {name}", name);
            return Task.CompletedTask;
        }
    }

    Ruby

    require "functions_framework"
    require "base64"
    
    FunctionsFramework.cloud_event "hello_pubsub" do |event|
      # The event parameter is a CloudEvents::Event::V1 object.
      # See https://cloudevents.github.io/sdk-ruby/latest/CloudEvents/Event/V1.html
      name = Base64.decode64 event.data["message"]["data"] rescue "World"
    
      # A cloud_event function does not return a response, but you can log messages
      # or cause side effects such as sending additional events.
      logger.info "Hello, #{name}!"
    end

    PHP

    
    use CloudEvents\V1\CloudEventInterface;
    use Google\CloudFunctions\FunctionsFramework;
    
    // Register the function with Functions Framework.
    // This enables omitting the `FUNCTIONS_SIGNATURE_TYPE=cloudevent` environment
    // variable when deploying. The `FUNCTION_TARGET` environment variable should
    // match the first parameter.
    FunctionsFramework::cloudEvent('helloworldPubsub', 'helloworldPubsub');
    
    function helloworldPubsub(CloudEventInterface $event): void
    {
        $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');
    
        $cloudEventData = $event->getData();
        $pubSubData = base64_decode($cloudEventData['message']['data']);
    
        $name = $pubSubData ? htmlspecialchars($pubSubData) : 'World';
        fwrite($log, "Hello, $name!" . PHP_EOL);
    }

部署事件驱动的函数

如需部署该函数,请在包含示例代码的目录中运行以下命令:

Node.js

  gcloud run deploy FUNCTION \
        --source . \
        --function helloPubSub \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 nodejs22。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

Python

  gcloud run deploy FUNCTION \
        --source . \
        --function subscribe \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 python313。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

Go

  gcloud run deploy FUNCTION \
        --source . \
        --function HelloPubSub \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 go125。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

Java

  gcloud run deploy FUNCTION \
        --source . \
        --function functions.SubscribeToTopic \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 java21。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

.NET

  gcloud run deploy FUNCTION \
        --source . \
        --function HelloPubSub.Function \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 dotnet8。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

Ruby

  gcloud run deploy FUNCTION \
        --source . \
        --function hello_pubsub \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 ruby34。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

PHP

  gcloud run deploy FUNCTION \
        --source . \
        --function helloworldPubsub \
        --base-image BASE_IMAGE \

您需要进行如下替换:

  • FUNCTION 替换为您要部署的函数的名称。如果您省略此参数,系统将在您运行命令时提示您输入名称。
  • BASE_IMAGE 替换为函数所用的基础映像环境,例如 php84。如需详细了解基础映像以及每个映像中包含的软件包,请参阅支持的语言运行时和基础映像

如果系统提示您在指定区域中创建仓库,请按 y 进行响应。部署完成后,Google Cloud CLI 会显示运行服务的网址。

创建 Eventarc 触发器

如需使用 Pub/Sub 触发器部署函数,请在包含示例代码的目录中运行以下命令:

  1. 创建 Eventarc Pub/Sub 触发器:

    gcloud eventarc triggers create TRIGGER_NAME  \
        --location=${REGION} \
        --destination-run-service=FUNCTION \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

    您需要进行如下替换:

    • TRIGGER_NAME 替换为触发器的名称。
    • FUNCTION 替换为函数的名称。
    • PROJECT_NUMBER 替换为您的 Google Cloud 项目编号。

    请注意,首次在 Google Cloud 项目中创建 Eventarc 触发器时,预配 Eventarc 服务代理可能会有延迟。通常,您可以尝试再次创建触发器,以解决此问题。如需了解详情,请参阅权限遭拒错误

  2. 确认触发器已成功创建。请注意,尽管触发器会立即创建,但它最长可能需要两分钟才能完全正常运行。

    gcloud eventarc triggers list --location=${REGION}

    输出应类似如下所示:

    NAME: helloworld-events
    TYPE: google.cloud.pubsub.topic.v1.messagePublished
    DESTINATION: Cloud Run service: helloworld-events
    ACTIVE: Yes
    LOCATION: us-central1
    

触发函数

如需测试 Pub/Sub 函数,请执行以下操作:

  1. 将主题分配给变量:

    TOPIC_ID=$(gcloud eventarc triggers describe TRIGGER_NAME --location $REGION --format='value(transport.pubsub.topic)')
    
  2. 向主题发布消息:

    gcloud pubsub topics publish $TOPIC_ID --message="Hello World"
    

Cloud Run 服务会记录传入消息的正文。您可以在 Cloud Run 实例的“日志”部分中查看此信息:

  1. 前往Google Cloud 控制台
  2. 点击相应函数。
  3. 选择日志标签页。

    您可能需要等待一些时间才能看到日志。如果您没有立即看到日志,请稍等片刻再检查一次。

  4. 查找“Hello World!”消息。