קריאה מ-Apache Kafka אל Dataflow

במסמך הזה מוסבר איך לקרוא נתונים מ-Apache Kafka ל-Dataflow, והוא כולל טיפים לשיפור הביצועים ושיטות מומלצות.

ברוב תרחישי השימוש, מומלץ להשתמש במחבר I/O מנוהל כדי לקרוא מ-Kafka.

אם אתם צריכים כוונון ביצועים מתקדם יותר, כדאי להשתמש במחבר KafkaIO. מחבר KafkaIO זמין ל-Java או באמצעות מסגרת צינורות רב-לשוניים ל-Python ול-Go.

מקביליות

בקטעים הבאים מוסבר איך להגדיר מקביליות כשקוראים מ-Kafka.

סקירה כללית

המקביליות מוגבלת על ידי שני גורמים: מספר העובדים המקסימלי (max_num_workers) ומספר המחיצות של Kafka. ב-Dataflow, ערך ברירת המחדל של פיצול מקבילי הוא 4 כפול max_num_workers. עם זאת, הפיצול מוגבל במספר המחיצות. לדוגמה, אם יש 100 ליבות וירטואליות זמינות, אבל צינור הנתונים קורא רק מ-10 מחיצות של Kafka, רמת המקביליות המקסימלית היא 10.

כדי למקסם את ההפעלה המקבילית, מומלץ להשתמש ב-4 מחיצות Kafka לפחות.max_num_workers אם העבודה שלכם משתמשת ב-Runner v2, כדאי להגדיר ערך גבוה יותר של מקביליות. נקודת התחלה טובה היא ליצור מחיצות ששווה למספר ליבות ה-vCPU של העובד כפול 2.

הפצה מחדש

אם אי אפשר להגדיל את מספר המחיצות, אפשר להגדיל את המקביליות על ידי קריאה ל-KafkaIO.Read.withRedistribute. השיטה הזו מוסיפה טרנספורמציה Redistribute לצינור, שמספקת רמז ל-Dataflow לגבי חלוקה מחדש של הנתונים והפעלתם במקביל בצורה יעילה יותר. מומלץ מאוד לציין את מספר הרסיסים האופטימלי על ידי קריאה ל-KafkaIO.Read.withRedistributeNumKeys. שימוש ב-KafkaIO.Read.withRedistribute בלבד יכול ליצור מספר רב של מפתחות, מה שמוביל לבעיות בביצועים. מידע נוסף זמין במאמר בנושא זיהוי שלבים עם מקביליות גבוהה. הפצה מחדש של הנתונים מוסיפה תקורה מסוימת לביצוע שלב הערבוב. מידע נוסף זמין במאמר בנושא מניעת מיזוג.

כדי לצמצם את העלות של ערבוב מחדש של נתונים, צריך להתקשר אל KafkaIO.Read.withOffsetDeduplication. במצב הזה, כמות הנתונים שצריך לשמור כחלק מהערבוב מצטמצמת, אבל עדיין מתבצע עיבוד מדויק של כל נתון.

אם לא נדרש עיבוד של כל פריט בדיוק פעם אחת, אפשר לאפשר כפילויות על ידי קריאה ל-KafkaIO.Read.withAllowDuplicates.

בטבלה הבאה מפורטות האפשרויות להפצה מחדש:

אפשרות מצב עיבוד Apache Beam הגדרות אישיות
הפצה מחדש של קלט בדיוק פעם אחת גרסה 2.60 ואילך KafkaIO.Read.withRedistribute()
אפשרות להוספת כפילויות לפחות פעם אחת גרסה 2.60 ואילך KafkaIO.Read.withRedistribute().withAllowDuplicates()
ביטול כפילויות בהיסט בדיוק פעם אחת גרסה 2.69 ואילך KafkaIO.Read.withRedistribute().withOffsetDeduplication()

הטיה בעומס

כדאי לוודא שהעומס בין המחיצות יחסית אחיד ולא מוטה. אם העומס לא מאוזן, יכול להיות שהעובדים לא ינוצלו בצורה טובה. עובדים שקוראים ממחיצות עם עומס קל יותר עשויים להיות במצב יחסי של חוסר פעילות, ועובדים שקוראים ממחיצות עם עומס כבד עשויים להישאר מאחור. ב-Dataflow מוצגים מדדים לגבי ה-backlog לכל מחיצה.

