יצירת צינור עיבוד נתונים של Dataflow באמצעות Python

במאמר הזה מוסבר איך להשתמש ב-Apache Beam SDK for Python כדי ליצור תוכנית שמגדירה צינור עיבוד נתונים. לאחר מכן מריצים את צינור העיבוד באמצעות רץ מקומי ישיר או רץ מבוסס-ענן כמו Dataflow. בסרטון איך משתמשים ב-WordCount ב-Apache Beam יש מבוא לצינור WordCount.


לחצו על תראו לי איך כדי לקרוא הסבר מפורט על המשימה ישירות במסוף Google Cloud :

תראו לי איך


לפני שמתחילים

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. התקינו את ה-CLI של Google Cloud.

  3. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  4. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  5. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  6. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  7. מפעילים את ממשקי ה-API של Dataflow,‏ Compute Engine,‏ Cloud Logging,‏ Cloud Storage,‏ Google Cloud Storage JSON,‏ BigQuery,‏ Cloud Pub/Sub,‏ Cloud Datastore ו-Cloud Resource Manager:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  9. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
  10. התקינו את ה-CLI של Google Cloud.

  11. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  12. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  13. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  14. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  15. מפעילים את ממשקי ה-API של Dataflow,‏ Compute Engine,‏ Cloud Logging,‏ Cloud Storage,‏ Google Cloud Storage JSON,‏ BigQuery,‏ Cloud Pub/Sub,‏ Cloud Datastore ו-Cloud Resource Manager:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  17. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
  18. מקצים תפקידים לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • מחליפים את PROJECT_ID במזהה הפרויקט.
    • מחליפים את PROJECT_NUMBER במספר הפרויקט. כדי למצוא את מספר הפרויקט, אפשר לעיין במאמר בנושא זיהוי פרויקטים או להשתמש בפקודה gcloud projects describe.
    • מחליפים את SERVICE_ACCOUNT_ROLE בכל אחד מהתפקידים.
  19. יוצרים קטגוריה של Cloud Storage ומגדירים אותה כך:
    • מגדירים את סוג האחסון (storage class) לאפשרות הבאה: S (Standard).
    • מגדירים את מיקום האחסון לאזור הבא: US (ארצות הברית).
    • מחליפים את BUCKET_NAME בשם ייחודי לקטגוריה. שם הקטגוריה לא יכול להכיל מידע רגיש כי מרחב השמות של הקטגוריות זמין וגלוי לכולם.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. מעתיקים את Google Cloud מזהה הפרויקט ואת שם הקטגוריה של Cloud Storage. תצטרכו את הערכים האלה בהמשך המאמר.

מגדירים את הסביבה

בקטע הזה, משתמשים בשורת הפקודה כדי להגדיר סביבה וירטואלית מבודדת של Python להרצת פרויקט צינור הנתונים באמצעות venv. התהליך הזה מאפשר לבודד את יחסי התלות של פרויקט אחד מיחסי התלות של פרויקטים אחרים.

אם אין לכם שורת פקודה זמינה, אתם יכולים להשתמש ב-Cloud Shell. מנהל החבילות של Python 3 כבר מותקן ב-Cloud Shell, כך שאפשר לדלג לשלב של יצירת סביבה וירטואלית.

כדי להתקין את Python ואז ליצור סביבה וירטואלית:

  1. בודקים ש-Python 3 ו-pip פועלים במערכת:
    python --version
    python -m pip --version
  2. אם נדרש, מתקינים את Python 3 ואז מגדירים סביבה וירטואלית של Python: פועלים לפי ההוראות שבקטעים התקנת Python והגדרת venv בדף הגדרת סביבת פיתוח של Python.

אחרי שמסיימים את המדריך למתחילים, אפשר להשבית את הסביבה הווירטואלית על ידי הפעלת הפקודה deactivate.

הורדת Apache Beam SDK

‫Apache Beam SDK הוא מודל תכנות בקוד פתוח לצינורות נתונים. אתם מגדירים צינור עיבוד נתונים באמצעות תוכנית Apache Beam ואז בוחרים רץ, כמו Dataflow, כדי להריץ את צינור עיבוד הנתונים.

כדי להוריד ולהתקין את Apache Beam SDK, מבצעים את השלבים הבאים:

  1. מוודאים שאתם בסביבה הווירטואלית של Python שיצרתם בקטע הקודם. מוודאים שההנחיה מתחילה ב-<env_name>, כאשר env_name הוא השם של הסביבה הווירטואלית.
  2. מתקינים את הגרסה האחרונה של Apache Beam SDK ל-Python:
  3. pip install apache-beam[gcp]

הפעלת צינור עיבוד הנתונים באופן מקומי

כדי לראות איך צינור פועל באופן מקומי, משתמשים במודול Python מוכן מראש לדוגמה wordcount שכלולה בחבילה apache_beam.

