TIPCommon.adapters

class TIPCommon.adapters.pubsub.pubsub.PubSubAdapter

TIPCommon.adapters.pubsub.pubsub.PubSubAdapter(session, project_id=None, logger=None, region=None)

基础:object

用于管理 Google Cloud 项目 Pub/Sub 主题和订阅的适配器类。

ack

ack(sub_name, ack_ids)

确认与从 PubSubAdapter.pull() 返回的 AcknowledgeRequest 响应中的 ackIDs 关联的消息。

参数
sub_name str

订阅名称。

ack_ids (list[str])

要确认的消息的确认 ID 列表。该列表由 Pub/Sub 系统在 PubSubAdapter.pull() 响应中返回。

static build_pubsub_message

静态 build_pubsub_message(message_content, encoding='utf-8', ordering_key=None, **attr)

创建 PubSubMessage 对象。

参数
message_content (str)

消息内容。

encoding (str)

用于对消息文本进行编码或解码的编码类型。

默认值为 UTF-8

ordering_key 可选 (str)

如果使用,该参数会标识所有必须遵循发布顺序的相关消息。

**attr 可选

str

要作为消息对象属性传递的属性。

返回值

TIPCommon.adapters.pubsub.PubSubMessage 对象。

返回类型

PubSubMessage

create_subscription

create_subscription(sub_name, topic, **attr)

为指定主题创建 Pub/Sub 订阅。

参数
sub_name (str)

订阅名称。

topic (str)

要为其创建订阅的 Pub/Sub 主题名称。

**attr 要传递给订阅请求的其他参数。

返回值

所创建订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

create_topic

create_topic(topic_name)

在 Google Cloud 项目中创建 Pub/Sub 主题。

参数
topic_name (str)

主题的名称。名称必须符合 Google Cloud 资源名称规则

返回值

所创建主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

delete_subscription

delete_subscription(sub_name)

从 Google Cloud 项目中删除 Pub/Sub 订阅。

参数
sub_name (str)

要移除的订阅名称。

delete_topic

delete_topic(topic_name)

从 Google Cloud 项目中删除 Pub/Sub 主题。

参数
topic_name str

要移除的主题。

来自凭据的 static

静态 from_credentials(credentials, project_id=None, verify_ssl=True, quota_project=None, logger=None, region=None)

基于 google.oauth2.credentials.Credentials 对象创建 PubSubAdapter 对象。

参数
credentials (google.oauth2.credentials.Credentials)

google.oauth2.credentials.Credentials 对象。

project_id 可选 (str)

Google Cloud 项目 ID。

如果未提供,该函数会尝试默认使用凭据对象中配置的项目。

verify_ssl 可选 (bool)

指定是否为 HTTP 会话启用 SSL 证书验证。

quota_project 可选 (str)

用于配额和结算的项目。

logger 可选 (SiemplifyLogger)

SiemplifyLogger 对象。

region (str)

Pub/Sub 的工作区域。

返回值

PubSubAdapter 对象。

返回类型

PubSubAdapter

来自 service_account_info 的 static

静态 subscription_name(project_id, sub_name)

根据 service_account JSON 创建 PubSubAdapter 对象。

参数
user_service_account (str)

文本格式的 Google Cloud 服务账号 JSON。

project_id 可选 (str)

Google Cloud 项目 ID。

如果未提供,该函数会尝试默认使用凭据对象中配置的项目。

verify_ssl 可选 (bool)

指定是否为 HTTP 会话启用 SSL 证书验证。

quota_project 可选 (str)

用于配额和结算的项目。

logger 可选 (SiemplifyLogger)

SiemplifyLogger 对象。

返回值

PubSubAdapter 对象。

返回类型

PubSubAdapter

get_subscription

get_subscription(sub_name, topic=None, create_if_not_exist=False, **attr)

检索 Pub/Sub 订阅。

参数
sub_name (str)

订阅名称。

topic (str)

要为其创建订阅的 Pub/Sub 主题名称。

如果 create_if_not_existTrue,则必须提供此值。

create_if_not_exist 在 Google Cloud 中创建 Pub/Sub 订阅(如果不存在)。
**attr 要传递给订阅创建请求的其他参数。

返回值

检索到的订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

get_topic

get_topic(topic_name, create_if_not_exist=False)

从配置的 Google Cloud项目中检索 Pub/Sub 主题对象。

参数
topic_name str

主题的名称(已简化,不含 projects/PROJECT_ID/topics/ 前缀)。

create_if_not_exist bool

在 Google Cloud中创建 Pub/Sub 主题(如果该主题不存在)。

返回值

接收到的主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

patch_subscription

