שימוש בתכונות מתקדמות של מחברת Apache Beam

שימוש ב-Apache Beam interactive runner עם מחברות JupyterLab מאפשר לכם לפתח צינורות עיבוד נתונים באופן איטרטיבי, לבדוק את גרף צינור עיבוד הנתונים ולנתח PCollections בודדים בתהליך עבודה של קריאה-הערכה-הדפסה (REPL). במדריך פיתוח באמצעות מחברות Apache Beam מוסבר איך להשתמש ב-Apache Beam interactive runner עם מחברות JupyterLab.

בדף הזה מוסבר על תכונות מתקדמות שאפשר להשתמש בהן במחברת Apache Beam.

‫FlinkRunner אינטראקטיבי באשכולות שמנוהלים על ידי מחברת

כדי לעבוד עם נתונים בגודל של נתוני ייצור באופן אינטראקטיבי מתוך המחברת, אפשר להשתמש ב-FlinkRunner עם כמה אפשרויות כלליות של צינורות עיבוד נתונים כדי להגדיר בסשן של המחברת ניהול של אשכול Dataproc לטווח ארוך והרצה של צינורות עיבוד הנתונים של Apache Beam באופן מבוזר.

דרישות מוקדמות

כדי להשתמש בתכונה הזו:

  • מפעילים את Dataproc API.
  • מקצים לחשבון השירות שמריץ את מופע המחברת ב-Dataproc תפקיד אדמין או תפקיד עריכה.
  • משתמשים בגרעין של מחברת עם Apache Beam SDK בגרסה 2.40.0 ואילך.

הגדרות אישיות

לפחות צריך להגדיר את הדברים הבאים:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<BUCKET_NAME>/flink'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = 'PROJECT_ID'

הקצאה מפורשת (אופציונלי)

אפשר להוסיף את האפשרויות הבאות.

# Change this if the pipeline needs to run in a different region
# than the default, 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers=40
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n1-highmem-8'

# When working with non-official Apache Beam releases, such as Apache Beam built from source
# code, configure the environment to use a compatible released SDK container.
# If needed, build a custom container and use it. For more information, see:
# https://beam.apache.org/documentation/runtime/environments/
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0 or LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'

Usage

# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10

p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to run
# your pipelines with the FlinkRunner.
ib.show(word_counts)

# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
    p_bq
    | 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)

# Describe the cluster running the pipelines.
# You can access the Flink dashboard from the printed link.
ib.clusters.describe()

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)

אשכולות בניהול Notebook

  • כברירת מחדל, אם לא מספקים אפשרויות צינור, ‏Interactive Apache Beam תמיד משתמש מחדש באשכול שהיה בשימוש לאחרונה כדי להריץ צינור עם FlinkRunner.
    • כדי להימנע מההתנהגות הזו, למשל כדי להפעיל צינור אחר באותה סשן של מחברת עם FlinkRunner שלא מתארח במחברת, מריצים את ib.clusters.set_default_cluster(None).
  • כשיוצרים מופע של צינור חדש שמשתמש בפרויקט, באזור ובהגדרת הקצאת משאבים שממופים לאשכול Dataproc קיים, Dataflow גם משתמש מחדש באשכול, למרות שהוא לא בהכרח משתמש באשכול שהיה בשימוש לאחרונה.
  • עם זאת, בכל פעם שמתבצע שינוי בהקצאת משאבים, למשל כשמשנים את הגודל של אשכול, נוצר אשכול חדש כדי להפעיל את השינוי הרצוי. אם אתם מתכוונים לשנות את הגודל של אשכול, כדי לא לנצל את כל משאבי הענן, כדאי לנקות אשכולות מיותרים באמצעות ib.clusters.cleanup(pipeline).
  • אם מציינים Flink master_url, והוא שייך לאשכול שמנוהל על ידי סשן המחברת, Dataflow משתמש מחדש באשכול המנוהל.
    • אם מחברת הפעלה לא מכירה את master_url, זה אומר שרוצים FlinkRunner שמתארח על ידי המשתמש. המחברת לא עושה שום דבר באופן מרומז.

