יצירת צינור עיבוד נתונים של Dataflow באמצעות Python
במאמר הזה מוסבר איך להשתמש ב-Apache Beam SDK for Python כדי ליצור תוכנית שמגדירה צינור עיבוד נתונים. לאחר מכן מריצים את צינור העיבוד באמצעות רץ מקומי ישיר או רץ מבוסס-ענן כמו Dataflow. בסרטון איך משתמשים ב-WordCount ב-Apache Beam יש מבוא לצינור WordCount.
לחצו על תראו לי איך כדי לקרוא הסבר מפורט על המשימה ישירות במסוף Google Cloud :
לפני שמתחילים
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
מפעילים את ממשקי ה-API של Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore ו-Cloud Resource Manager:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
מפעילים את ממשקי ה-API של Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore ו-Cloud Resource Manager:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
מקצים תפקידים לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- מחליפים את
PROJECT_IDבמזהה הפרויקט. - מחליפים את
PROJECT_NUMBERבמספר הפרויקט. כדי למצוא את מספר הפרויקט, אפשר לעיין במאמר בנושא זיהוי פרויקטים או להשתמש בפקודהgcloud projects describe. - מחליפים את
SERVICE_ACCOUNT_ROLEבכל אחד מהתפקידים.
-
יוצרים קטגוריה של Cloud Storage ומגדירים אותה כך:
-
מגדירים את סוג האחסון (storage class) לאפשרות הבאה:
S(Standard). -
מגדירים את מיקום האחסון לאזור הבא:
US(ארצות הברית). -
מחליפים את
BUCKET_NAMEבשם ייחודי לקטגוריה. שם הקטגוריה לא יכול להכיל מידע רגיש כי מרחב השמות של הקטגוריות זמין וגלוי לכולם.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
מגדירים את סוג האחסון (storage class) לאפשרות הבאה:
- מעתיקים את Google Cloud מזהה הפרויקט ואת שם הקטגוריה של Cloud Storage. תצטרכו את הערכים האלה בהמשך המאמר.
מגדירים את הסביבה
בקטע הזה, משתמשים בשורת הפקודה כדי להגדיר סביבה וירטואלית מבודדת של Python להרצת פרויקט צינור הנתונים באמצעות venv. התהליך הזה מאפשר לבודד את יחסי התלות של פרויקט אחד מיחסי התלות של פרויקטים אחרים.
אם אין לכם שורת פקודה זמינה, אתם יכולים להשתמש ב-Cloud Shell. מנהל החבילות של Python 3 כבר מותקן ב-Cloud Shell, כך שאפשר לדלג לשלב של יצירת סביבה וירטואלית.
כדי להתקין את Python ואז ליצור סביבה וירטואלית:
- בודקים ש-Python 3 ו-
pipפועלים במערכת:python --version python -m pip --version
- אם נדרש, מתקינים את Python 3 ואז מגדירים סביבה וירטואלית של Python: פועלים לפי ההוראות שבקטעים התקנת Python והגדרת venv בדף הגדרת סביבת פיתוח של Python.
אחרי שמסיימים את המדריך למתחילים, אפשר להשבית את הסביבה הווירטואלית על ידי הפעלת הפקודה deactivate.
הורדת Apache Beam SDK
Apache Beam SDK הוא מודל תכנות בקוד פתוח לצינורות נתונים. אתם מגדירים צינור עיבוד נתונים באמצעות תוכנית Apache Beam ואז בוחרים רץ, כמו Dataflow, כדי להריץ את צינור עיבוד הנתונים.
כדי להוריד ולהתקין את Apache Beam SDK, מבצעים את השלבים הבאים:
- מוודאים שאתם בסביבה הווירטואלית של Python שיצרתם בקטע הקודם.
מוודאים שההנחיה מתחילה ב-
<env_name>, כאשרenv_nameהוא השם של הסביבה הווירטואלית. - מתקינים את הגרסה האחרונה של Apache Beam SDK ל-Python:
pip install apache-beam[gcp]
הפעלת צינור עיבוד הנתונים באופן מקומי
כדי לראות איך צינור פועל באופן מקומי, משתמשים במודול Python מוכן מראש לדוגמה wordcount שכלולה בחבילה apache_beam.
בדוגמה לצינור העיבוד wordcount מתבצעות הפעולות הבאות:
מקבל קובץ טקסט כקלט.
קובץ הטקסט הזה נמצא בקטגוריה של Cloud Storage עם שם המשאב
gs://dataflow-samples/shakespeare/kinglear.txt.- מנתח כל שורה למילים.
- מבצעת ספירת תדירות של המילים שעברו טוקניזציה.
כדי להכין את צינור הנתונים של wordcount באופן מקומי:
- בטרמינל המקומי, מריצים את הדוגמה
wordcount:python -m apache_beam.examples.wordcount \ --output outputs
- צופים בפלט של צינור עיבוד הנתונים:
more outputs* - כדי לצאת, מקישים על q.
wordcount.py ב-Apache Beam GitHub.
הפעלת צינור העיבוד בשירות Dataflow
בקטע הזה, מריצים את צינור העיבוד לדוגמהwordcount מחבילת apache_beam בשירות Dataflow. בדוגמה הזו, הפרמטר שמוגדר ל---runner הוא DataflowRunner.
- מריצים את הפייפליין:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
מחליפים את מה שכתוב בשדות הבאים:
-
DATAFLOW_REGION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה:europe-west1הדגל
--regionמבטל את האזור שמוגדר כברירת מחדל בשרת המטא-נתונים, בלקוח המקומי או במשתני הסביבה. -
BUCKET_NAME: שם הקטגוריה ב-Cloud Storage שהעתקתם קודם -
PROJECT_ID: Google Cloud מזהה הפרויקט שהעתקתם קודם
-
צפייה בתוצאות
כשמריצים צינור באמצעות Dataflow, התוצאות נשמרות בקטגוריה של Cloud Storage. בקטע הזה, מוודאים שהצינור פועל באמצעות מסוף Google Cloud או מסוף מקומי.
מסוףGoogle Cloud
כדי לראות את התוצאות במסוף Google Cloud , פועלים לפי השלבים הבאים:
- נכנסים לדף Jobs ב-Dataflow במסוף Google Cloud .
בדף משימות מוצגים פרטים על משימת
wordcount, כולל סטטוס פועל בהתחלה, ואז הושלם. - נכנסים לדף Buckets של Cloud Storage.
ברשימת הקטגוריות בפרויקט, לוחצים על קטגוריית האחסון שיצרתם קודם.
בספרייה
wordcountמוצגים קובצי הפלט שנוצרו על ידי העבודה.
מסוף מקומי
אפשר לראות את התוצאות בטרמינל או באמצעות Cloud Shell.
- כדי לראות את רשימת קובצי הפלט, משתמשים בפקודה
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- כדי לראות את התוצאות בקובצי הפלט, משתמשים בפקודה
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*
מחליפים את BUCKET_NAME בשם של קטגוריית Cloud Storage שבה נעשה שימוש בתוכנית של צינור העיבוד.
שינוי הקוד של צינור עיבוד הנתונים
בצינור העיבוד בדוגמאות הקודמות יש הבחנה בין מילים באותיות רישיות לבין מילים באותיות קטנות.wordcount
בשלבים הבאים מוסבר איך לשנות את צינור הנתונים כך שצינור הנתונים wordcount לא יהיה תלוי באותיות רישיות.
- במחשב המקומי, מורידים את העותק העדכני של הקוד
wordcountממאגר Apache Beam ב-GitHub. - בטרמינל המקומי, מריצים את צינור העיבוד:
python wordcount.py --output outputs
- צפייה בתוצאות:
more outputs* - כדי לצאת, מקישים על q.
- פותחים את הקובץ
wordcount.pyבכלי עריכה לבחירתכם. - בתוך הפונקציה
run, בודקים את השלבים בצינור עיבוד הנתונים:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
אחרי
split, השורות מפוצלות למילים כמחרוזות. - כדי להפוך את המחרוזות לאותיות קטנות, משנים את השורה אחרי
split: השינוי הזה ממפה את הפונקציהcounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lowerלכל מילה. השורה הזו שוות ערך ל-beam.Map(lambda word: str.lower(word)). - שומרים את הקובץ ומריצים את משימת
wordcountששיניתם:python wordcount.py --output outputs
- צפייה בתוצאות של צינור העברת הנתונים ששונה:
more outputs* - כדי לצאת, מקישים על q.
- מריצים את הפייפליין ששיניתם בשירות Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
מחליפים את מה שכתוב בשדות הבאים:
-
DATAFLOW_REGION: האזור שבו רוצים לפרוס את משימת Dataflow -
BUCKET_NAME: שם הקטגוריה שלכם ב-Cloud Storage -
PROJECT_ID: מזהה הפרויקט ב- Google Cloud
-
הסרת המשאבים
כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Google Cloud Google Cloud
- במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.
- לוחצים על תיבת הסימון של הקטגוריה שרוצים למחוק.
- כדי למחוק את הקטגוריה, לוחצים על Delete ופועלים לפי ההוראות.
אם אתם משאירים את הפרויקט, אתם צריכים לבטל את התפקידים שהקציתם לחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.
gcloud auth application-default revoke
-
אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.
gcloud auth revoke