TIPCommon.adapters

class TIPCommon.adapters.pubsub.pubsub.PubSubAdapter

TIPCommon.adapters.pubsub.pubsub.PubSubAdapter(session, project_id=None, logger=None, region=None)

基础:object

用于管理 Google Cloud 项目 Pub/Sub 主题和订阅的适配器类。

ack

ack(sub_name, ack_ids)

确认与从 PubSubAdapter.pull() 返回的 AcknowledgeRequest 响应中的 ackIds 关联的消息。

参数
sub_name str

订阅名称。

ack_ids list[str]

要确认的消息的确认 ID 列表。该列表由 Pub/Sub 系统在 PubSubAdapter.pull() 响应中返回。

static build_pubsub_message

静态 build_pubsub_message(message_content, encoding='utf-8', ordering_key=None, **attr)

创建一个 PubSubMessage 对象。

参数
message_content str

消息内容。

encoding str

用于对消息文本进行编码或解码的编码类型。

默认值为 UTF-8

ordering_key str

可选。

如果已配置,该参数会标识所有必须遵循发布顺序的相关消息。

**attr str

可选。

要作为消息对象属性传递的属性。

返回值

TIPCommon.adapters.pubsub.PubSubMessage 对象。

返回类型

PubSubMessage

create_subscription

create_subscription(sub_name, topic, **attr)

为指定主题创建 Pub/Sub 订阅。

参数
sub_name str

订阅名称。

topic str

要为其创建订阅的 Pub/Sub 主题名称。

**attr 要传递给订阅请求的其他参数。

返回值

所创建订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

create_topic

create_topic(topic_name)

在 Google Cloud 项目中创建 Pub/Sub 主题。

参数
topic_name str

主题的名称。

名称必须符合 Google Cloud 资源名称规则

返回值

所创建主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

delete_subscription

delete_subscription(sub_name)

从 Google Cloud 项目中删除 Pub/Sub 订阅。

参数
sub_name str

要移除的订阅名称。

delete_topic

delete_topic(topic_name)

从 Google Cloud 项目中删除 Pub/Sub 主题。

参数
topic_name str

要移除的主题。

来自凭据的 static

静态 from_credentials(credentials, project_id=None, verify_ssl=True, quota_project=None, logger=None, region=None)

基于 google.oauth2.credentials.Credentials 对象创建 PubSubAdapter 对象。

参数
credentials google.oauth2.credentials.Credentials

google.oauth2.credentials.Credentials 对象。

project_id 可选。

str

Google Cloud 项目 ID。

如果未提供,该函数会尝试默认使用凭据对象中配置的项目。

verify_ssl 可选。

bool

指定是否为 HTTP 会话启用 SSL 证书验证。

quota_project 可选。

str

用于配额和结算的项目。

logger 可选。

SiemplifyLogger

SiemplifyLogger 对象。

region str

Pub/Sub 的工作区域。

返回值

PubSubAdapter 对象。

返回类型

PubSubAdapter

来自 service_account_info 的 static

静态 subscription_name(user_service_account, project_id=None, verify_ssl=True, quota_project=None, logger=None)

根据 service_account JSON 创建 PubSubAdapter 对象。

参数
user_service_account str

文本格式的 Google Cloud 服务账号 JSON。

project_id 可选。

str

Google Cloud 项目 ID。

如果未提供,该函数会尝试默认使用凭据对象中配置的项目。

verify_ssl 可选。

bool

指定是否为 HTTP 会话启用 SSL 证书验证。

quota_project 可选。

str

用于配额和结算的项目。

logger 可选。

SiemplifyLogger

SiemplifyLogger 对象。

返回值

PubSubAdapter 对象。

返回类型

PubSubAdapter

get_subscription

get_subscription(sub_name, topic=None, create_if_not_exist=False, **attr)

检索 Pub/Sub 订阅。

参数
sub_name str

订阅名称。

topic str

要为其创建订阅的 Pub/Sub 主题名称。

如果 create_if_not_existTrue,则必须提供此值。

create_if_not_exist 在 Google Cloud 中创建 Pub/Sub 订阅(如果不存在)。
**attr 要传递给订阅创建请求的其他参数。

返回值

所检索订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

get_topic

get_topic(topic_name, create_if_not_exist=False)

从配置的 Google Cloud项目中检索 Pub/Sub 主题对象。