פתרון בעיות

בקטע הזה מוסבר איך לפתור בעיות ולבצע ניפוי באגים ב-FlinkRunner האינטראקטיבי באשכולות שמנוהלים על ידי מחברת.

כדי לפשט את התהליך, הגדרת מאגר הרשת של Flink לא מוצגת להגדרה.

אם תרשים העבודה שלכם מסובך מדי או שהמקביליות מוגדרת גבוה מדי, יכול להיות שהקרדינליות של השלבים כפול המקביליות תהיה גדולה מדי, יתוזמנו יותר מדי משימות במקביל וההפעלה תיכשל.

הטיפים הבאים יעזרו לכם לשפר את המהירות של הפעלות אינטראקטיביות:

  • מקצים למשתנה רק את PCollection שרוצים לבדוק.
  • בודקים PCollections כל אחד מהם.
  • משתמשים בערבוב מחדש אחרי טרנספורמציות עם פיצול גבוה.
  • כדאי לשנות את רמת המקביליות בהתאם לגודל הנתונים. לפעמים קטן יותר זה מהיר יותר.

הבדיקה של הנתונים נמשכת יותר מדי זמן

בודקים את לוח הבקרה של Flink כדי לראות את העבודה שמתבצעת. יכול להיות שתראו שלב שבו מאות משימות הסתיימו ונשארה רק אחת, כי נתונים שנמצאים בתהליך ממוקמים במחשב אחד ולא עוברים ערבוב.

תמיד משתמשים בפעולת ערבוב מחדש אחרי טרנספורמציה עם פיצול גבוה, למשל במקרים הבאים:

  • קריאת שורות מקובץ
  • קריאת שורות מטבלה ב-BigQuery

בלי שינוי הסדר, נתוני הפיצול תמיד מופעלים באותו worker, ואי אפשר לנצל את היתרונות של מקביליות.

כמה עובדים צריך?

ככלל, מספר הליבות הווירטואליות (vCPU) באשכול Flink הוא בערך מספר משבצות העבודה כפול מספר הליבות הווירטואליות. לדוגמה, אם יש לכם 40 עובדים מסוג n1-highmem-8, באשכול Flink יש לכל היותר 320 משבצות, כלומר 8 כפול 40.

במצב אידיאלי, תהליך העבודה יכול לנהל משימה שקוראת, ממפה ומשלבת עם מקביליות שמוגדרת במאות, שמתזמנת אלפי משימות במקביל.

האם אפשר להשתמש בו לסטרימינג?

צינורות להזרמת נתונים לא תואמים כרגע לתכונה האינטראקטיבית של Flink באשכולות שמנוהלים על ידי מחברת.

‫Beam SQL ו-beam_sql magic

Beam SQL מאפשרת לשלוח שאילתות לנתונים מוגבלים ולא מוגבלים PCollections באמצעות הצהרות SQL. אם אתם עובדים במחברת Apache Beam, אתם יכולים להשתמש בפונקציית ה-magic המותאמת אישית של IPython‏ beam_sql כדי לזרז את פיתוח צינור עיבוד הנתונים.

אתם יכולים לבדוק את השימוש בתכונות מבוססות-AI באמצעות האפשרות -h או --help:beam_sql

בדיקה בעזרה של beam_sql

אפשר ליצור PCollection מערכים קבועים:

יצירת PCollection מערכים קבועים

אפשר להצטרף לכמה PCollections:

שילוב של כמה PCollection

אפשר להפעיל משימת Dataflow באמצעות האפשרות -r DataflowRunner או --runner DataflowRunner:

הפעלת משימת Dataflow באמצעות Apache Beam SQL

מידע נוסף זמין בקובץ ה-notebook לדוגמה Apache Beam SQL in notebooks.