patch_subscription(sub_name, topic_name, push_config=None, bigquery_config=None, cloud_storage_config=None, ack_deadline_seconds=None, retain_acked_messages=None, retention_duration=None, labels=None, enable_message_ordering=None, expiration_policy=None, query_filter=None, dead_letter_policy=None, return_policy=None, detached=None, enable_once_delivery=None)

更新现有订阅。

参数
sub_name str

订阅名称。

topic_name str

相应订阅接收消息的主题的名称。

返回值

收到的订阅的 TIPCommon.adapters.pubsub.Subscription 对象。

返回类型

Subscription

patch_topic

patch_topic(topic_name, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, satisfies_pzs=None, retention_duration=None)

用于更新现有主题。

参数
topic_name str

主题名称。

返回值

接收到的主题的 TIPCommon.adapters.pubsub.Topic 对象。

返回类型

Topic

publish

publish(topic_name, messages)

将 PubSubMessage 对象列表发布到主题。

参数
topic_name str

要发布消息的主题的名称。

messages list

PubSubMessage 对象的列表。

您可以使用 PubSubAdapter.build_message() 静态方法创建列表。

返回值

消息 ID 列表。

返回类型

list[str]

拉取

pull(sub_name, limit, timeout=60, encoding='utf-8')

从 Pub/Sub 订阅中拉取消息。

参数
sub_name str

订阅名称。

limit int

相应请求要返回的最大消息数。

timeout int

HTTP 请求超时时间(以秒为单位)。

默认值为 60 秒。

encoding str

Pub/Sub 消息编码。默认值为 utf-8

返回值

TIPCommon.adapters.pubsub.ReceivedMessage 对象的列表。

返回类型

list[ReceivedMessage]

静态 subscription_name

静态 subscription_name(project_id, sub_name)

检索完整订阅名称,格式如下:projects/project_id/subscriptions/subscription_name

参数
project_id str

包含资源的项目的名称。

sub_name str

Pub/Sub 订阅名称。

返回值

采用以下格式的完整订阅名称:projects/project_id/subscriptions/subscription_name

返回类型

str

静态 topic_name

静态 topic_name(project_id, topic)

检索 projects/project_id/topics/topic_name

参数
project_id str

包含资源的项目的名称。

topic str

Pub/Sub 主题名称。

返回值

完整的主题名称:projects/project_id/topics/topic_name

返回类型

str

class TIPCommon.adapters.pubsub.data_models.PubSubMessage

TIPCommon.adapters.pubsub.data_models.PubSubMessage(raw_data: 'dict', data: 'str' = None, attributes: 'dict' = None, message_id: 'str' = None, publish_time: 'int' = None, ordering_key: 'str' = None)

代码库:对象

attributes: dict= None

data: str= None

json()

message_id: str= None

ordering_key: str= None

publish_time: int= None

raw_data: dict

class TIPCommon.adapters.pubsub.data_models.ReceivedMessage

TIPCommon.adapters.pubsub.data_models.ReceivedMessage(raw_data: 'dict', ack_id: 'str', message: 'PubSubMessage', delivery_attempt: 'int')

代码库:对象

ack_id: str

delivery_attempt: int

json()

message: PubSubMessage

raw_data: dict

class TIPCommon.adapters.pubsub.data_models.SchemaSettings

TIPCommon.adapters.pubsub.data_models.SchemaSettings(raw_data: 'dict', schema: 'str', encoding: 'str' = None, first_revision_id: 'str' = None, last_revision_id: 'str' = None)

代码库:对象

encoding: str= None

first_revision_id: str= None

json()

last_revision_id: str= None

raw_data: dict

schema: str

class TIPCommon.adapters.pubsub.data_models.Subscription

TIPCommon.adapters.pubsub.data_models.Subscription(raw_data: 'dict', name: 'str', identifier: 'str', topic_identifier: 'str', state: 'str', ack_deadline_secs: 'int' = None, retain_ack_messages: 'bool' = None, message_retention_duration: 'int' = None, labels: 'dict' = None, message_ordering: 'bool' = None, query_filter: 'str' = None, topic_message_retention_duration: 'int' = None)

代码库:对象

ack_deadline_secs: int= None

identifier: str

json()

labels: dict= None

message_ordering: bool= None

message_retention_duration: int= None

name: str

query_filter: str= None

raw_data: dict

retain_ack_messages: bool= None

state: str

topic_identifier: str

topic_message_retention_duration: int= None

TIPCommon.adapters.pubsub.data_models.Topic

TIPCommon.adapters.pubsub.data_models.Topic(raw_data: 'dict', name: 'str', identifier: 'str', labels: 'dict' = None, schema_settings: 'SchemaSettings' = None, message_retention_duration: 'int' = None)

代码库:对象

identifier: str

json()

labels: dict= None

message_retention_duration: int= None

name: str

raw_data: dict

schema_settings: SchemaSettings= None

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。