בדוגמה לצינור העיבוד wordcount מתבצעות הפעולות הבאות:

  1. מקבל קובץ טקסט כקלט.

    קובץ הטקסט הזה נמצא בקטגוריה של Cloud Storage עם שם המשאב gs://dataflow-samples/shakespeare/kinglear.txt.

  2. מנתח כל שורה למילים.
  3. מבצעת ספירת תדירות של המילים שעברו טוקניזציה.

כדי להכין את צינור הנתונים של wordcount באופן מקומי:

  1. בטרמינל המקומי, מריצים את הדוגמה wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. צופים בפלט של צינור עיבוד הנתונים:
    more outputs*
  3. כדי לצאת, מקישים על q.
הפעלת צינור העיבוד באופן מקומי מאפשרת לכם לבדוק ולנפות באגים בתוכנית Apache Beam. אפשר לראות את קוד המקור של wordcount.py ב-Apache Beam GitHub.

הפעלת צינור העיבוד בשירות Dataflow

בקטע הזה, מריצים את צינור העיבוד לדוגמה wordcount מחבילת apache_beam בשירות Dataflow. בדוגמה הזו, הפרמטר שמוגדר ל---runner הוא DataflowRunner.
  • מריצים את הפייפליין:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    מחליפים את מה שכתוב בשדות הבאים:

    • DATAFLOW_REGION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: europe-west1

      הדגל --region מבטל את האזור שמוגדר כברירת מחדל בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.

    • BUCKET_NAME: שם הקטגוריה ב-Cloud Storage שהעתקתם קודם
    • PROJECT_ID: Google Cloud מזהה הפרויקט שהעתקתם קודם

צפייה בתוצאות

כשמריצים צינור באמצעות Dataflow, התוצאות נשמרות בקטגוריה של Cloud Storage. בקטע הזה, מוודאים שהצינור פועל באמצעות מסוף Google Cloud או מסוף מקומי.

מסוףGoogle Cloud

כדי לראות את התוצאות במסוף Google Cloud , פועלים לפי השלבים הבאים:

  1. נכנסים לדף Jobs ב-Dataflow במסוף Google Cloud .

    מעבר אל Jobs

    בדף משימות מוצגים פרטים על משימת wordcount, כולל סטטוס פועל בהתחלה, ואז הושלם.

  2. נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  3. ברשימת הקטגוריות בפרויקט, לוחצים על קטגוריית האחסון שיצרתם קודם.

    בספרייה wordcount מוצגים קובצי הפלט שנוצרו על ידי העבודה.

מסוף מקומי

אפשר לראות את התוצאות בטרמינל או באמצעות Cloud Shell.

  1. כדי לראות את רשימת קובצי הפלט, משתמשים בפקודה gcloud storage ls:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. מחליפים את BUCKET_NAME בשם של קטגוריית Cloud Storage שבה נעשה שימוש בתוכנית של צינור העיבוד.

  3. כדי לראות את התוצאות בקובצי הפלט, משתמשים בפקודה gcloud storage cat:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

שינוי הקוד של צינור עיבוד הנתונים

בצינור העיבוד בדוגמאות הקודמות יש הבחנה בין מילים באותיות רישיות לבין מילים באותיות קטנות.wordcount בשלבים הבאים מוסבר איך לשנות את צינור הנתונים כך שצינור הנתונים wordcount לא יהיה תלוי באותיות רישיות.
  1. במחשב המקומי, מורידים את העותק העדכני של הקוד wordcount ממאגר Apache Beam ב-GitHub.
  2. בטרמינל המקומי, מריצים את צינור העיבוד:
    python wordcount.py --output outputs
  3. צפייה בתוצאות:
    more outputs*
  4. כדי לצאת, מקישים על q.
  5. פותחים את הקובץ wordcount.py בכלי עריכה לבחירתכם.
  6. בתוך הפונקציה run, בודקים את השלבים בצינור עיבוד הנתונים:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    אחרי split, השורות מפוצלות למילים כמחרוזות.

  7. כדי להפוך את המחרוזות לאותיות קטנות, משנים את השורה אחרי split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    השינוי הזה ממפה את הפונקציה str.lower לכל מילה. השורה הזו שוות ערך ל-beam.Map(lambda word: str.lower(word)).
  8. שומרים את הקובץ ומריצים את משימת wordcount ששיניתם:
    python wordcount.py --output outputs
  9. צפייה בתוצאות של צינור העברת הנתונים ששונה:
    more outputs*
  10. כדי לצאת, מקישים על q.
  11. מריצים את הפייפליין ששיניתם בשירות Dataflow:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    מחליפים את מה שכתוב בשדות הבאים:

    • DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow
    • BUCKET_NAME: שם הקטגוריה שלכם ב-Cloud Storage
    • PROJECT_ID: מזהה הפרויקט ב- Google Cloud

הסרת המשאבים

כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Google Cloud Google Cloud

  1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  2. לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
  3. כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.
  4. אם אתם משאירים את הפרויקט, אתם צריכים לבטל את התפקידים שהקציתם לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.

    gcloud auth application-default revoke
  6. אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.

    gcloud auth revoke

המאמרים הבאים