אם העומס לא מאוזן, איזון דינמי של העבודה יכול לעזור לחלק את העבודה. לדוגמה, יכול להיות ש-Dataflow יקצה worker אחד לקריאה ממחיצות מרובות עם נפח נמוך, ויקצה worker אחר לקריאה ממחיצה אחת עם נפח גבוה. עם זאת, שני עובדים לא יכולים לקרוא מאותה מחיצה, ולכן מחיצה עם עומס כבד עדיין יכולה לגרום לעיכוב בצינור.

שיטות מומלצות

בקטע הזה מפורטות המלצות לקריאה מ-Kafka אל Dataflow.

נושאים עם נפח חיפוש נמוך

תרחיש נפוץ הוא קריאה מכמה נושאים עם נפח נמוך בו-זמנית – לדוגמה, נושא אחד לכל לקוח. יצירת משימות נפרדות של Dataflow לכל נושא היא לא חסכונית, כי כל משימה דורשת לפחות עובד מלא אחד. במקום זאת, אפשר לנסות את האפשרויות הבאות:

  • מיזוג נושאים. לשלב נושאים לפני שהם מוזנים ל-Dataflow. יעיל יותר להטמיע כמה נושאים עם נפח חיפוש גבוה מאשר להטמיע הרבה נושאים עם נפח חיפוש נמוך. כל נושא עם נפח גבוה יכול להיות מטופל על ידי משימת Dataflow אחת שמנצלת את העובדים שלה באופן מלא.

  • קריאת כמה נושאים אם אי אפשר לשלב נושאים לפני שמטמיעים אותם ב-Dataflow, כדאי ליצור צינור שקורא מנושאים מרובים. הגישה הזו מאפשרת ל-Dataflow להקצות כמה נושאים לאותה מכונת worker. יש שתי דרכים להטמיע את הגישה הזו:

    • שלב קריאה יחיד. יוצרים מופע יחיד של מחבר KafkaIO ומגדירים אותו לקריאת מספר נושאים. אחר כך מסננים לפי שם הנושא כדי להחיל לוגיקה שונה לכל נושא. דוגמאות לקוד זמינות במאמר בנושא קריאה מכמה נושאים. כדאי לבחור באפשרות הזו אם כל הנושאים נמצאים באותו אשכול. חיסרון אחד הוא שבעיות ב-sink או ב-transform יחידים עלולות לגרום להצטברות של נתונים שלא נשלחו בכל הנושאים.

      בתרחישי שימוש מתקדמים יותר, אפשר להעביר קבוצה של KafkaSourceDescriptorאובייקטים שמציינים את הנושאים לקריאה. באמצעות KafkaSourceDescriptor תוכלו לעדכן את רשימת הנושאים בהמשך, אם יהיה צורך בכך. כדי להשתמש בתכונה הזו, צריך Java עם Runner v2.

    • Multiple read steps (קריאה של נתוני הצעדים). כדי לקרוא מנושאים שנמצאים באשכולות שונים, יכול להיות שצינור הנתונים יכלול כמה מופעים של KafkaIO. בזמן שהעבודה פועלת, אפשר לעדכן מקורות בודדים באמצעות מיפויי טרנספורמציה. אפשר להגדיר נושא או קלאסטר חדשים רק כשמשתמשים ב-Runner v2. הגישה הזו עלולה להקשות על יכולת הצפייה, כי צריך לעקוב אחרי כל טרנספורמציה של קריאה בנפרד במקום להסתמך על מדדים ברמת צינור הנתונים.

ביצוע Commit חזרה ל-Kafka

כברירת מחדל, מחבר KafkaIO לא משתמש בהיסטים של Kafka כדי לעקוב אחרי ההתקדמות, ולא מבצע commit בחזרה ל-Kafka. אם מתקשרים אל commitOffsetsInFinalize, המחבר עושה כמיטב יכולתו כדי לבצע commit חזרה אל Kafka אחרי שרשומות מבוצעות ב-Dataflow. יכול להיות שרשומות שבוצעו ב-Dataflow לא יעברו עיבוד מלא, ולכן אם מבטלים את צינור הנתונים, יכול להיות שקיזוז יבוצע בלי שהרשומות יעברו עיבוד מלא.

