יצירת צינור עיבוד נתונים של Dataflow באמצעות Go

בדף הזה מוסבר איך להשתמש ב-Apache Beam SDK for Go כדי ליצור תוכנית שמגדירה צינור עיבוד נתונים. לאחר מכן מריצים את צינור הנתונים באופן מקומי ובשירות Dataflow. בסרטון איך משתמשים ב-WordCount ב-Apache Beam יש מבוא לצינור WordCount.

לפני שמתחילים

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. התקינו את ה-CLI של Google Cloud.

  3. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  4. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  5. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  6. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  7. מפעילים את ממשקי ה-API של Dataflow,‏ Compute Engine,‏ Cloud Logging,‏ Cloud Storage,‏ Google Cloud Storage JSON ו-Cloud Resource Manager:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  9. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
  10. התקינו את ה-CLI של Google Cloud.

  11. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  12. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  13. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  14. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  15. מפעילים את ממשקי ה-API של Dataflow,‏ Compute Engine,‏ Cloud Logging,‏ Cloud Storage,‏ Google Cloud Storage JSON ו-Cloud Resource Manager:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  17. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
  18. מקצים תפקידים לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • מחליפים את PROJECT_ID במזהה הפרויקט.
    • מחליפים את PROJECT_NUMBER במספר הפרויקט. כדי למצוא את מספר הפרויקט, אפשר לעיין במאמר בנושא זיהוי פרויקטים או להשתמש בפקודה gcloud projects describe.
    • מחליפים את SERVICE_ACCOUNT_ROLE בכל אחד מהתפקידים.
  19. יוצרים קטגוריה של Cloud Storage ומגדירים אותה כך:
    • מגדירים את סוג האחסון (storage class) לאפשרות הבאה: S (Standard).
    • מגדירים את מיקום האחסון לאזור הבא: US (ארצות הברית).
    • מחליפים את BUCKET_NAME בשם ייחודי לקטגוריה. שם הקטגוריה לא יכול להכיל מידע רגיש כי מרחב השמות של הקטגוריות זמין וגלוי לכולם.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. מעתיקים את Google Cloud מזהה הפרויקט ואת שם הקטגוריה של Cloud Storage. תצטרכו את הערכים האלה בהמשך המדריך למתחילים הזה.

הגדרת סביבת הפיתוח

‫Apache Beam SDK הוא מודל תכנות בקוד פתוח לצינורות נתונים. מגדירים צינור עיבוד נתונים באמצעות תוכנית Apache Beam ואז בוחרים רץ (runner), כמו Dataflow, כדי להריץ את צינור עיבוד הנתונים.

מומלץ להשתמש בגרסה העדכנית של Go כשעובדים עם Apache Beam SDK for Go. אם לא מותקנת אצלכם הגרסה העדכנית של Go, אפשר להשתמש במדריך ההורדה וההתקנה של Go כדי להוריד ולהתקין את Go למערכת ההפעלה הספציפית שלכם.

כדי לוודא איזו גרסה של Go מותקנת אצלכם, מריצים את הפקודה הבאה במסוף המקומי:

go version

הרצת הדוגמה של ספירת מילים ב-Beam

‫Apache Beam SDK for Go כולל wordcount דוגמה לצינור. בדוגמה wordcount מתבצעות הפעולות הבאות:

  1. קורא קובץ טקסט כקלט. כברירת מחדל, הפונקציה קוראת קובץ טקסט שנמצא בקטגוריה של Cloud Storage עם שם המשאב gs://dataflow-samples/shakespeare/kinglear.txt.
  2. מנתח כל שורה למילים.
  3. מבצעת ספירת תדירות של המילים שעברו טוקניזציה.

כדי להריץ את הגרסה העדכנית של הדוגמה של Beam wordcount במחשב המקומי, מבצעים את השלבים הבאים:

  1. משתמשים בפקודה git clone כדי לשכפל את מאגר GitHub‏ apache/beam:

    git clone https://github.com/apache/beam.git
  2. עוברים לספרייה beam/sdks/go:

    cd beam/sdks/go
  3. משתמשים בפקודה הבאה כדי להריץ את צינור העיבוד:

    go run examples/wordcount/wordcount.go \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output outputs

    הדגל input מציין את הקובץ לקריאה, והדגל output מציין את שם הקובץ של פלט ספירת התדרים.

אחרי שהצינור יסיים את הפעולה, אפשר לראות את תוצאות הפלט:

more outputs*

כדי לצאת, מקישים על q.

שינוי הקוד של צינור עיבוד הנתונים

צינור הנתונים של Beam wordcount מבחין בין מילים באותיות רישיות לבין מילים באותיות קטנות. בשלבים הבאים מוסבר איך ליצור מודול Go משלכם, לשנות את צינור wordcount כך שהוא לא יהיה תלוי באותיות רישיות, ולהריץ אותו ב-Dataflow.