שיפור המהירות באמצעות הידור JIT ו-GPU

אפשר להשתמש בספריות כמו numba וGPUs כדי להאיץ את קוד Python ואת צינורות הנתונים של Apache Beam. במופע של Apache Beam notebook שנוצר עם nvidia-tesla-t4 GPU, כדי להריץ את הקוד ב-GPU, צריך לקמפל את קוד Python באמצעות numba.cuda.jit. אופציונלי: כדי לזרז את ההרצה במעבדי CPU, אפשר לקמפל את קוד Python לשפת מכונה באמצעות numba.jit או numba.njit.

בדוגמה הבאה נוצר DoFn שמעובד ב-GPU:

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

בתמונה הבאה אפשר לראות מחברת שפועלת ב-GPU:

הפעלת DoFn ב-GPU

פרטים נוספים זמינים במחברת לדוגמה שימוש במעבדים גרפיים עם Apache Beam.

יצירת מאגר תגים בהתאמה אישית

ברוב המקרים, אם צינור עיבוד הנתונים לא דורש תלות נוספת ב-Python או קובצי הפעלה, Apache Beam יכול להשתמש באופן אוטומטי בתמונות הקונטיינר הרשמיות שלו כדי להריץ את הקוד שהוגדר על ידי המשתמש. התמונות האלה כוללות הרבה מודולים נפוצים של Python, ואתם לא צריכים ליצור או לציין אותם באופן מפורש.

במקרים מסוימים, יכול להיות שיש לכם תלויות נוספות ב-Python או אפילו תלויות שאינן ב-Python. בתרחישים האלה, אפשר ליצור קונטיינר בהתאמה אישית ולהפוך אותו לזמין להרצה באוסף Flink. ברשימה הבאה מפורטים היתרונות של שימוש במאגר תגים בהתאמה אישית:

  • זמן הגדרה מהיר יותר להפעלות רצופות ואינטראקטיביות
  • הגדרות ויחסי תלות יציבים
  • יותר גמישות: אפשר להגדיר יותר יחסי תלות של Python

תהליך build של מאגר התגים יכול להיות מייגע, אבל אפשר לעשות הכול במחברת באמצעות דפוס השימוש הבא.

יצירת סביבת עבודה מקומית

קודם יוצרים ספריית עבודה מקומית בספריית הבית של Jupyter.

!mkdir -p /home/jupyter/.flink

הכנת יחסי תלות של Python

לאחר מכן, מתקינים את כל התלויות הנוספות של Python שאולי תשתמשו בהן, ומייצאים אותן לקובץ דרישות.

%pip install dep_a
%pip install dep_b
...

אפשר ליצור קובץ דרישות באופן מפורש באמצעות %%writefileהפונקציה הקסומה של המחברת.

%%writefile /home/jupyter/.flink/requirements.txt
dep_a
dep_b
...

אפשר גם להקפיא את כל התלויות המקומיות בקובץ דרישות. האפשרות הזו עלולה ליצור תלויות לא מכוונות.

%pip freeze > /home/jupyter/.flink/requirements.txt

הכנת יחסי התלות שאינם Python

מעתיקים את כל התלויות שאינן Python אל סביבת העבודה. אם אין לכם תלות שאינה Python, דלגו על השלב הזה.

!cp /path/to/your-dep /home/jupyter/.flink/your-dep
...

יצירת קובץ Dockerfile

יוצרים קובץ Dockerfile באמצעות הפקודה הקסומה %%writefile של מחברת. לדוגמה:

%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY  requirements.txt /tmp/requirements.txt
COPY  your_dep /tmp/your_dep
...

RUN python -m pip install -r /tmp/requirements.txt

קונטיינר לדוגמה משתמש בתמונה של Apache Beam SDK בגרסה 2.40.0 עם Python 3.7 כבסיס, מוסיף קובץ your_dep ומתקין את יחסי התלות הנוספים של Python. אפשר להשתמש בקובץ Docker הזה כתבנית ולערוך אותו בהתאם לתרחיש השימוש שלכם.

