您可以使用 Workflows 連接器支援 Pub/Sub 作業,包括將訊息發布至 Pub/Sub 主題。
Pub/Sub 主題是發布者傳送訊息的資源。訂閱項目代表主題的訊息串流,這些訊息會傳送至訂閱應用程式。進一步瞭解 Pub/Sub。
發布訊息
建立 Pub/Sub 主題和該主題的訂閱項目後,您可以建立工作流程,將訊息發布至該主題:
YAML
- init: assign: - project: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}' - topic: TOPIC_ID - subscription: SUBSCRIPTION_ID - message: hello: world - base64Msg: '${base64.encode(json.encode(message))}' - publish_message_to_topic: call: googleapis.pubsub.v1.projects.topics.publish args: topic: '${"projects/" + project + "/topics/" + topic}' body: messages: - data: '${base64Msg}'
JSON
[ { "init": { "assign": [ { "project": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" }, { "topic": "TOPIC_ID" }, { "subscription": "SUBSCRIPTION_ID" }, { "message": { "hello": "world" } }, { "base64Msg": "${base64.encode(json.encode(message))}" } ] } }, { "publish_message_to_topic": { "call": "googleapis.pubsub.v1.projects.topics.publish", "args": { "topic": "${\"projects/\" + project + \"/topics/\" + topic}", "body": { "messages": [ { "data": "${base64Msg}" } ] } } } } ]
更改下列內容:
TOPIC_ID
:Pub/Sub 主題的 ID 或完整修飾 ID。SUBSCRIPTION_ID
:Pub/Sub 訂閱項目的 ID 或完整修飾符。
提取訊息
您可以建立 Eventarc 觸發條件,將 Pub/Sub 主題連結至 Workflows 事件接收器。訊息會發布至 Pub/Sub 主題,以產生事件,而事件會以執行階段引數的形式傳遞至目的地工作流程。詳情請參閱「使用事件或 Pub/Sub 訊息觸發工作流程」。
您也可以建立可擷取 Pub/Sub 訊息的工作流程。在以下範例中,工作流程會等待使用輪詢功能發布訊息。
YAML
- pullMessage: call: googleapis.pubsub.v1.projects.subscriptions.pull args: subscription: '${"projects/" + project + "/subscriptions/" + subscription}' body: maxMessages: 1 result: m - checkState: switch: - condition: ${m.receivedMessages[0].message.data != ""} next: outputMessage - wait: call: sys.sleep args: seconds: 60 next: pullMessage - outputMessage: return: '${json.decode(base64.decode(m.receivedMessages[0].message.data))}'
JSON
[ { "pullMessage": { "call": "googleapis.pubsub.v1.projects.subscriptions.pull", "args": { "subscription": "${\"projects/\" + project + \"/subscriptions/\" + subscription}", "body": { "maxMessages": 1 } }, "result": "m" } }, { "checkState": { "switch": [ { "condition": "${m.receivedMessages[0].message.data != \"\"}", "next": "outputMessage" } ] } }, { "wait": { "call": "sys.sleep", "args": { "seconds": 60 }, "next": "pullMessage" } }, { "outputMessage": { "return": "${json.decode(base64.decode(m.receivedMessages[0].message.data))}" } } ]