במאמר הזה מוסבר איך לכתוב נתוני טקסט מ-Dataflow ל-Pub/Sub באמצעות מחבר הקלט/פלט של Apache Beam PubSubIO.
סקירה כללית
כדי לכתוב נתונים ל-Pub/Sub, משתמשים במחבר PubSubIO. רכיבי הקלט יכולים להיות הודעות Pub/Sub או רק נתוני ההודעה.
אם רכיבי הקלט הם הודעות Pub/Sub, אפשר להגדיר מאפיינים או מפתח סדר לכל הודעה.
אפשר להשתמש בגרסת Java, Python או Go של המחבר PubSubIO, באופן הבא:
Java
כדי לכתוב לנושא אחד, צריך להפעיל את method PubsubIO.writeMessages. השיטה הזו מקבלת אוסף קלט של אובייקטים מסוג PubsubMessage. המחבר מגדיר גם שיטות נוחות לכתיבת מחרוזות, הודעות Avro עם קידוד בינארי או הודעות protobuf עם קידוד בינארי. השיטות האלה ממירות את אוסף הקלט להודעות Pub/Sub.
כדי לכתוב למערך דינמי של נושאים על סמך נתוני הקלט, קוראים ל-writeMessagesDynamic. מציינים את נושא היעד לכל הודעה על ידי קריאה ל-PubsubMessage.withTopic בהודעה. לדוגמה, אתם יכולים לנתב הודעות לנושאים שונים על סמך הערך של שדה מסוים בנתוני הקלט.
מידע נוסף מופיע במאמרי העזרה של PubsubIO.
Python
מבצעים קריאה ל-method pubsub.WriteToPubSub.
כברירת מחדל, השיטה הזו מקבלת אוסף קלט מסוג bytes, שמייצג את המטען הייעודי (Payload) של ההודעה. אם הפרמטר with_attributes הוא True, השיטה מקבלת אוסף של אובייקטים מסוג PubsubMessage.
מידע נוסף מופיע במאמרי העזרה של מודול pubsub.
Go
כדי לכתוב נתונים ל-Pub/Sub, מפעילים את השיטה pubsubio.Write. השיטה הזו מקבלת אוסף קלט של אובייקטים מסוג PubSubMessage או פרוסות של בייטים שמכילות את מטעני הייעוד של ההודעה.
מידע נוסף מופיע במאמרי העזרה בנושא חבילת pubsubio.
מידע נוסף על הודעות Pub/Sub זמין במאמר פורמט הודעה במסמכי Pub/Sub.
חותמות זמן
ב-Pub/Sub מוגדרת חותמת זמן לכל הודעה. חותמת הזמן הזו מייצגת את השעה שבה ההודעה פורסמה ב-Pub/Sub. בתרחיש של עיבוד סטרימינג, יכול להיות שיהיה לכם חשוב גם חותמת הזמן של האירוע, כלומר הזמן שבו נוצרו נתוני ההודעה. אפשר להשתמש בחותמת הזמן של הרכיב ב-Apache Beam כדי לייצג את זמן האירוע. מקורות שיוצרים PCollection ללא הגבלה לרוב מקצים לכל רכיב חדש חותמת זמן שתואמת לזמן האירוע.
ב-Java וב-Python, מחבר ה-I/O של Pub/Sub יכול לכתוב את חותמת הזמן של כל רכיב כמאפיין של הודעת Pub/Sub. צרכני הודעות יכולים להשתמש במאפיין הזה כדי לקבל את חותמת הזמן של האירוע.
Java
קוראים ל-PubsubIO.Write<T>.withTimestampAttribute ומציינים את שם המאפיין.
Python
מציינים את הפרמטר timestamp_attribute כשמתקשרים אל WriteToPubSub.
מסירת הודעות
Dataflow תומך בעיבוד של הודעות בדיוק פעם אחת בצינור. עם זאת, מחבר הקלט/פלט של Pub/Sub לא יכול להבטיח מסירה של הודעות דרך Pub/Sub בדיוק פעם אחת.
ב-Java וב-Python, אפשר להגדיר את מחבר הקלט/פלט של Pub/Sub כך שיכתוב את המזהה הייחודי של כל רכיב כמאפיין של הודעה. צרכני ההודעות יכולים להשתמש במאפיין הזה כדי לבטל כפילויות של הודעות.
Java
קוראים ל-PubsubIO.Write<T>.withIdAttribute ומציינים את שם המאפיין.
Python
מציינים את הפרמטר id_label כשמתקשרים אל WriteToPubSub.
פלט ישיר
אם מפעילים מצב סטרימינג של לפחות פעם אחת בצינור, מחבר הקלט/פלט משתמש בפלט ישיר. במצב הזה, המחבר לא מבצע נקודות ביקורת בהודעות, מה שמאפשר כתיבה מהירה יותר. עם זאת, ניסיונות חוזרים במצב הזה עלולים לגרום לשכפול הודעות עם מזהי הודעות שונים, מה שיכול להקשות על צרכני ההודעות לבטל את הכפילות של ההודעות.
בצינורות שמשתמשים במצב 'פעם אחת בדיוק', אפשר להפעיל פלט ישיר על ידי הגדרת streaming_enable_pubsub_direct_output
אפשרות השירות. פלט ישיר
מפחית את זמן האחזור של הכתיבה ומוביל לעיבוד יעיל יותר. מומלץ להשתמש באפשרות הזו אם צרכני ההודעות יכולים לטפל בהודעות כפולות עם מזהי הודעות לא ייחודיים.
דוגמאות
בדוגמה הבאה נוצר PCollection של הודעות Pub/Sub והן נכתבות לנושא Pub/Sub. הנושא מצוין כאפשרות של צינור. כל הודעה מכילה נתוני מטען ייעודי (payload) וקבוצה של מאפיינים.
Java
כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.
Python
כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.