בצינורות עיבוד הנתונים של Apache Beam, כשמפנים ליחסי תלות שאינם Python, צריך להשתמש בCOPY יעדים שלהם. לדוגמה, /tmp/your_dep הוא נתיב הקובץ של קובץ your_dep.

יצירת קובץ אימג' של קונטיינר ב-Artifact Registry באמצעות Cloud Build

  1. מפעילים את השירותים Cloud Build ו-Artifact Registry, אם הם עדיין לא מופעלים.

    !gcloud services enable cloudbuild.googleapis.com
    !gcloud services enable artifactregistry.googleapis.com
    
  2. יוצרים מאגר ב-Artifact Registry כדי להעלות פריטי מידע שנוצרו בתהליך פיתוח (Artifact). כל מאגר יכול להכיל פריטי מידע שנוצרו בתהליך פיתוח (Artifact) בפורמט נתמך אחד בלבד.

    כל התוכן במאגר מוצפן באמצעות Google-owned and Google-managed encryption keys או באמצעות מפתחות הצפנה שמנוהלים על ידי הלקוח. ‫Artifact Registry משתמש ב-Google-owned and Google-managed encryption keys כברירת מחדל, ולא נדרשת הגדרה לאפשרות הזו.

    צריכה להיות לכם לפחות הרשאת כתיבה ב-Artifact Registry למאגר.

    מריצים את הפקודה הבאה כדי ליצור מאגר חדש. הפקודה משתמשת בדגל --async וחוזרת מיידית, בלי להמתין לסיום הפעולה.

    gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=LOCATION \
    --async
    

    מחליפים את הערכים הבאים:

    • REPOSITORY: שם למאגר. שמות המאגרים חייבים להיות ייחודיים לכל מיקום מאגר בפרויקט.
    • LOCATION: המיקום של המאגר.
  3. כדי לדחוף או למשוך תמונות, צריך להגדיר את Docker לאימות בקשות ל-Artifact Registry. כדי להגדיר אימות למאגרי Docker, מריצים את הפקודה הבאה:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    הפקודה מעדכנת את ההגדרות של Docker. מעכשיו אפשר להתחבר אל Artifact Registry בפרויקט Google Cloud כדי להעלות תמונות.

  4. משתמשים ב-Cloud Build כדי ליצור את קובץ האימג' של הקונטיינר ושומרים אותו ב-Artifact Registry.

    !cd /home/jupyter/.flink \
    && gcloud builds submit \
     --tag LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest \
     --timeout=20m
    

    מחליפים את PROJECT_ID במזהה הפרויקט.

שימוש במאגרי תגים מותאמים אישית

בהתאם ל-Runner, אפשר להשתמש במאגרי תגים מותאמים אישית למטרות שונות.

למידע כללי על שימוש במאגרי Apache Beam, אפשר לעיין במאמרים הבאים:

למידע על השימוש במאגר Dataflow, אפשר לעיין במאמרים הבאים:

השבתה של כתובות IP חיצוניות

כשיוצרים מכונת מחברת של Apache Beam, כדי לשפר את האבטחה, כדאי להשבית כתובות IP חיצוניות. מכיוון שמכונות מחברת צריכות להוריד כמה משאבים מהאינטרנט הציבורי, כמו Artifact Registry, צריך קודם ליצור רשת VPC חדשה ללא כתובת IP חיצונית. לאחר מכן, יוצרים שער Cloud NAT לרשת ה-VPC הזו. מידע נוסף על Cloud NAT זמין במאמרי העזרה בנושא Cloud NAT. משתמשים ברשת VPC ובשער Cloud NAT כדי לגשת למשאבים הדרושים באינטרנט הציבורי בלי להפעיל כתובות IP חיצוניות.