עבודה עם נהלים מאוחסנים ב-Apache Spark
המסמך הזה מיועד למהנדסי נתונים, למדעני נתונים ולמנתחי נתונים שרוצים ליצור פרוצדורות מאוחסנות ולהפעיל אותן ב-Spark ב-BigQuery.
באמצעות BigQuery, אתם יכולים ליצור ולהפעיל Spark שנכתבו ב-Python, ב-Java וב-Scala. אחר כך אפשר להריץ את הפרוצדורות המאוחסנות האלה ב-BigQuery באמצעות שאילתת GoogleSQL, בדומה להרצה של פרוצדורות מאוחסנות של SQL.
לפני שמתחילים
כדי ליצור פרוצדורה מאוחסנת ל-Spark, צריך לבקש מהאדמין ליצור חיבור ל-Spark ולשתף אותו איתכם. האדמין צריך גם להעניק לחשבון השירות שמשויך לחיבור את ההרשאות הנדרשות לניהול זהויות והרשאות גישה (IAM).
התפקידים הנדרשים
כדי לקבל את ההרשאות שדרושות לביצוע המשימות שמתוארות במאמרי עזרה הזה, אתם צריכים לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים:
-
יצירת תהליך מאוחסן עבור Spark:
-
BigQuery Data Editor (
roles/bigquery.dataEditor) במערך הנתונים שבו יוצרים את הפרוצדורה המאוחסנת -
BigQuery Connection Admin (
roles/bigquery.connectionAdmin) בחיבור שבו נעשה שימוש בתהליך המאוחסן -
BigQuery Job User (
roles/bigquery.jobUser) בפרויקט
-
BigQuery Data Editor (
-
התקשרות לתהליך מאוחסן ב-Spark:
-
BigQuery Metadata Viewer (
roles/bigquery.metadataViewer) במערך הנתונים שבו מאוחסנת הפרוצדורה המאוחסנת -
BigQuery Connection User (
roles/bigquery.connectionUser) on the connection -
BigQuery Job User (
roles/bigquery.jobUser) בפרויקט
-
BigQuery Metadata Viewer (
להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
התפקידים המוגדרים מראש האלה כוללים את ההרשאות שנדרשות לביצוע המשימות שמתוארות במסמך הזה. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:
ההרשאות הנדרשות
כדי לבצע את המשימות שמתוארות במסמך הזה, צריך את ההרשאות הבאות:
-
יצירת חיבור:
-
bigquery.connections.create -
bigquery.connections.list
-
-
יוצרים תהליך מאוחסן עבור Spark:
-
bigquery.routines.create -
bigquery.connections.delegate -
bigquery.jobs.create
-
-
קריאה לתהליך מאוחסן ב-Spark:
-
bigquery.routines.get -
bigquery.connections.use -
bigquery.jobs.create
-
יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.
שיקולים לגבי מיקום
אתם צריכים ליצור פרוצדורה מאוחסנת עבור Spark באותו מיקום שבו נמצא החיבור, כי הפרוצדורה המאוחסנת פועלת באותו מיקום שבו נמצא החיבור. לדוגמה, כדי ליצור פרוצדורה מאוחסנת במספר אזורים בארה"ב, צריך להשתמש בחיבור שנמצא במספר אזורים בארה"ב.
תמחור
החיובים על הרצת פרוצדורות Spark ב-BigQuery דומים לחיובים על הרצת פרוצדורות Spark ב-Serverless for Apache Spark. למידע נוסף, ראו תמחור של Serverless for Apache Spark.
אפשר להשתמש בפרוצדורות מאוחסנות של Spark עם מודל התמחור לפי דרישה וגם עם כל אחת ממהדורות של BigQuery. החיוב על פרוצדורות Spark מתבצע תמיד לפי המודל של מהדורת BigQuery Enterprise לתשלום לפי שימוש, ללא קשר למודל התמחור של המחשוב שבו נעשה שימוש בפרויקט.
פרוצדורות מאוחסנות של Spark ל-BigQuery לא תומכות בשימוש בהזמנות או בהתחייבויות. הזמנות קיימות והתחייבויות ימשיכו לשמש שאילתות ופרוצדורות נתמכות אחרות. החיובים על השימוש בהליכים מאוחסנים של Spark מתווספים לחשבון שלכם בעלות של מהדורת Enterprise – תשלום לפי שימוש. ההנחות של הארגון שלכם יחולו, אם הן רלוונטיות.
למרות שהפרוצדורות המאוחסנות של Spark משתמשות במנוע ההפעלה של Spark, לא תראו חיובים נפרדים על הפעלת Spark. כמו שצוין, החיובים המתאימים מדווחים בתור מק"ט של מהדורת BigQuery Enterprise בתשלום לפי שימוש.
אין רמת שירות בחינם לנהלים מאוחסנים של Spark.
יצירת תהליך מאוחסן ל-Spark
צריך ליצור את הפרוצדורה המאוחסנת באותו המיקום שבו נמצא החיבור שבו אתם משתמשים.
אם גודל הגוף של התהליך המאוחסן גדול מ-1MB, מומלץ לשים את התהליך המאוחסן בקובץ בקטגוריה של Cloud Storage במקום להשתמש בקוד מוטבע. ב-BigQuery יש שתי שיטות ליצירת תהליך מאוחסן ל-Spark באמצעות Python:
- אם רוצים להשתמש בהצהרה
CREATE PROCEDURE, צריך להשתמש בעורך שאילתות SQL. - אם רוצים להקליד קוד Python ישירות, משתמשים בעורך PySpark. אפשר לשמור את הקוד כפרוצדורה מאוחסנת.
שימוש בעורך השאילתות ב-SQL
כדי ליצור פרוצדורה מאוחסנת ל-Spark בכלי לעריכת שאילתות SQL, פועלים לפי השלבים הבאים:
עוברים לדף BigQuery.
בעורך השאילתות, מוסיפים את קוד הדוגמה של הצהרת
CREATE PROCEDUREשמופיעה.לחלופין, בחלונית Explorer, לוחצים על החיבור בפרויקט שבו השתמשתם כדי ליצור את משאב החיבור. כדי ליצור פרוצדורה מאוחסנת ל-Spark, לוחצים על Create stored procedure.
Python
כדי ליצור פרוצדורות מאוחסנות ל-Spark ב-Python, משתמשים בקוד לדוגמה הבא:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java או Scala
כדי ליצור פרוצדורה מאוחסנת ל-Spark ב-Java או ב-Scala עם האפשרות
main_file_uri, משתמשים בקוד לדוגמה הבא:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
כדי ליצור פרוצדורה מאוחסנת ל-Spark ב-Java או ב-Scala עם האפשרויות
main_classו-jar_uris, משתמשים בקוד לדוגמה הבא:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: הפרויקט שבו רוצים ליצור את הפרוצדורה המאוחסנת – לדוגמה,myproject. -
DATASET: מערך הנתונים שבו רוצים ליצור את הפרוצדורה המאוחסנת, לדוגמהmydataset. -
PROCEDURE_NAME: השם של הפרוצדורה המאוחסנת שרוצים להריץ ב-BigQuery. למשל,mysparkprocedure. -
PROCEDURE_ARGUMENT: פרמטר להזנת ארגומנטים של קלט.בפרמטר הזה, מציינים את השדות הבאים:
-
ARGUMENT_MODE: המצב של הארגומנט.הערכים התקפים כוללים
IN,OUTו-INOUT. ערך ברירת המחדל הואIN. -
ARGUMENT_NAME: שם הארגומנט. -
ARGUMENT_TYPE: סוג הארגומנט.
לדוגמה:
myproject.mydataset.mysparkproc(num INT64).מידע נוסף מופיע במאמר הזה בקטע בנושא העברת ערך כפרמטר
INאו בנושא הפרמטריםOUTו-INOUT. -
-
CONNECTION_PROJECT_ID: הפרויקט שמכיל את החיבור להפעלת הפרוצדורה של Spark. -
CONNECTION_REGION: האזור שמכיל את החיבור להפעלת הפרוצדורה של Spark, לדוגמהus. -
CONNECTION_ID: מזהה החיבור, לדוגמה:myconnection.כשמציגים את פרטי החיבור במסוף Google Cloud , מזהה החיבור הוא הערך בקטע האחרון של מזהה החיבור המלא שמוצג במזהה החיבור – לדוגמה,
projects/myproject/locations/connection_location/connections/myconnection. -
RUNTIME_VERSION: גרסת זמן הריצה של Spark. לדוגמה,2.2. -
MAIN_PYTHON_FILE_URI: הנתיב לקובץ PySpark, לדוגמה,gs://mybucket/mypysparkmain.py.לחלופין, אם רוצים להוסיף את גוף הפרוצדורה המאוחסנת בהצהרה
CREATE PROCEDURE, מוסיפיםPYSPARK_CODEאחריLANGUAGE PYTHON ASכמו בדוגמה שבקטע שימוש בקוד מוטבע במסמך הזה. -
PYSPARK_CODE: ההגדרה של אפליקציית PySpark בהצהרהCREATE PROCEDUREאם רוצים להעביר את גוף הפרוצדורה בשורה.הערך הוא מחרוזת מילולית. אם הקוד כולל מירכאות ולוכסנים הפוכים, צריך להוסיף להם escape או לייצג אותם כמחרוזת גולמית. לדוגמה, קוד ההחזרה
"\n";יכול להיות מיוצג באחת מהדרכים הבאות:- מחרוזת מצוטטת:
"return \"\\n\";". גם המירכאות וגם הקווים הנטויים מסומנים בתווי בריחה (escape). - מחרוזת עם מרכאות משולשות:
"""return "\\n";""". התו '\' משמש כתווית בריחה, אבל לא המירכאות. - מחרוזת גולמית:
r"""return "\n";""". אין צורך בשימוש בתווי Escape.
- מחרוזת מצוטטת:
-
MAIN_JAR_URI: הנתיב של קובץ ה-JAR שמכיל את המחלקהmain, לדוגמה,gs://mybucket/my_main.jar. -
CLASS_NAME: השם המלא של מחלקה בקובץ JAR שמוגדר עם האפשרותjar_uris, לדוגמה,com.example.wordcount. -
URI: הנתיב של קובץ ה-JAR שמכיל את המחלקה שצוינה במחלקהmain, לדוגמה,gs://mybucket/mypysparkmain.jar.
לקבלת אפשרויות נוספות שאפשר לציין ב-
OPTIONS, אפשר לעיין ברשימת האפשרויות של הפרוצדורה.-
שימוש בעורך PySpark
כשיוצרים פרוצדורה באמצעות העורך של PySpark, לא צריך להשתמש בהצהרה CREATE PROCEDURE. במקום זאת, מוסיפים את קוד Python ישירות בעורך Pyspark ושומרים או מריצים את הקוד.
כדי ליצור פרוצדורה מאוחסנת ל-Spark בכלי לעריכת PySpark, פועלים לפי השלבים הבאים:
עוברים לדף BigQuery.
אם רוצים להקליד את קוד PySpark ישירות, פותחים את עורך PySpark. כדי לפתוח את עורך PySpark, לוחצים על התפריט ליד יצירת שאילתת SQL ואז בוחרים באפשרות יצירת תהליך PySpark.
כדי להגדיר אפשרויות, לוחצים על עוד > אפשרויות PySpark, ואז מבצעים את הפעולות הבאות:
מציינים את המיקום שבו רוצים להריץ את קוד PySpark.
בשדה Connection (חיבור), מציינים את חיבור Spark.
בקטע Stored procedure invocation (הפעלת תהליך מאוחסן), מציינים את מערך הנתונים שבו רוצים לאחסן את התהליכים המאוחסנים הזמניים שנוצרים. אתם יכולים להגדיר מערך נתונים ספציפי או לאפשר שימוש במערך נתונים זמני כדי להפעיל את קוד PySpark.
מערך הנתונים הזמני נוצר עם המיקום שצוין בשלב הקודם. אם מציינים שם של מערך נתונים, צריך לוודא שמערך הנתונים והחיבור ל-Spark נמצאים באותו מיקום.
בקטע Parameters (פרמטרים), מגדירים פרמטרים עבור ההליך המאוחסן. הערך של הפרמטר משמש רק במהלך הפעלות של קוד PySpark בסשן, אבל ההצהרה עצמה מאוחסנת בפרוצדורה.
בקטע אפשרויות מתקדמות, מציינים את האפשרויות של ההליך. רשימה מפורטת של אפשרויות הפרוצדורה מופיעה במאמר רשימת אפשרויות הפרוצדורה.
1. בקטע Properties, מוסיפים את צמדי המפתח/ערך כדי להגדיר את העבודה. אפשר להשתמש בכל אחד מצמדי המפתח/ערך מתוך מאפייני Spark הנתמכים ב-Serverless for Apache Spark.
בהגדרות חשבון השירות, מציינים את חשבון השירות המותאם אישית, את CMEK, את מערך הנתונים של שלב הביניים ואת תיקיית Cloud Storage של שלב הביניים שבהם ישתמשו במהלך הפעלות של קוד PySpark בתוך הסשן.
לוחצים על Save.
שמירת תהליך מאוחסן ל-Spark
אחרי יצירת התהליך המאוחסן באמצעות עורך PySpark, אפשר לשמור את התהליך המאוחסן. כדי לעשות זאת:
במסוף Google Cloud , עוברים לדף BigQuery.
בעורך השאילתות, יוצרים פרוצדורה מאוחסנת ל-Spark באמצעות Python עם עורך PySpark.
לוחצים על
שמירה > שמירת ההליך.בתיבת הדו-שיח שמירת תהליך מאוחסן, מציינים את שם מערך הנתונים שבו רוצים לאחסן את התהליך המאוחסן ואת שם התהליך המאוחסן.
לוחצים על Save.
אם רוצים להריץ את קוד PySpark בלי לשמור אותו כפרוצדורה מאוחסנת, אפשר ללחוץ על הפעלה במקום על שמירה.
שימוש במאגרי תגים מותאמים אישית
הקונטיינר המותאם אישית מספק את סביבת זמן הריצה לתהליכי מנהל ההתקן וההרצה של עומס העבודה. כדי להשתמש בקונטיינרים בהתאמה אישית, משתמשים בקוד לדוגמה הבא:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: הפרויקט שבו רוצים ליצור את הפרוצדורה המאוחסנת – לדוגמה,myproject. -
DATASET: מערך הנתונים שבו רוצים ליצור את הפרוצדורה המאוחסנת, לדוגמהmydataset. -
PROCEDURE_NAME: השם של הפרוצדורה המאוחסנת שרוצים להריץ ב-BigQuery – למשל,mysparkprocedure. -
PROCEDURE_ARGUMENT: פרמטר להזנת ארגומנטים של קלט.בפרמטר הזה, מציינים את השדות הבאים:
-
ARGUMENT_MODE: המצב של הארגומנט.הערכים התקפים כוללים
IN,OUTו-INOUT. ערך ברירת המחדל הואIN. -
ARGUMENT_NAME: שם הארגומנט. -
ARGUMENT_TYPE: סוג הארגומנט.
לדוגמה:
myproject.mydataset.mysparkproc(num INT64).מידע נוסף מופיע במאמר הזה בקטע בנושא העברת ערך כפרמטר
INאו בפרמטריםOUTו-INOUT. -
-
CONNECTION_PROJECT_ID: הפרויקט שמכיל את החיבור להפעלת הפרוצדורה של Spark. -
CONNECTION_REGION: האזור שמכיל את החיבור להרצת ההליך של Spark, לדוגמהus. -
CONNECTION_ID: מזהה החיבור, לדוגמה,myconnection.כשמציגים את פרטי החיבור במסוף Google Cloud , מזהה החיבור הוא הערך בקטע האחרון של מזהה החיבור המלא שמוצג במזהה החיבור – לדוגמה,
projects/myproject/locations/connection_location/connections/myconnection. -
RUNTIME_VERSION: גרסת זמן הריצה של Spark. לדוגמה,2.2. -
MAIN_PYTHON_FILE_URI: הנתיב לקובץ PySpark, לדוגמה,gs://mybucket/mypysparkmain.py.לחלופין, אם רוצים להוסיף את גוף הפרוצדורה המאוחסנת בהצהרת
CREATE PROCEDURE, מוסיפיםPYSPARK_CODEאחריLANGUAGE PYTHON AS, כמו בדוגמה שבקטע שימוש בקוד מוטבע במסמך הזה. -
PYSPARK_CODE: ההגדרה של אפליקציית PySpark בהצהרהCREATE PROCEDUREאם רוצים להעביר את גוף הפרוצדורה בשורה.הערך הוא מחרוזת מילולית. אם הקוד כולל מירכאות ולוכסנים הפוכים, צריך להוסיף להם escape או לייצג אותם כמחרוזת גולמית. לדוגמה, קוד ההחזרה
"\n";יכול להיות מיוצג באחת מהדרכים הבאות:- מחרוזת מצוטטת:
"return \"\\n\";". גם המירכאות וגם הקווים הנטויים לאחור מסומנים בתווי בריחה (escape). - מחרוזת עם מרכאות משולשות:
"""return "\\n";""". התו '\' משמש כתווית בריחה, אבל לא המירכאות. - מחרוזת גולמית:
r"""return "\n";""". אין צורך בשימוש בתווי Escape.
- מחרוזת מצוטטת:
-
CONTAINER_IMAGE: הנתיב של התמונה ב-Artifact Registry. הוא צריך להכיל רק ספריות לשימוש בהליך. אם לא מציינים, נעשה שימוש בקובץ אימג' של קונטיינר ברירת המחדל של המערכת שמשויך לגרסת זמן הריצה.
מידע נוסף על יצירת תמונת קונטיינר בהתאמה אישית באמצעות Spark זמין במאמר יצירת תמונת קונטיינר בהתאמה אישית.
הפעלת תהליך מאוחסן ב-Spark
אחרי שיוצרים פרוצדורה מאוחסנת, אפשר להפעיל אותה באחת מהדרכים הבאות:
המסוף
עוברים לדף BigQuery.
בחלונית הימנית, לוחצים על קטגוריה הגרסה הקלאסית של Explorer:

אם החלונית הימנית לא מוצגת, לוחצים על הרחבת החלונית הימנית כדי לפתוח אותה.
בחלונית Classic Explorer, מרחיבים את הפרויקט ובוחרים את הפרוצדורה המאוחסנת של Spark שרוצים להריץ.
בחלון Stored procedure info (פרטי תהליך מאוחסן), לוחצים על Invoke stored procedure (הפעלת תהליך מאוחסן). אפשר גם להרחיב את האפשרות View actions (הצגת פעולות) וללחוץ על Invoke (הפעלה).
לוחצים על Run.
בקטע כל התוצאות, לוחצים על הצגת התוצאות.
אופציונלי: בקטע Query results (תוצאות השאילתה), פועלים לפי השלבים הבאים:
כדי לראות את היומנים של מנהל התקן Spark, לוחצים על פרטי הביצוע.
אם רוצים לראות את היומנים ב-Cloud Logging, לוחצים על Job information ואז בשדה Log לוחצים על log.
כדי לקבל את נקודת הקצה של Spark History Server, לוחצים על Job information (פרטי המשימה) ואז על Spark history server (שרת היסטוריית Spark).
SQL
כדי להפעיל תהליך מאוחסן, משתמשים בהצהרה CALL PROCEDURE:
במסוף Google Cloud , עוברים לדף BigQuery.
מזינים את ההצהרה הבאה בעורך השאילתות:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
לוחצים על הפעלה.
מידע נוסף על הרצת שאילתות זמין במאמר הרצת שאילתה אינטראקטיבית.
שימוש בחשבון שירות מותאם אישית
במקום להשתמש בזהות השירות של חיבור Spark כדי לגשת לנתונים, אתם יכולים להשתמש בחשבון שירות בהתאמה אישית כדי לגשת לנתונים בקוד Spark.
כדי להשתמש בחשבון שירות בהתאמה אישית, צריך לציין את מצב האבטחה INVOKER (באמצעות ההצהרה EXTERNAL SECURITY INVOKER) כשיוצרים פרוצדורה מאוחסנת של Spark, ולציין את חשבון השירות כשמפעילים את הפרוצדורה המאוחסנת.
כשמריצים את הפרוצדורה המאוחסנת של Spark עם חשבון השירות המותאם אישית בפעם הראשונה, BigQuery יוצר סוכן שירות של Spark ומעניק לסוכן השירות את ההרשאות הנדרשות. חשוב לוודא שלא משנים את ההרשאה הזו לפני שמפעילים את הפרוצדורה המאוחסנת של Spark. פרטים נוספים זמינים במאמר בנושא BigQuery Spark Service Agent.
כדי לגשת לקוד Spark מ-Cloud Storage ולהשתמש בו, צריך להעניק את ההרשאות הנדרשות לזהות השירות של חיבור Spark. צריך להעניק לחשבון השירות של החיבור את ההרשאה storage.objects.get ב-IAM או את התפקיד storage.objectViewer ב-IAM.
אם ציינתם את Dataproc Metastore ואת Dataproc Persistent History Server בחיבור, תוכלו להעניק לחשבון השירות של החיבור גישה אליהם. מידע נוסף מופיע במאמר הענקת גישה לחשבון השירות.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
אפשר גם להוסיף לקוד הקודם את הארגומנטים הבאים:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
מחליפים את מה שכתוב בשדות הבאים:
-
CUSTOM_SERVICE_ACCOUNT: חובה. חשבון שירות מותאם אישית שסיפקתם. -
BUCKET_NAME: אופציונלי. הקטגוריה של Cloud Storage שמשמשת כמערכת הקבצים של אפליקציית Spark כברירת מחדל. אם לא מספקים את הפרטים האלה, נוצרת קטגוריה של Cloud Storage כברירת מחדל בפרויקט, והקטגוריה משותפת לכל העבודות שפועלות באותו פרויקט. -
DATASET: אופציונלי. מערך הנתונים שבו יאוחסנו הנתונים הזמניים שנוצרו מהפעלת הפרוצדורה. הנתונים מנוקים אחרי שהעבודה מסתיימת. אם לא מספקים את הפרטים האלה, נוצר מערך נתונים זמני כברירת מחדל בשביל העבודה.
לחשבון השירות המותאם אישית צריכות להיות ההרשאות הבאות:
כדי לקרוא ולכתוב לקטגוריית ההכנה לפרסום שמשמשת כמערכת הקבצים של אפליקציית Spark שמוגדרת כברירת מחדל:
- הרשאות
storage.objects.*או תפקיד IAMroles/storage.objectAdminבקטגוריית הביניים שאתם מציינים. - בנוסף, צריך להקצות את ההרשאות
storage.buckets.*או את תפקיד IAMroles/storage.Adminבפרויקט, אם לא צוינה קטגוריית ביניים.
- הרשאות
(אופציונלי) כדי לקרוא ולכתוב נתונים מ-BigQuery ואליו:
-
bigquery.tables.*בטבלאות BigQuery. -
bigquery.readsessions.*בפרויקט. - תפקיד ה-IAM
roles/bigquery.adminכולל את ההרשאות הקודמות.
-
(אופציונלי) כדי לקרוא ולכתוב נתונים מ-Cloud Storage ואליו:
- הרשאות
storage.objects.*או תפקיד IAMroles/storage.objectAdminבאובייקטים של Cloud Storage.
- הרשאות
(אופציונלי) כדי לקרוא ולכתוב לערכת הנתונים הזמנית שמשמשת לפרמטרים של
INOUT/OUT:-
bigquery.tables.*אוroles/bigquery.dataEditorתפקיד IAM במערך הנתונים של הביניים שציינתם. - בנוסף, צריך להגדיר את ההרשאה
bigquery.datasets.createאו את תפקיד ה-IAMroles/bigquery.dataEditorבפרויקט אם לא צוין מערך נתונים זמני.
-
דוגמאות לתהליכים מאוחסנים ב-Spark
בקטע הזה מוצגות דוגמאות לאופן שבו אפשר ליצור פרוצדורה מאוחסנת עבור Apache Spark.
שימוש בקובץ PySpark או בקובץ JAR ב-Cloud Storage
בדוגמה הבאה מוצג איך ליצור פרוצדורה מאוחסנת ל-Spark באמצעות חיבור my-project-id.us.my-connection וקובץ PySpark או JAR שמאוחסן בקטגוריה של Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java או Scala
כדי ליצור נהלים מאוחסנים, משתמשים ב-main_file_uri:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
כדי ליצור נהלים מאוחסנים, משתמשים ב-main_class:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
שימוש בקוד בתוך שורה
בדוגמה הבאה אפשר לראות איך ליצור פרוצדורה מאוחסנת ל-Spark באמצעות החיבור my-project-id.us.my-connection וקוד PySpark מוטבע:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
העברת ערך כפרמטר קלט
בדוגמאות הבאות מוצגות שתי השיטות להעברת ערך כפרמטר קלט ב-Python:
שיטה 1: שימוש במשתני סביבה
בקוד PySpark, אפשר לקבל את פרמטרי הקלט של הפרוצדורה המאוחסנת עבור Spark דרך משתני סביבה במנהל ובמבצעים של Spark. הפורמט של שם משתנה הסביבה הוא BIGQUERY_PROC_PARAM.PARAMETER_NAME, כאשר PARAMETER_NAME הוא שם פרמטר הקלט. לדוגמה, אם שם פרמטר הקלט הוא var, שם משתנה הסביבה התואם הוא var.BIGQUERY_PROC_PARAM.var פרמטרי הקלט הם JSON encoded.
בקוד PySpark, אפשר לקבל את ערך הפרמטר במחרוזת JSON ממשתנה הסביבה ולפענח אותו למשתנה Python.
בדוגמה הבאה מוצג איך מקבלים את הערך של פרמטר קלט מהסוג INT64 בקוד PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
שיטה 2: שימוש בספרייה מובנית
בקוד PySpark, אפשר פשוט לייבא ספרייה מובנית ולהשתמש בה כדי לאכלס את כל סוגי הפרמטרים. כדי להעביר את הפרמטרים למבצעים, מאכלסים את הפרמטרים במנהל התקנים של Spark כמשתני Python ומעבירים את הערכים למבצעים. הספרייה המובנית תומכת ברוב סוגי הנתונים של BigQuery, למעט INTERVAL, GEOGRAPHY, NUMERIC ו-BIGNUMERIC.
| סוג נתונים ב-BigQuery | סוג הנתונים ב-Python |
|---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
לא נתמך |
BIGNUMERIC
|
לא נתמך |
INTERVAL
|
לא נתמך |
GEOGRAPHY
|
לא נתמך |
בדוגמה הבאה מוצג איך לייבא את הספרייה המובנית ולהשתמש בה כדי לאכלס פרמטר קלט מסוג INT64 ופרמטר קלט מסוג ARRAY<STRUCT<a INT64, b STRING>> בקוד PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
בקוד Java או Scala, אפשר לקבל את פרמטרי הקלט של הפרוצדורה המאוחסנת עבור Spark דרך משתני סביבה במנהל ההתקנים ובמבצעים של Spark. הפורמט של שם משתנה הסביבה הוא BIGQUERY_PROC_PARAM.PARAMETER_NAME, כאשר PARAMETER_NAME הוא שם פרמטר הקלט. לדוגמה, אם שם פרמטר הקלט הוא var, שם משתנה הסביבה התואם הוא BIGQUERY_PROC_PARAM.var.
בקוד Java או Scala, אפשר לקבל את ערך פרמטר הקלט ממשתנה הסביבה.
בדוגמה הבאה מוצג איך מקבלים את הערך של פרמטר קלט ממשתני סביבה אל קוד Scala:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
בדוגמה הבאה מוצגות דרכים להעברת פרמטרים של קלט ממשתני סביבה לקוד Java:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
העברת ערכים כפרמטרים OUT ו-INOUT
פרמטרים של פלט מחזירים את הערך מהפרוצדורה של Spark, ואילו הפרמטר INOUT מקבל ערך עבור הפרוצדורה ומחזיר ערך מהפרוצדורה.
כדי להשתמש בפרמטרים OUT ו-INOUT, מוסיפים את מילת המפתח OUT או INOUT לפני שם הפרמטר כשיוצרים את הפרוצדורה של Spark. בקוד PySpark, משתמשים בספרייה המובנית כדי להחזיר ערך כפרמטר OUT או INOUT. בדומה לפרמטרים של קלט, הספרייה המובנית תומכת ברוב סוגי הנתונים של BigQuery, למעט INTERVAL, GEOGRAPHY, NUMERIC ו-BIGNUMERIC. הערכים של סוגי הנתונים TIME ו-DATETIME מומרים לאזור הזמן UTC כשהם מוחזרים כפרמטרים OUT או INOUT.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
קריאה מטבלה ב-Hive Metastore וכתיבת התוצאות ל-BigQuery
בדוגמה הבאה מוסבר איך לשנות טבלה ב-Hive Metastore ולכתוב את התוצאות ב-BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
הצגת מסנני היומן
אחרי שמתקשרים להליך מאוחסן ב-Spark, אפשר לראות את פרטי היומן. כדי לקבל את פרטי המסנן של Cloud Logging ואת נקודת הקצה של Spark History Cluster, משתמשים בפקודה bq
show.
פרטי המסנן זמינים בשדה SparkStatistics של משימת הצאצא. כדי לקבל מסנני יומנים:
עוברים לדף BigQuery.
בעורך השאילתות, מפרטים את עבודות הצאצא של סקריפט העבודה של הפרוצדורה המאוחסנת:
bq ls -j --parent_job_id=$parent_job_id
במאמר איך צופים בפרטי המשימה מוסבר איך מקבלים את מזהה המשימה.
הפלט אמור להיראות כך:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
מזהים את
jobIdשל הפרוצדורה המאוחסנת ומשתמשים בפקודהbq showכדי לראות את פרטי העבודה:bq show --format=prettyjson --job $child_job_id
מעתיקים את השדה
sparkStatisticsכי תצטרכו אותו בשלב אחר.הפלט אמור להיראות כך:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
ב-Logging, יוצרים מסנני יומן עם השדות
SparkStatistics:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
היומנים נכתבים במשאב
bigquery.googleapis.com/SparkJobהמנוטר. היומנים מסומנים בתוויות לפי הרכיביםINFO,DRIVERו-EXECUTOR. כדי לסנן יומנים ממנהל ההתקן של Spark, מוסיפים את הרכיבlabels.component = "DRIVER"לפילטרים של היומן. כדי לסנן יומנים מה-executor של Spark, מוסיפים את הרכיבlabels.component = "EXECUTOR"למסנני היומנים.
שימוש במפתח הצפנה בניהול הלקוח
הפרוצדורה של BigQuery Spark משתמשת במפתח הצפנה בניהול הלקוח (CMEK) כדי להגן על התוכן, בנוסף להצפנת ברירת המחדל שמספק BigQuery. כדי להשתמש ב-CMEK בהליך Spark, קודם צריך להפעיל את היצירה של חשבון השירות להצפנה ב-BigQuery ולהעניק את ההרשאות הנדרשות. הפרוצדורה של Spark תומכת גם במדיניות הארגון ל-CMEK אם היא חלה על הפרויקט.
אם הפרוצדורה המאוחסנת שלכם משתמשת במצב האבטחה INVOKER, צריך לציין את ה-CMEK באמצעות משתנה המערכת של SQL כשקוראים לפרוצדורה. אחרת, אפשר לציין את ה-CMEK דרך החיבור שמשויך לפרוצדורה המאוחסנת.
כדי לציין את ה-CMEK דרך החיבור כשיוצרים פרוצדורה מאוחסנת של Spark, משתמשים בקוד לדוגמה הבא:
bq mk --connection --connection_type='SPARK' \
--properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
--project_id=PROJECT_ID \
--location=LOCATION \
CONNECTION_NAME
כדי לציין CMEK דרך משתנה המערכת של SQL כשמפעילים את הפרוצדורה, משתמשים בקוד לדוגמה הבא:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
שימוש ב-VPC Service Controls
בעזרת VPC Service Controls אפשר להגדיר מתחם אבטחה היקפית כדי להגן מפני זליגת נתונים. כדי להשתמש ב-VPC Service Controls עם פרוצדורת Spark לאבטחה נוספת, קודם צריך ליצור גבול גזרה לשירות.
כדי להגן באופן מלא על עבודות של פרוצדורות Spark, מוסיפים את ממשקי ה-API הבאים לגבולות גזרה לשירות:
- BigQuery API (
bigquery.googleapis.com) - Cloud Logging API (
logging.googleapis.com) - Cloud Storage API (
storage.googleapis.com), אם אתם משתמשים ב-Cloud Storage - Artifact Registry API (
artifactregistry.googleapis.com) או Container Registry API (containerregistry.googleapis.com), אם משתמשים בקונטיינר בהתאמה אישית - Dataproc Metastore API (
metastore.googleapis.com) ו-Cloud Run Admin API (run.googleapis.com), אם אתם משתמשים ב-Dataproc Metastore
מוסיפים את פרויקט השאילתה של הפרוצדורה של Spark להיקף. מוסיפים לגבולות הגזרה את הפרויקטים האחרים שמארחים את הקוד או הנתונים של Spark.
שיטות מומלצות
כשמשתמשים בחיבור בפרויקט בפעם הראשונה, לוקח עוד דקה בערך להקצאת משאבים. כדי לחסוך זמן, אתם יכולים להשתמש מחדש בחיבור Spark קיים כשאתם יוצרים פרוצדורה מאוחסנת ל-Spark.
כשיוצרים פרוצדורת Spark לשימוש בסביבת הייצור, מומלץ לציין גרסת זמן ריצה. רשימת גרסאות ה-runtime הנתמכות מופיעה במאמר בנושא גרסאות ה-runtime של Serverless for Apache Spark. מומלץ להשתמש בגרסת Long-Time-Support (LTS).
כשמציינים קונטיינר בהתאמה אישית בהליך Spark, מומלץ להשתמש ב-Artifact Registry ובהזרמת תמונות.
כדי לשפר את הביצועים, אפשר לציין מאפיינים של הקצאת משאבים בהליך Spark. פרוצדורות מאוחסנות של Spark תומכות ברשימה של מאפייני הקצאת משאבים, כמו ב-Serverless for Apache Spark.
מגבלות
- אפשר להשתמש רק בפרוטוקול של נקודת קצה gRPC כדי להתחבר אל Dataproc Metastore. אין תמיכה בסוגים אחרים של Hive Metastore.
- מפתחות הצפנה בניהול הלקוח (CMEK) זמינים רק כשלקוחות יוצרים פרוצדורות Spark באזור יחיד. לדוגמה, לא ניתן להשתמש במפתחות CMEK באזור גלובלי ובמפתחות CMEK במספר אזורים, כמו
EUאוUS. - העברת פרמטרים של פלט נתמכת רק ב-PySpark.
- אם מערך הנתונים שמשויך לפרוצדורה המאוחסנת של Spark משוכפל לאזור יעד באמצעות שכפול של מערך נתונים בין אזורים, אפשר לשלוח שאילתות לפרוצדורה המאוחסנת רק באזור שבו היא נוצרה.
- Spark לא תומך בגישה לנקודות קצה (endpoints) של HTTP ברשת הפרטית שלכם ב-VPC Service Controls.
מכסות ומגבלות
מידע על מכסות ומגבלות זמין במאמר stored procedures for Spark quotas and limits.