TIPCommon.adapters
class TIPCommon.adapters.pubsub.pubsub.PubSubAdapter
类 TIPCommon.adapters.pubsub.pubsub.PubSubAdapter(session, project_id=None, logger=None, region=None)
基础:object
用于管理 Google Cloud 项目 Pub/Sub 主题和订阅的适配器类。
ack
ack(sub_name, ack_ids)
确认与从 PubSubAdapter.pull()
返回的 AcknowledgeRequest
响应中的 ackIDs
关联的消息。
参数 | |
---|---|
sub_name |
(str )订阅名称。 |
ack_ids |
(list[str])
要确认的消息的确认 ID 列表。该列表由 Pub/Sub 系统在 |
static build_pubsub_message
静态 build_pubsub_message(message_content, encoding='utf-8', ordering_key=None, **attr)
创建 PubSubMessage 对象。
参数 | |
---|---|
message_content |
(str )
消息内容。 |
encoding |
(str )
用于对消息文本进行编码或解码的编码类型。 默认值为 |
ordering_key |
可选 (str )
如果使用,该参数会标识所有必须遵循发布顺序的相关消息。 |
**attr |
可选 ( 要作为消息对象属性传递的属性。 |
返回值
TIPCommon.adapters.pubsub.PubSubMessage
对象。
返回类型
create_subscription
create_subscription(sub_name, topic, **attr)
为指定主题创建 Pub/Sub 订阅。
参数 | |
---|---|
sub_name |
(str )
订阅名称。 |
topic |
(str )
要为其创建订阅的 Pub/Sub 主题名称。 |
**attr |
要传递给订阅请求的其他参数。 |
返回值
所创建订阅的 TIPCommon.adapters.pubsub.Subscription
对象。
返回类型
create_topic
create_topic(topic_name)
在 Google Cloud 项目中创建 Pub/Sub 主题。
参数 | |
---|---|
topic_name |
(str )
主题的名称。名称必须符合 Google Cloud 资源名称规则。 |
返回值
所创建主题的 TIPCommon.adapters.pubsub.Topic
对象。
返回类型
delete_subscription
delete_subscription(sub_name)
从 Google Cloud 项目中删除 Pub/Sub 订阅。
参数 | |
---|---|
sub_name |
(str )
要移除的订阅名称。 |
delete_topic
delete_topic(topic_name)
从 Google Cloud 项目中删除 Pub/Sub 主题。
参数 | |
---|---|
topic_name |
(str )要移除的主题。 |
来自凭据的 static
静态 from_credentials(credentials, project_id=None, verify_ssl=True, quota_project=None, logger=None, region=None)
基于 google.oauth2.credentials.Credentials
对象创建 PubSubAdapter 对象。
参数 | |
---|---|
credentials |
(google.oauth2.credentials.Credentials )
|
project_id |
可选 (str )
Google Cloud 项目 ID。 如果未提供,该函数会尝试默认使用凭据对象中配置的项目。 |
verify_ssl |
可选 (bool )
指定是否为 HTTP 会话启用 SSL 证书验证。 |
quota_project |
可选 (str )
用于配额和结算的项目。 |
logger |
可选 (SiemplifyLogger )
|
region |
(str )
Pub/Sub 的工作区域。 |
返回值
PubSubAdapter
对象。
返回类型
PubSubAdapter
来自 service_account_info 的 static
静态 subscription_name(project_id, sub_name)
根据 service_account
JSON 创建 PubSubAdapter 对象。
参数 | |
---|---|
user_service_account |
(str )
文本格式的 Google Cloud 服务账号 JSON。 |
project_id |
可选 (str )
Google Cloud 项目 ID。 如果未提供,该函数会尝试默认使用凭据对象中配置的项目。 |
verify_ssl |
可选 (bool )
指定是否为 HTTP 会话启用 SSL 证书验证。 |
quota_project |
可选 (str )
用于配额和结算的项目。 |
logger |
可选 (SiemplifyLogger )
|
返回值
PubSubAdapter
对象。
返回类型
PubSubAdapter
get_subscription
get_subscription(sub_name, topic=None, create_if_not_exist=False, **attr)
检索 Pub/Sub 订阅。
参数 | |
---|---|
sub_name |
(str )
订阅名称。 |
topic |
(str )
要为其创建订阅的 Pub/Sub 主题名称。 如果 |
create_if_not_exist |
在 Google Cloud 中创建 Pub/Sub 订阅(如果不存在)。 |
**attr |
要传递给订阅创建请求的其他参数。 |
返回值
检索到的订阅的 TIPCommon.adapters.pubsub.Subscription
对象。
返回类型
get_topic
get_topic(topic_name, create_if_not_exist=False)
从配置的 Google Cloud项目中检索 Pub/Sub 主题对象。
参数 | |
---|---|
topic_name |
(str )主题的名称(已简化,不含 |
create_if_not_exist |
bool 在 Google Cloud中创建 Pub/Sub 主题(如果该主题不存在)。 |
返回值
接收到的主题的 TIPCommon.adapters.pubsub.Topic
对象。
返回类型
patch_subscription
patch_subscription(sub_name, topic_name, push_config=None, bigquery_config=None, cloud_storage_config=None, ack_deadline_seconds=None, retain_acked_messages=None, retention_duration=None, labels=None, enable_message_ordering=None, expiration_policy=None, query_filter=None, dead_letter_policy=None, return_policy=None, detached=None, enable_once_delivery=None)
更新现有订阅。
参数 | |
---|---|
sub_name |
(str )订阅名称。 |
topic_name |
(str )相应订阅接收消息的主题的名称。 |
返回值
收到的订阅的 TIPCommon.adapters.pubsub.Subscription
对象。
返回类型
patch_topic
patch_topic(topic_name, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, satisfies_pzs=None, retention_duration=None)
用于更新现有主题。
参数 | |
---|---|
topic_name |
(str )主题名称。 |
返回值
接收到的主题的 TIPCommon.adapters.pubsub.Topic
对象。
返回类型
publish
publish(topic_name, messages)
将 PubSubMessage 对象列表发布到主题。
参数 | |
---|---|
topic_name |
(str )要发布消息的主题的名称。 |
messages |
list PubSubMessage 对象的列表。 您可以使用 |
返回值
消息 ID 列表。
返回类型
list[str]
拉取
pull(sub_name, limit, timeout=60, encoding='utf-8')
从 Pub/Sub 订阅中拉取消息。
参数 | |
---|---|
sub_name |
(str )订阅名称。 |
limit |
int 相应请求要返回的最大消息数。 |
timeout |
int HTTP 请求超时时间(以秒为单位)。 默认值为 60 秒。 |
encoding |
(str )Pub/Sub 消息编码。默认值为 |
返回值
TIPCommon.adapters.pubsub.ReceivedMessage
对象的列表。
返回类型
list[ReceivedMessage]
静态 subscription_name
静态 subscription_name(project_id, sub_name)
检索完整订阅名称,格式如下:projects/project_id
/subscriptions/subscription_name
。
参数 | |
---|---|
project_id |
(str )包含资源的项目的名称。 |
sub_name |
(str )Pub/Sub 订阅名称。 |
返回值
采用以下格式的完整订阅名称:projects/project_id
/subscriptions/subscription_name
。
返回类型
str
静态 topic_name
静态 topic_name(project_id, topic)
检索 projects/project_id
/topics/topic_name
。
参数 | |
---|---|
project_id |
(str )包含资源的项目的名称。 |
topic |
(str )Pub/Sub 主题名称。 |
返回值
完整的主题名称:projects/project_id
/topics/topic_name
。
返回类型
str
class TIPCommon.adapters.pubsub.data_models.PubSubMessage
类 TIPCommon.adapters.pubsub.data_models.PubSubMessage(raw_data: 'dict', data: 'str' = None, attributes: 'dict' = None, message_id: 'str' = None, publish_time: 'int' = None, ordering_key: 'str' = None)
代码库:对象
attributes: dict= None
data: str= None
json()
message_id: str= None
ordering_key: str= None
publish_time: int= None
raw_data: dict
class TIPCommon.adapters.pubsub.data_models.ReceivedMessage
类 TIPCommon.adapters.pubsub.data_models.ReceivedMessage(raw_data: 'dict', ack_id: 'str', message: 'PubSubMessage', delivery_attempt: 'int')
代码库:对象
ack_id: str
delivery_attempt: int
json()
message: PubSubMessage
raw_data: dict
class TIPCommon.adapters.pubsub.data_models.SchemaSettings
类 TIPCommon.adapters.pubsub.data_models.SchemaSettings(raw_data: 'dict', schema: 'str', encoding: 'str' = None, first_revision_id: 'str' = None, last_revision_id: 'str' = None)
代码库:对象
encoding: str= None
first_revision_id: str= None
json()
last_revision_id: str= None
raw_data: dict
schema: str
class TIPCommon.adapters.pubsub.data_models.Subscription
类 TIPCommon.adapters.pubsub.data_models.Subscription(raw_data: 'dict', name: 'str', identifier: 'str', topic_identifier: 'str', state: 'str', ack_deadline_secs: 'int' = None, retain_ack_messages: 'bool' = None, message_retention_duration: 'int' = None, labels: 'dict' = None, message_ordering: 'bool' = None, query_filter: 'str' = None, topic_message_retention_duration: 'int' = None)
代码库:对象
ack_deadline_secs: int= None
identifier: str
json()
labels: dict= None
message_ordering: bool= None
message_retention_duration: int= None
name: str
query_filter: str= None
raw_data: dict
retain_ack_messages: bool= None
state: str
topic_identifier: str
topic_message_retention_duration: int= None
类 TIPCommon.adapters.pubsub.data_models.Topic
类 TIPCommon.adapters.pubsub.data_models.Topic(raw_data: 'dict', name: 'str', identifier: 'str', labels: 'dict' = None, schema_settings: 'SchemaSettings' = None, message_retention_duration: 'int' = None)
代码库:对象
identifier: str
json()
labels: dict= None
message_retention_duration: int= None
name: str
raw_data: dict
schema_settings: SchemaSettings= None
需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。