הפעלת משימת Dataflow בקונטיינר בהתאמה אישית

במאמר הזה מוסבר איך להריץ צינור Dataflow באמצעות קונטיינר בהתאמה אישית.

מידע על יצירת קובץ אימג' של קונטיינר זמין במאמר פיתוח פתרונות קובצי אימג'ים מותאמים אישית של קונטיינרים ל-Dataflow.

כשמריצים את צינור הנתונים, מפעילים אותו באמצעות Apache Beam SDK עם אותה גרסה וגרסת שפה כמו ה-SDK בתמונת מאגר ה-Docker בהתאמה אישית. השלב הזה עוזר למנוע שגיאות לא צפויות כתוצאה מתלות לא תואמת או מ-SDK לא תואם.

בדיקה מקומית

לפני שמריצים את צינור עיבוד הנתונים ב-Dataflow, מומלץ לבדוק את קובץ אימג' של קונטיינר באופן מקומי, כדי לאפשר בדיקה וניפוי באגים מהירים יותר.

מידע נוסף על שימוש ספציפי ב-Apache Beam זמין במדריך Apache Beam בנושא הרצת צינורות עם תמונות קונטיינר בהתאמה אישית.

בדיקה בסיסית באמצעות PortableRunner

כדי לוודא שאפשר לשלוף קובצי אימג' של קונטיינרים מרחוק ולהריץ צינור פשוט, משתמשים ב-Apache Beam PortableRunner. כשמשתמשים ב-PortableRunner, שליחת העבודה מתבצעת בסביבה המקומית, וההרצה של DoFn מתבצעת בסביבת Docker.

כשמשתמשים במעבדי GPU, יכול להיות שלקונטיינר Docker לא תהיה גישה למעבדי ה-GPU. כדי לבדוק את הקונטיינר עם יחידות GPU, משתמשים בdirect runner ופועלים לפי השלבים לבדיקת קובץ אימג' של קונטיינר במכונה וירטואלית עצמאית עם יחידות GPU בקטע ניפוי באגים באמצעות מכונה וירטואלית עצמאית בדף 'שימוש ביחידות GPU'.

הפקודה הבאה מריצה צינור עיבוד נתונים לדוגמה:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

מחליפים את מה שכתוב בשדות הבאים:

  • REGION: האזור של שירות העבודות שבו רוצים להשתמש, בפורמט של כתובת ויציאה. לדוגמה: localhost:3000. אפשר להשתמש ב-embed כדי להריץ שירות עבודות בתהליך.
  • IMAGE_URI: ה-URI של קובץ האימג' של הקונטיינר בהתאמה אישית.
  • INPUT_FILE: קובץ קלט שאפשר לקרוא כקובץ טקסט. קובץ זה חייב להיות נגיש על ידי רתמת ה-SDK
    קובץ אימג' של קונטיינר, או שהוא נטען מראש בקובץ אימג' של קונטיינר או שהוא קובץ מרוחק.
  • OUTPUT_FILE: נתיב לכתיבת הפלט. הנתיב הזה הוא נתיב מרוחק או נתיב מקומי במאגר.

כשהתהליך מסתיים בהצלחה, בודקים את יומני המסוף כדי לוודא שהתהליך הסתיים בהצלחה ושהתמונה המרוחקת שצוינה על ידי IMAGE_URI נמצאת בשימוש.

אחרי שמריצים את צינור העיבוד, הקבצים שנשמרו במאגר לא נמצאים במערכת הקבצים המקומית, והמאגר מופסק. אפשר להעתיק קבצים ממערכת הקבצים של קובץ המאגר שהופסק באמצעות docker cp.

אפשרות אחרת:

  • לספק פלטים למערכת קבצים מרוחקת כמו Cloud Storage. יכול להיות שתצטרכו להגדיר גישה באופן ידני למטרות בדיקה, כולל קובצי פרטי כניסה או Application Default Credentials.
  • כדי לבצע ניפוי באגים במהירות, מוסיפים רישום זמני ביומן.

שימוש ב-Direct Runner

כדי לבצע בדיקה מקומית מעמיקה יותר של קובץ אימג' של קונטיינר ושל צינור הנתונים, אפשר להשתמש ב-Direct Runner של Apache Beam.

אפשר לבדוק את צינור עיבוד הנתונים בנפרד מהקונטיינר בסביבה מקומית שתואמת לקובץ אימג' של קונטיינר, או להפעיל את צינור עיבוד הנתונים בקונטיינר פעיל.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

מחליפים את IMAGE_URI במזהה המשאבים האחיד (URI) של קובץ אימג' של קונטיינר בהתאמה אישית.

בדוגמאות מניחים שכל קובצי הצינור, כולל הצינור עצמו, נמצאים במאגר המותאם אישית, הועברו ממערכת קבצים מקומית או שהם מרוחקים ונגישים ל-Apache Beam ולמאגר. לדוגמה, כדי להשתמש ב-Maven ‏ (mvn) כדי להריץ את דוגמת ה-Java הקודמת, צריך להכין את Maven ואת התלות שלו בקונטיינר. מידע נוסף זמין במאמרים בנושא אחסון וdocker run במאמרי העזרה של Docker.

המטרה של בדיקה ב-Direct Runner היא לבדוק את צינור עיבוד הנתונים בסביבת הקונטיינר המותאם אישית, ולא לבדוק את הפעלת הקונטיינר עם ENTRYPOINT ברירת המחדל שלו. משנים את ENTRYPOINT (לדוגמה, docker run --entrypoint ...) כדי להפעיל ישירות את צינור הנתונים או כדי לאפשר הפעלה ידנית של פקודות במאגר.

