流水线将总线连接到目标目的地,并将事件消息路由到该目的地。您可以配置流水线,使其以特定格式接收事件数据;也可以在将事件传送到目标之前,将事件数据从一种受支持的格式转换为另一种受支持的格式。例如,您可能需要将事件路由到仅接受 Avro 数据的端点。
支持的格式
支持以下格式转换:
- Avro 到 JSON
- Avro 到 Protobuf
- JSON 到 Avro
- JSON 到 Protobuf
- Protobuf 到 Avro
- Protobuf 到 JSON
请注意以下几点:
转换事件格式时,仅转换事件载荷,而不转换整个事件消息。
如果为流水线指定了入站数据格式,则所有事件都必须符合该格式。任何不符合预期格式的事件都会被视为持久性错误。
如果没有为流水线指定入站数据格式,则无法设置出站格式。
在为特定目的地转换事件格式之前,系统会先应用所有已配置的数据转换。
除非您指定消息绑定,否则事件始终通过二进制内容模式的 HTTP 请求以 CloudEvents 格式传送。
系统会动态检测 JSON 架构。对于 Protobuf 架构定义,您只能定义一个顶级类型,并且不支持引用其他类型的 import 语句。没有
syntax标识符的架构定义默认为proto2。请注意,存在架构大小限制。
配置流水线以格式化事件
您可以在 Google Cloud 控制台中或使用 gcloud CLI 配置流水线,使其以特定格式接收事件数据,或将事件数据从一种格式转换为另一种格式。
控制台
gcloud
打开终端。
您可以创建流水线,也可以使用
gcloud eventarc pipelines update命令更新流水线:gcloud eventarc pipelines update PIPELINE_NAME \ --location=REGION \ --INPUT_PAYLOAD_FLAG \ --destinations=OUTPUT_PAYLOAD_KEY
替换以下内容:
PIPELINE_NAME:流水线的 ID 或完全限定名称REGION:受支持的 Eventarc Advanced 位置或者,您也可以设置 gcloud CLI 位置属性:
gcloud config set eventarc/location REGIONINPUT_PAYLOAD_FLAG:输入数据格式标志,可以是以下值之一:--input-payload-format-avro-schema-definition--input-payload-format-json--input-payload-format-protobuf-schema-definition
请注意,如果为流水线指定了输入数据格式,则所有事件都必须符合该格式。任何不符合预期格式的事件都会被视为持久性错误。
OUTPUT_PAYLOAD_KEY:输出数据格式键,可以是以下值之一:output_payload_format_avro_schema_definitionoutput_payload_format_jsonoutput_payload_format_protobuf_schema_definition
请注意,如果您设置了输出数据格式键,则还必须指定输入数据格式标志。
更新流水线可能需要几分钟时间。
示例:
以下示例使用
--input-payload-format-protobuf-schema-definition标志指定流水线应以具有特定架构的 Protobuf 数据格式接收事件:gcloud eventarc pipelines update my-pipeline \ --input-payload-format-protobuf-schema-definition \ ' syntax = "proto3"; message schema { string name = 1; string severity = 2; } '
以下示例使用
output_payload_format_avro_schema_definition键和--input-payload-format-avro-schema-definition标志创建了一个流水线,该流水线以 Avro 格式接收事件,并以相同格式输出事件:gcloud eventarc pipelines create my-pipeline \ --location=us-central1 \ --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \ --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
以下示例使用
output_payload_format_protobuf_schema_definition键和--input-payload-format-avro-schema-definition标志来更新流水线,并使用架构定义将其事件数据从 Avro 转换为 Protobuf:gcloud eventarc pipelines update my-pipeline \ --location=us-central1 \ --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \ --input-payload-format-avro-schema-definition= \ ' { "type": "record", "name": "MessageProto", "fields": [ { "name" : "prop1", "type": "string" }, { "name" : "prop2", "type": "string" }, ] } '