יצירת צינור עיבוד נתונים של Dataflow באמצעות Go
בדף הזה מוסבר איך להשתמש ב-Apache Beam SDK for Go כדי ליצור תוכנית שמגדירה צינור עיבוד נתונים. לאחר מכן מריצים את צינור הנתונים באופן מקומי ובשירות Dataflow. בסרטון איך משתמשים ב-WordCount ב-Apache Beam יש מבוא לצינור WordCount.
לפני שמתחילים
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
מפעילים את ממשקי ה-API של Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON ו-Cloud Resource Manager:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
מפעילים את ממשקי ה-API של Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON ו-Cloud Resource Manager:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
מקצים תפקידים לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- מחליפים את
PROJECT_IDבמזהה הפרויקט. - מחליפים את
PROJECT_NUMBERבמספר הפרויקט. כדי למצוא את מספר הפרויקט, אפשר לעיין במאמר בנושא זיהוי פרויקטים או להשתמש בפקודהgcloud projects describe. - מחליפים את
SERVICE_ACCOUNT_ROLEבכל אחד מהתפקידים.
-
יוצרים קטגוריה של Cloud Storage ומגדירים אותה כך:
-
מגדירים את סוג האחסון (storage class) לאפשרות הבאה:
S(Standard). -
מגדירים את מיקום האחסון לאזור הבא:
US(ארצות הברית). -
מחליפים את
BUCKET_NAMEבשם ייחודי לקטגוריה. שם הקטגוריה לא יכול להכיל מידע רגיש כי מרחב השמות של הקטגוריות זמין וגלוי לכולם.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
מגדירים את סוג האחסון (storage class) לאפשרות הבאה:
- מעתיקים את Google Cloud מזהה הפרויקט ואת שם הקטגוריה של Cloud Storage. תצטרכו את הערכים האלה בהמשך המדריך למתחילים הזה.
הגדרת סביבת הפיתוח
Apache Beam SDK הוא מודל תכנות בקוד פתוח לצינורות נתונים. מגדירים צינור עיבוד נתונים באמצעות תוכנית Apache Beam ואז בוחרים רץ (runner), כמו Dataflow, כדי להריץ את צינור עיבוד הנתונים.
מומלץ להשתמש בגרסה העדכנית של Go כשעובדים עם Apache Beam SDK for Go. אם לא מותקנת אצלכם הגרסה העדכנית של Go, אפשר להשתמש במדריך ההורדה וההתקנה של Go כדי להוריד ולהתקין את Go למערכת ההפעלה הספציפית שלכם.
כדי לוודא איזו גרסה של Go מותקנת אצלכם, מריצים את הפקודה הבאה במסוף המקומי:
go versionהרצת הדוגמה של ספירת מילים ב-Beam
Apache Beam SDK for Go כולל wordcount דוגמה לצינור.
בדוגמה wordcount מתבצעות הפעולות הבאות:
- קורא קובץ טקסט כקלט. כברירת מחדל, הפונקציה קוראת קובץ טקסט שנמצא בקטגוריה של Cloud Storage עם שם המשאב
gs://dataflow-samples/shakespeare/kinglear.txt. - מנתח כל שורה למילים.
- מבצעת ספירת תדירות של המילים שעברו טוקניזציה.
כדי להריץ את הגרסה העדכנית של הדוגמה של Beam wordcount במחשב המקומי, מבצעים את השלבים הבאים:
משתמשים בפקודה
git cloneכדי לשכפל את מאגר GitHubapache/beam:git clone https://github.com/apache/beam.gitעוברים לספרייה
beam/sdks/go:cd beam/sdks/goמשתמשים בפקודה הבאה כדי להריץ את צינור העיבוד:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsהדגל
inputמציין את הקובץ לקריאה, והדגלoutputמציין את שם הקובץ של פלט ספירת התדרים.
אחרי שהצינור יסיים את הפעולה, אפשר לראות את תוצאות הפלט:
more outputs*כדי לצאת, מקישים על q.
שינוי הקוד של צינור עיבוד הנתונים
צינור הנתונים של Beam wordcount מבחין בין מילים באותיות רישיות לבין מילים באותיות קטנות. בשלבים הבאים מוסבר איך ליצור מודול Go משלכם, לשנות את צינור wordcount כך שהוא לא יהיה תלוי באותיות רישיות, ולהריץ אותו ב-Dataflow.
יצירת מודול Go
כדי לבצע שינויים בקוד של צינור הנתונים, פועלים לפי השלבים הבאים.
יוצרים ספרייה למודול Go במיקום הרצוי:
mkdir wordcountcd wordcountיוצרים מודול Go. בדוגמה הזו, משתמשים ב-
example/dataflowכנתיב המודול.go mod init example/dataflowמורידים את העותק העדכני ביותר של קוד
wordcountממאגר Apache Beam ב-GitHub. מכניסים את הקובץ הזה לתיקייהwordcountשיצרתם.אם אתם משתמשים במערכת הפעלה שאינה Linux, אתם צריכים לקבל את חבילת
unixGo. החבילה הזו נדרשת להפעלת צינורות ב-Dataflow.go get -u golang.org/x/sys/unixמוודאים שקובץ
go.modתואם לקוד המקור של המודול:go mod tidy
הפעלת צינור עיבוד הנתונים ללא שינויים
מוודאים שצינור עיבוד הנתונים wordcount שלא שונה פועל באופן מקומי.
בטרמינל, יוצרים ומריצים את צינור עיבוד הנתונים באופן מקומי:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsצפייה בתוצאות הפלט:
more outputs*כדי לצאת, מקישים על q.
שינוי קוד הפייפליין
כדי לשנות את הצינור כך שלא יבחין בין אותיות רישיות לאותיות קטנות, משנים את הקוד כך שתחול הפונקציה strings.ToLower על כל המילים.
פותחים את הקובץ
wordcount.goבכלי עריכה לבחירתכם.בודקים את הבלוק
init(ההערות הוסרו לשם הבהרה):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }מוסיפים שורה חדשה כדי לרשום את הפונקציה
strings.ToLower:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }בודקים את הפונקציה
CountWords:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }כדי להפוך את המילים לאותיות קטנות, מוסיפים ParDo שמחיל את
strings.ToLowerעל כל מילה:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }שומרים את הקובץ.
הפעלת צינור הנתונים המעודכן באופן מקומי
מריצים את צינור העדכון wordcount באופן מקומי ומוודאים שהפלט השתנה.
מפתחים ומריצים את צינור עיבוד הנתונים
wordcountששיניתם:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsצפייה בתוצאות הפלט של צינור העיבוד שעבר שינוי. כל המילים צריכות להיות באותיות קטנות.
more outputs*כדי לצאת, מקישים על q.
הפעלת צינור העיבוד בשירות Dataflow
כדי להריץ את הדוגמה המעודכנת של wordcount בשירות Dataflow, משתמשים בפקודה הבאה:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/מחליפים את מה שכתוב בשדות הבאים:
BUCKET_NAME: שם הקטגוריה של Cloud Storage.
PROJECT_ID: מזהה הפרויקט ב- Google Cloud .
DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow. לדוגמה,europe-west1. רשימת המיקומים הזמינים מופיעה במאמר בנושא מיקומי Dataflow. הדגל--regionמבטל את ברירת המחדל של האזור שמוגדר בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה.
צפייה בתוצאות
אפשר לראות רשימה של משימות Dataflow בGoogle Cloud מסוף. נכנסים לדף Jobs של Dataflow במסוף Google Cloud .
בדף משימות מוצגים פרטים על משימת wordcount, כולל סטטוס. בהתחלה הסטטוס יהיה פועל, ואז הוא ישתנה להושלם.
כשמריצים צינור באמצעות Dataflow, התוצאות מאוחסנות בקטגוריה של Cloud Storage. אפשר לראות את תוצאות הפלט באמצעותGoogle Cloud המסוף או הטרמינל המקומי.
המסוף
כדי לראות את התוצאות במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.
ברשימת הקטגוריות בפרויקט, לוחצים על קטגוריית האחסון שיצרתם קודם. קבצי הפלט שנוצרו על ידי העבודה מוצגים בספרייה results.
טרמינל
אפשר לראות את התוצאות בטרמינל או באמצעות Cloud Shell.
כדי לראות את רשימת קובצי הפלט, משתמשים בפקודה
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --longמחליפים את
BUCKET_NAMEבשם של קטגוריית הפלט שצוינה ב-Cloud Storage.כדי לראות את התוצאות בקובצי הפלט, משתמשים בפקודה
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*
הסרת המשאבים
כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Google Cloud Google Cloud
- במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.
- לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
- כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.
אם אתם משאירים את הפרויקט, אתם צריכים לבטל את התפקידים שהקציתם לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.
gcloud auth application-default revoke
-
אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.
gcloud auth revoke