אם אתם מסתמכים על הגדרה ספציפית שמבוססת על הרצת הקונטיינר ב-Compute Engine, אתם יכולים להריץ את הקונטיינר ישירות במכונה וירטואלית של Compute Engine. מידע נוסף זמין במאמר בנושא Containers on Compute Engine.

הפעלת המשימה ב-Dataflow

כשמפעילים את צינור Apache Beam ב-Dataflow, מציינים את הנתיב לקובץ האימג' של הקונטיינר. אל תשתמשו בתג :latest עם תמונות מותאמות אישית. מתייגים את הגרסאות בתאריך או במזהה ייחודי. אם משהו משתבש, שימוש בסוג כזה של תג עשוי לאפשר לכם לחזור לגרסה קודמת של הרצת צינור העברת הנתונים להגדרה קודמת שעבדה, ולבדוק את השינויים.

Java

משתמשים ב---sdkContainerImage כדי לציין קובץ אימג' של קונטיינר SDK עבור זמן הריצה של Java.

כדי להפעיל את Runner v2, משתמשים ב---experiments=use_runner_v2.

Python

אם משתמשים בגרסת SDK‏ 2.30.0 ואילך, צריך להשתמש באפשרות הצינור --sdk_container_image כדי לציין קובץ אימג' של קונטיינר של SDK.

בגרסאות קודמות של ה-SDK, משתמשים באפשרות הצינור --worker_harness_container_image כדי לציין את המיקום של קובץ אימג' של קונטיינר שבו רוצים להשתמש עבור ה-worker harness.

מאגרי תגים בהתאמה אישית נתמכים רק ב-Dataflow Runner v2. אם מפעילים צינור Python לעיבוד נתונים באצווה, צריך להגדיר את הדגל --experiments=use_runner_v2. אם מפעילים צינור Python לסטרימינג, אין צורך לציין את הניסוי, כי צינורות Python לסטרימינג משתמשים ב-Runner v2 כברירת מחדל.

Go

אם משתמשים בגרסת SDK ‏2.40.0 ואילך, צריך להשתמש באפשרות הצינור --sdk_container_image כדי לציין קובץ אימג' של קונטיינר של SDK.

בגרסאות קודמות של ה-SDK, משתמשים באפשרות הצינור --worker_harness_container_image כדי לציין את המיקום של קובץ אימג' של קונטיינר שבו רוצים להשתמש עבור ה-worker harness.

מאגרי תגים בהתאמה אישית נתמכים בכל הגרסאות של Go SDK כי הם משתמשים ב-Dataflow Runner v2 כברירת מחדל.

בדוגמה הבאה מוסבר איך להפעיל את WordCount לדוגמה של עיבוד באצווה עם קונטיינר בהתאמה אישית.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

שימוש ב-Apache Beam SDK ל-Python בגרסה 2.30.0 ואילך:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

מחליפים את מה שכתוב בשדות הבאים:

  • INPUT_FILE: נתיב הקלט של Cloud Storage שנקרא על ידי Dataflow כשמריצים את הדוגמה.
  • OUTPUT_FILE: נתיב הפלט של Cloud Storage שאליו נכתב צינור הנתונים לדוגמה. הקובץ הזה מכיל את ספירת המילים.
  • PROJECT_ID: המזהה של הפרויקט ב- Google Cloud.
  • REGION: האזור שבו רוצים לפרוס את משימת Dataflow.
  • TEMP_LOCATION: הנתיב ב-Cloud Storage שבו Dataflow יאחסן זמנית קבצים של משימות שנוצרו במהלך ההרצה של צינור העיבוד.
  • DISK_SIZE_GB: אופציונלי. אם הקונטיינר גדול, כדאי להגדיל את גודל דיסק האתחול שמוגדר כברירת מחדל כדי למנוע מצב של חוסר מקום בדיסק.
  • IMAGE_URI: ה-URI של קובץ אימג' של קונטיינר מותאם אישית של SDK. תמיד משתמשים ב-SHA או בתג של מאגר תגים עם גרסה. אל תשתמשו בתג :latest או בתג שניתן לשינוי.

סטרימינג של קובץ אימג' של קונטיינר

כדי לשפר את זמן האחזור של ההפעלה וההתאמה לעומס (autoscaling) של צינור Dataflow, אפשר להפעיל סטרימינג של תמונות. התכונה הזו שימושית אם המאגר המותאם אישית מכיל תוכן מיותר או אם לא נעשה שימוש בכל התוכן בכל שלב. לדוגמה, יכול להיות שהקונטיינר יכיל תוכן מקרי כמו קוד ספרייה מבוסס-CPU להסקת מסקנות מבוססת-GPU. באופן דומה, יכול להיות שיש לכם קונטיינר שמריץ צינורות ML עם כמה מודלים שמשתמשים רק במודל אחד בכל שלב, ולכן לא צריך את התוכן שלו בבת אחת. הפעלת סטרימינג של תמונות יכולה לעזור לשפר את זמן האחזור במקרים האלה.

Java

--dataflowServiceOptions=enable_image_streaming

Python

--dataflow_service_options=enable_image_streaming

Go

--dataflow_service_options=enable_image_streaming

הזרמת תמונות תאחזר חלקים מהקונטיינר המותאם אישית לפי הצורך של קוד צינור הנתונים, במקום להוריד את כל הקונטיינר מראש. אין צורך להוריד חלקים במאגר התגים שלא נעשה בהם שימוש.

כדי ליהנות מהזרמת תמונות, צריך להפעיל את Container File System API.