יצירת מודול Go

כדי לבצע שינויים בקוד של צינור הנתונים, פועלים לפי השלבים הבאים.

  1. יוצרים ספרייה למודול Go במיקום הרצוי:

    mkdir wordcount
    cd wordcount
  2. יוצרים מודול Go. בדוגמה הזו, משתמשים ב-example/dataflow כנתיב המודול.

    go mod init example/dataflow
  3. מורידים את העותק העדכני ביותר של קוד wordcount ממאגר Apache Beam ב-GitHub. מכניסים את הקובץ הזה לתיקייה wordcount שיצרתם.

  4. אם אתם משתמשים במערכת הפעלה שאינה Linux, אתם צריכים לקבל את חבילת unix Go. החבילה הזו נדרשת להפעלת צינורות ב-Dataflow.

    go get -u golang.org/x/sys/unix
  5. מוודאים שקובץ go.mod תואם לקוד המקור של המודול:

    go mod tidy

הפעלת צינור עיבוד הנתונים ללא שינויים

מוודאים שצינור עיבוד הנתונים wordcount שלא שונה פועל באופן מקומי.

  1. בטרמינל, יוצרים ומריצים את צינור עיבוד הנתונים באופן מקומי:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. צפייה בתוצאות הפלט:

     more outputs*
  3. כדי לצאת, מקישים על q.

שינוי קוד הפייפליין

כדי לשנות את הצינור כך שלא יבחין בין אותיות רישיות לאותיות קטנות, משנים את הקוד כך שתחול הפונקציה strings.ToLower על כל המילים.

  1. פותחים את הקובץ wordcount.go בכלי עריכה לבחירתכם.

  2. בודקים את הבלוק init (ההערות הוסרו לשם הבהרה):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. מוסיפים שורה חדשה כדי לרשום את הפונקציה strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. בודקים את הפונקציה CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. כדי להפוך את המילים לאותיות קטנות, מוסיפים ParDo שמחיל את strings.ToLower על כל מילה:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. שומרים את הקובץ.

הפעלת צינור הנתונים המעודכן באופן מקומי

מריצים את צינור העדכון wordcount באופן מקומי ומוודאים שהפלט השתנה.

  1. מפתחים ומריצים את צינור עיבוד הנתונים wordcount ששיניתם:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. צפייה בתוצאות הפלט של צינור העיבוד שעבר שינוי. כל המילים צריכות להיות באותיות קטנות.

     more outputs*
  3. כדי לצאת, מקישים על q.

הפעלת צינור העיבוד בשירות Dataflow

כדי להריץ את הדוגמה המעודכנת של wordcount בשירות Dataflow, משתמשים בפקודה הבאה:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

מחליפים את מה שכתוב בשדות הבאים:

  • BUCKET_NAME: שם הקטגוריה של Cloud Storage.

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud .

  • DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow. לדוגמה, europe-west1. רשימת המיקומים הזמינים מופיעה במאמר בנושא מיקומי Dataflow. הדגל --region מבטל את ברירת המחדל של האזור שמוגדר בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.

צפייה בתוצאות

אפשר לראות רשימה של משימות Dataflow בGoogle Cloud מסוף. נכנסים לדף Jobs של Dataflow במסוף Google Cloud .

מעבר אל Jobs

בדף משימות מוצגים פרטים על משימת wordcount, כולל סטטוס. בהתחלה הסטטוס יהיה פועל, ואז הוא ישתנה להושלם.

כשמריצים צינור באמצעות Dataflow, התוצאות מאוחסנות בקטגוריה של Cloud Storage. אפשר לראות את תוצאות הפלט באמצעותGoogle Cloud המסוף או הטרמינל המקומי.

המסוף

כדי לראות את התוצאות במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

כניסה לדף Buckets

ברשימת הקטגוריות בפרויקט, לוחצים על קטגוריית האחסון שיצרתם קודם. קבצי הפלט שנוצרו על ידי העבודה מוצגים בספרייה results.

טרמינל

אפשר לראות את התוצאות בטרמינל או באמצעות Cloud Shell.

  1. כדי לראות את רשימת קובצי הפלט, משתמשים בפקודה gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    מחליפים את BUCKET_NAME בשם של קטגוריית הפלט שצוינה ב-Cloud Storage.

  2. כדי לראות את התוצאות בקובצי הפלט, משתמשים בפקודה gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*

הסרת המשאבים

כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Google Cloud Google Cloud

  1. במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.

    כניסה לדף Buckets

  2. לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
  3. כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.
  4. אם אתם משאירים את הפרויקט, אתם צריכים לבטל את התפקידים שהקציתם לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.

    gcloud auth application-default revoke
  6. אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.

    gcloud auth revoke

המאמרים הבאים