参数
topic_name str

主题的名称(已简化,不含 projects/PROJECT_ID/topics/ 前缀)。

create_if_not_exist bool

在 Google Cloud 中创建 Pub/Sub 主题(如果不存在)。

返回值

所接收主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

patch_subscription

patch_subscription(sub_name, topic_name, push_config=None, bigquery_config=None, cloud_storage_config=None, ack_deadline_seconds=None, retain_acked_messages=None, retention_duration=None, labels=None, enable_message_ordering=None, expiration_policy=None, query_filter=None, dead_letter_policy=None, return_policy=None, detached=None, enable_once_delivery=None)

更新现有订阅。

参数
sub_name str

订阅名称。

topic_name str

订阅接收消息的主题的名称。

push_config dict

可选。推送传送端点的配置。

bigquery_config dict

可选。BigQuery 订阅的配置。

cloud_storage_config dict

可选。Cloud Storage 订阅的配置。

ack_deadline_seconds int

可选。Pub/Sub 在重新传送消息之前等待订阅者确认收到消息的大致时间。

retain_acked_messages bool

可选。指示是否保留已确认的消息。

retention_duration str

可选。从消息发布时算起,在订阅的积压消息中保留未确认消息的时间。

labels dict

可选。用于整理订阅的一组键值对。

enable_message_ordering bool

可选。如果值为 True,则在 PubsubMessage 中使用相同 ordering_key 发布的消息将按 Pub/Sub 系统接收它们的顺序传送。

expiration_policy dict

可选。用于指定相应订阅到期条件的政策。

query_filter str

可选。以 Pub/Sub 过滤条件语言编写的表达式,用于指定哪些消息会传送给订阅。

dead_letter_policy dict

可选。一项政策,用于指定相应订阅中消息的死信条件。

return_policy dict

可选。用于指定退回消息的条件的政策。

detached bool

可选。指示订阅是否已与主题分离。

enable_once_delivery bool

可选。如果值为 True,Pub/Sub 会为此订阅中的消息提供“正好一次”传送。

返回值

所接收订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

patch_topic

patch_topic(topic_name, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, satisfies_pzs=None, retention_duration=None)

更新现有主题。

参数
topic_name str

主题名称。

labels dict

可选。用于整理主题的一组键值对。

message_storage_policy dict

可选。用于限制消息静态存储区域的政策。

kms_key_name str

可选。Cloud Key Management Service CryptoKey 的资源名称,用于保护发布到相应主题的消息。

schema_settings dict

可选。用于验证根据架构发布的消息的设置。

satisfies_pzs bool

可选。预留以供日后使用。如果主题满足物理可用区隔离要求,则此字段设置为 True

retention_duration str

可选。主题保留已发布消息的时间。如果配置了此设置,无论任何订阅配置如何,消息都将保留此时长。

返回值

所接收主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

publish

publish(topic_name, messages)

PubSubMessage 对象列表发布到主题。

参数
topic_name str

消息发布的目标主题的名称。

messages list

包含 PubSubMessage 对象的列表。

PubSubAdapter.build_message() 静态方法用于构造这些对象。

返回值

消息 ID 列表。

返回类型

list[str]

拉取

pull(sub_name, limit, timeout=60, encoding='utf-8')

从 Pub/Sub 订阅中拉取消息。

参数
sub_name str

订阅名称。

limit int

相应请求要返回的最大消息数。

timeout int

HTTP 请求超时时间(以秒为单位)。

默认值为 60

encoding str

Pub/Sub 消息编码。

默认值为 utf-8

返回值

TIPCommon.adapters.pubsub.ReceivedMessage 对象的列表。

返回类型

ReceivedMessage

静态 subscription_name

静态 subscription_name(project_id, sub_name)

检索完整订阅名称,格式如下:projects/project_id/topics/topic_name

参数
project_id str

包含资源的项目的名称。

sub_name str

Pub/Sub 订阅名称。

返回值

完整订阅名称,格式如下:projects/project_id/topics/topic_name

返回类型

str

静态 topic_name

静态 topic_name(project_id, topic)

检索采用以下格式的完整主题名称:projects/project_id/topics/topic_name

参数
project_id str

包含资源的项目名称。

topic str

Pub/Sub 主题名称。

返回值

完整的主题名称:projects/project_id/topics/topic_name

返回类型

str

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。