לא מומלץ להשתמש באפשרות הזו כי ההגדרה enable.auto.commit=True מבצעת commit להיסטים ברגע שהם נקראים מ-Kafka בלי ש-Dataflow יעבד אותם. ההמלצה היא להגדיר גם את enable.auto.commit=False וגם את commitOffsetsInFinalize=True. אם מגדירים את enable.auto.commit ל-True, יכול להיות שנתונים יאבדו אם הצינור ייקטע במהלך העיבוד. יכול להיות שרשומות שכבר בוצעו ב-Kafka יושמטו.

סימני מים

כברירת מחדל, המחבר KafkaIO משתמש בזמן העיבוד הנוכחי כדי להקצות את סימן המים של הפלט ואת זמן האירוע. כדי לשנות את ההתנהגות הזו, קוראים ל-withTimestampPolicyFactory ומקצים TimestampPolicy. ‫Beam מספקת הטמעות של TimestampPolicy שמחשבות את סימן המים על סמך הזמן שבו היומן של Kafka מצורף או על סמך שעת יצירת ההודעה.

דברים שחשוב לזכור לגבי משחקי ריצה

למחבר KafkaIO יש שתי הטמעות בסיסיות לקריאות של Kafka: הגרסה הישנה יותר ReadFromKafkaViaUnbounded והגרסה החדשה יותר ReadFromKafkaViaSDF. ‫Dataflow בוחר באופן אוטומטי את ההטמעה הכי טובה למשימה על סמך שפת ה-SDK והדרישות של המשימה. אל תבקשו במפורש רץ או הטמעה של Kafka, אלא אם אתם צריכים תכונות ספציפיות שזמינות רק בהטמעה הזו. מידע נוסף על בחירת רץ זמין במאמר שימוש ב-Dataflow Runner v2.

אם צינור הנתונים משתמש ב-withTopic או ב-withTopics, ההטמעה הישנה יותר שולחת שאילתה ל-Kafka בזמן בניית צינור הנתונים כדי לקבל את המחיצות הזמינות. למכונה שיוצרת את צינור הנתונים צריכה להיות הרשאה להתחבר ל-Kafka. אם מופיעה שגיאת הרשאה, צריך לוודא שיש לכם הרשאות להתחבר ל-Kafka באופן מקומי. כדי להימנע מהבעיה הזו, אפשר להשתמש ב-withTopicPartitions, שלא מתחבר ל-Kafka בזמן בניית צינור הנתונים.

פריסה בסביבת הייצור

כשפורסים את הפתרון בסביבת ייצור, מומלץ להשתמש בתבניות גמישות. שימוש בתבנית Flex מאפשר להפעיל את צינור הנתונים מסביבה עקבית, מה שיכול לעזור לצמצם בעיות בהגדרות מקומיות.

הרישום ביומן מ-KafkaIO יכול להיות מפורט מאוד. כדאי לשקול להפחית את רמת הרישום ביומן בייצור באופן הבא:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

מידע נוסף זמין במאמר בנושא הגדרת רמות יומן של עובדי צינורות.

הגדרה של רשתות

כברירת מחדל, Dataflow מפעיל מופעים ברשת ברירת המחדל של הענן הווירטואלי הפרטי (VPC). בהתאם להגדרות של Kafka, יכול להיות שתצטרכו להגדיר רשת ורשת משנה שונות עבור Dataflow. מידע נוסף זמין במאמר ציון רשת ורשת משנה. כשמגדירים את הרשת, צריך ליצור כללי חומת אש שמאפשרים למכונות העובד של Dataflow להגיע לשרתי Kafka.

אם אתם משתמשים ב-VPC Service Controls, אתם צריכים למקם את אשכול Kafka בתוך גבולות הגזרה של VPC Service Controls, או להרחיב את גבולות הגזרה ל-VPN או ל-Cloud Interconnect המורשים.

אם אשכול Kafka נפרס מחוץ ל- Google Cloud, צריך ליצור חיבור רשת בין Dataflow לבין אשכול Kafka. יש כמה אפשרויות שונות לחיבור לרשת, שלכל אחת מהן יש יתרונות וחסרונות:

‫Dedicated Interconnect היא האפשרות הכי טובה לביצועים ולמהימנות צפויים, אבל יכול לקחת יותר זמן להגדיר אותה כי צדדים שלישיים צריכים להקצות את המעגלים החדשים. בטופולוגיה שמבוססת על כתובת IP ציבורית, אפשר להתחיל במהירות כי לא צריך לבצע הרבה עבודות ברשת.

