本文說明如何使用 Apache Beam PubSubIO
I/O 連接器,將 Dataflow 中的文字資料寫入 Pub/Sub。
總覽
如要將資料寫入 Pub/Sub,請使用 PubSubIO 連接器。輸入元素可以是 Pub/Sub 訊息,也可以只是訊息資料。如果輸入元素是 Pub/Sub 訊息,您也可以選擇在每則訊息中設定屬性或排序鍵。
您可以使用 Java、Python 或 Go 版本的 PubSubIO 連接器,如下所示:
Java
如要寫入單一主題,請呼叫 PubsubIO.writeMessages 方法。這個方法會採用 PubsubMessage 物件的輸入集合。連接器也會定義便利方法,用於編寫字串、二進位編碼的 Avro 訊息或二進位編碼的 protobuf 訊息。這些方法會將輸入集合轉換為 Pub/Sub 訊息。
如要根據輸入資料寫入動態主題集,請呼叫 writeMessagesDynamic。呼叫訊息上的 PubsubMessage.withTopic,為每則訊息指定目的地主題。舉例來說,您可以根據輸入資料中特定欄位的值,將訊息轉送至不同主題。
詳情請參閱PubsubIO參考說明文件。
Python
呼叫 pubsub.WriteToPubSub 方法。
根據預設,這個方法會採用 bytes 類型的輸入集合,代表訊息酬載。如果 with_attributes 參數是 True,這個方法會採用 PubsubMessage 物件的集合。
詳情請參閱 pubsub 模組參考說明文件。
Go
如要將資料寫入 Pub/Sub,請呼叫 pubsubio.Write 方法。這個方法會接收 PubSubMessage 物件或包含訊息酬載的位元組切片輸入集合。
詳情請參閱 pubsubio 套件參考說明文件。
如要進一步瞭解 Pub/Sub 訊息,請參閱 Pub/Sub 說明文件中的「訊息格式」。
時間戳記
Pub/Sub 會為每則訊息設定時間戳記。這個時間戳記代表訊息發布至 Pub/Sub 的時間。在串流情境中,您可能也會在意「事件」時間戳記,也就是產生訊息資料的時間。您可以使用 Apache Beam 元素時間戳記來表示事件時間。建立不受限 PCollection 的來源通常會為每個新元素指派與事件時間相符的時間戳記。
如果是 Java 和 Python,Pub/Sub I/O 連接器可將每個元素的時間戳記寫入為 Pub/Sub 訊息屬性。訊息消費者可以使用這項屬性取得事件時間戳記。
Java
呼叫 PubsubIO.Write<T>.withTimestampAttribute 並指定屬性的名稱。
Python
呼叫 WriteToPubSub 時,請指定 timestamp_attribute 參數。
郵件傳送
Dataflow 支援管道內訊息的僅處理一次作業。不過,Pub/Sub I/O 連接器無法保證透過 Pub/Sub 傳送的訊息只會傳送一次。
如果是 Java 和 Python,您可以設定 Pub/Sub I/O 連接器,將每個元素的專屬 ID 寫入為訊息屬性。訊息消費者隨後可以使用這項屬性來刪除重複訊息。
Java
呼叫 PubsubIO.Write<T>.withIdAttribute 並指定屬性的名稱。
Python
呼叫 WriteToPubSub 時,請指定 id_label 參數。
直接輸出
如果您在管道中啟用至少一次串流模式,I/O 連接器就會使用直接輸出。在這個模式中,連接器不會檢查點訊息,因此寫入速度較快。不過, 在此模式下重試可能會導致重複訊息具有不同的訊息 ID, 可能導致訊息消費者更難將訊息重複資料刪除。
對於使用「只傳送一次」模式的管道,您可以設定 streaming_enable_pubsub_direct_output
服務選項,啟用直接輸出功能。直接輸出可減少寫入延遲,並提高處理效率。如果訊息消費者可以處理具有非專屬訊息 ID 的重複訊息,建議您採用這個選項。
範例
下列範例會建立 PCollection 的 Pub/Sub 訊息,並將這些訊息寫入 Pub/Sub 主題。主題會指定為管道選項。每則訊息都包含酬載資料和一組屬性。
Java
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。
Python
如要向 Dataflow 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。