בקטעים הבאים מפורטות האפשרויות האלה.

מרחב כתובות RFC 1918 משותף

גם Dedicated Interconnect וגם IPsec VPN מאפשרים לכם גישה ישירה לכתובות IP של RFC 1918 בענן הווירטואלי הפרטי (VPC), מה שיכול לפשט את ההגדרה של Kafka. אם אתם משתמשים בטופולוגיה שמבוססת על VPN, כדאי להגדיר VPN עם תפוקה גבוהה.

כברירת מחדל, Dataflow מפעיל מופעים ברשת ה-VPC שמוגדרת כברירת מחדל. בטופולוגיה של רשת פרטית עם מסלולים שמוגדרים באופן מפורש ב-Cloud Router שמקשרים בין רשתות משנה ב- Google Cloud לאותו אשכול Kafka, אתם צריכים יותר שליטה במיקום של מופעי Dataflow. אפשר להשתמש ב-Dataflow כדי להגדיר את network וsubnetwork פרמטרים של ביצוע.

חשוב לוודא שיש מספיק כתובות IP זמינות ברשת המשנה המתאימה, כדי שמערכת Dataflow תוכל להפעיל מופעים כשהיא מנסה להרחיב את קנה המידה. בנוסף, כשיוצרים רשת נפרדת להפעלת מכונות Dataflow, צריך לוודא שיש כלל חומת אש שמאפשר תעבורת TCP בין כל המכונות הווירטואליות בפרויקט. כלל חומת האש הזה כבר מוגדר ברשת שמוגדרת כברירת מחדל.

מרחב כתובות IP ציבוריות

הארכיטקטורה הזו משתמשת בפרוטוקול Transport Layer Security ‏(TLS) כדי לאבטח את התעבורה בין לקוחות חיצוניים לבין Kafka, ומשתמשת בתעבורה לא מוצפנת לתקשורת בין ברוקרים. אם מאזין Kafka נקשר לממשק רשת שמשמש לתקשורת פנימית וחיצונית, קל להגדיר את המאזין. עם זאת, בתרחישים רבים, הכתובות שמפורסמות חיצונית של ברוקרי Kafka באשכול שונות מממשקי הרשת הפנימיים שבהם Kafka משתמש. בתרחישים כאלה, אפשר להשתמש במאפיין advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

לקוחות חיצוניים מתחברים באמצעות יציאה 9093 דרך ערוץ SSL, ולקוחות פנימיים מתחברים באמצעות יציאה 9092 דרך ערוץ טקסט לא מוצפן. כשמציינים כתובת בקטע advertised.listeners, צריך להשתמש בשמות DNS (kafkabroker-n.mydomain.com בדוגמה הזו) שמפנים לאותו מופע גם לתנועה חיצונית וגם לתנועה פנימית. יכול להיות ששימוש בכתובות IP ציבוריות לא יפעל כי יכול להיות שהכתובות לא יצליחו לפתור בעיות בתנועה פנימית.

כוונון של Kafka

להגדרות של אשכול Kafka ושל לקוח Kafka יכולה להיות השפעה רבה על הביצועים. בפרט, יכול להיות שההגדרות הבאות נמוכות מדי. בקטע הזה מוצגות כמה נקודות התחלה מומלצות, אבל כדאי להתנסות עם הערכים האלה כדי למצוא את הערכים המתאימים לעומס העבודה הספציפי שלכם.

  • unboundedReaderMaxElements. ברירת המחדל היא 10,000. ערך גבוה יותר, כמו 100,000, יכול להגדיל את הגודל של החבילות, מה שיכול לשפר משמעותית את הביצועים אם הצינור כולל צבירות. עם זאת, ערך גבוה יותר עשוי גם להגדיל את זמן האחזור. כדי להגדיר את הערך, משתמשים ב-setUnboundedReaderMaxElements. ההגדרה הזו לא חלה על Runner v2. ב-Runner v2, משתמשים באפשרות השירות Dataflow‏: sdf_checkpoint_after_output_bytes.

  • unboundedReaderMaxReadTimeMs. ברירת המחדל היא 10,000 אלפיות השנייה. ערך גבוה יותר, כמו 20,000 אלפיות השנייה, יכול להגדיל את גודל החבילה, בעוד שערך נמוך יותר, כמו 5,000 אלפיות השנייה, יכול להקטין את ההשהיה או את העומס. כדי להגדיר את הערך, משתמשים ב-setUnboundedReaderMaxReadTimeMs. ההגדרה הזו לא חלה על Runner v2. ב-Runner v2, משתמשים באפשרות השירות Dataflow‏: sdf_checkpoint_after_duration.

  • max.poll.records. ברירת המחדל היא 500. ערך גבוה יותר עשוי להניב ביצועים טובים יותר על ידי אחזור של יותר רשומות נכנסות יחד, במיוחד כשמשתמשים ב-Runner v2. כדי להגדיר את הערך, קוראים ל-withConsumerConfigUpdates.

  • fetch.max.bytes. ברירת המחדל היא 1MB. ערך גבוה יותר עשוי לשפר את קצב העברת הנתונים על ידי הקטנת מספר הבקשות, במיוחד כשמשתמשים ב-Runner v2. עם זאת, הגדרה של ערך גבוה מדי עלולה להגדיל את זמן האחזור, למרות שסביר יותר שהצוואר בקבוק העיקרי הוא עיבוד במורד הזרם. ערך התחלתי מומלץ הוא 100MB. כדי להגדיר את הערך, קוראים ל-withConsumerConfigUpdates.

  • max.partition.fetch.bytes. ברירת המחדל היא 1MB. הפרמטר הזה מגדיר את כמות הנתונים המקסימלית לכל מחיצה שהשרת מחזיר. הגדלת הערך יכולה לשפר את קצב העברת הנתונים על ידי הקטנת מספר הבקשות, במיוחד כשמשתמשים ב-Runner v2. עם זאת, אם מגדירים ערך גבוה מדי, יכול להיות שזמן האחזור יתארך, למרות שסביר יותר שהצוואר בקבוק העיקרי הוא העיבוד במורד הזרם. הערך המומלץ להתחלה הוא 100MB. כדי להגדיר את הערך, קוראים ל-withConsumerConfigUpdates.

  • consumerPollingTimeout. ברירת המחדל היא 2 שניות. אם חלף הזמן הקצוב לתפוגה של לקוח הצרכן לפני שהוא יכול לקרוא רשומות, נסו להגדיר ערך גבוה יותר. ההגדרה הזו רלוונטית בעיקר כשמבצעים קריאות בין אזורים או קריאות ברשת איטית. כדי להגדיר את הערך, קוראים ל-withConsumerPollingTimeout.

מוודאים ש-receive.buffer.bytes גדול מספיק כדי להכיל את גודל ההודעות. אם הערך קטן מדי, יכול להיות שבלוגים יופיעו צרכנים שנוצרים מחדש באופן רציף ומחפשים היסט ספציפי.

דוגמאות

בדוגמאות הקוד הבאות מוצגות דרכים ליצירת צינורות Dataflow שקוראים מ-Kafka. כשמשתמשים ב-Application Default Credentials בשילוב עם גורם מטפל בקריאה חוזרת שסופק על ידי השירות המנוהל של Google Cloud ל-Apache Kafka, נדרשת גרסה 3.7.0 ואילך של kafka-clients.

קריאה מנושא יחיד

בדוגמה הזו נשתמש במחבר קלט/פלט מנוהל. הוא מראה איך לקרוא מנושא ב-Kafka ולכתוב את נתוני התוכן של ההודעה לקובצי טקסט.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

קריאה מכמה נושאים

בדוגמה הזו נעשה שימוש במחבר KafkaIO. הוא מראה איך לקרוא מכמה נושאים של Kafka ולהחיל לוגיקה נפרדת של צינורות לכל נושא.

בתרחישי שימוש מתקדמים יותר, אפשר להעביר באופן דינמי קבוצה של אובייקטים מסוג KafkaSourceDescriptor, כדי לעדכן את רשימת הנושאים לקריאה. בגישה הזו נדרש Java עם Runner v2.

Java

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

כדי לבצע אימות ב-Dataflow, צריך להגדיר את Application Default Credentials. מידע נוסף זמין במאמר הגדרת אימות לסביבת פיתוח מקומית.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

המאמרים הבאים