כתיבת תרשימי DAG ב-Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

במדריך הזה נסביר איך לכתוב גרף אציקלי מכוון (DAG) של Apache Airflow שפועל בסביבת Managed Service for Apache Airflow.

מכיוון ש-Apache Airflow לא מספק בידוד חזק של DAG ומשימות, מומלץ להשתמש בסביבות ייצור ובדיקה נפרדות כדי למנוע הפרעות ב-DAG. מידע נוסף זמין במאמר בנושא בדיקת DAG.

הגדרת מבנה של DAG ב-Airflow

גרף מכוון מחזורי (DAG) של Airflow מוגדר בקובץ Python ומורכב מהרכיבים הבאים:

  • הגדרת DAG
  • פעולות על זרימת אוויר
  • קשרי אופרטורים

בקטעי הקוד הבאים מוצגות דוגמאות לכל רכיב ללא הקשר.

הגדרת DAG

בדוגמה הבאה מוצגת הגדרה של Airflow DAG:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

אופרטורים ומשימות

רכיבי Airflow Operators מתארים את העבודה שצריך לבצע. משימה היא מופע ספציפי של אופרטור.

Airflow 2

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

קשרים בין משימות

קשרי משימות מתארים את הסדר שבו צריך להשלים את העבודה.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

דוגמה מלאה לתהליך עבודה של DAG ב-Python

תהליך העבודה הבא הוא תבנית DAG מלאה שפועלת ומורכבת משתי משימות: משימה hello_python ומשימה goodbye_bash:

Airflow 2


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

מידע נוסף על הגדרת DAG ב-Airflow זמין בהדרכה בנושא Airflow ובמאמר מושגים ב-Airflow.

פעולות על זרימת אוויר

בדוגמאות הבאות מוצגים כמה אופרטורים פופולריים של Airflow. לעיון במקור מידע מהימן על אופרטורים של Airflow, אפשר לעיין בחומר העזר בנושא אופרטורים ו-Hooks ובאינדקס הספקים.

BashOperator

משתמשים ב-BashOperator כדי להריץ תוכניות של שורת פקודה.

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

‫Managed Airflow מריץ את הפקודות שצוינו בסקריפט Bash ב-worker של Airflow. ה-worker הוא קונטיינר Docker מבוסס-Debian וכולל כמה חבילות.

PythonOperator

משתמשים ב-PythonOperator כדי להריץ קוד Python שרירותי.

‫Managed Airflow מריץ את קוד Python בקונטיינר שכולל חבילות עבור גרסת האימג' של Managed Airflow שבה נעשה שימוש בסביבה שלכם.

כדי להתקין חבילות Python נוספות, ראו התקנת יחסי תלות ב-Python.

Google Cloud אופרטורים

כדי להריץ משימות שמשתמשות במוצרי Google Cloud , משתמשים באופרטורים שלGoogle Cloud Airflow. לדוגמה, אופרטורים של BigQuery שולחים שאילתות ומעבדים נתונים ב-BigQuery.

יש עוד הרבה אופרטורים של Airflow ל Google Cloud ולשירותים ספציפיים שמוצעים על ידי Google Cloud. Google Cloud רשימה מלאה של אופרטורים

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

משתמשים ב-EmailOperator כדי לשלוח אימייל מ-DAG. כדי לשלוח אימייל מסביבת Managed Airflow, צריך להגדיר את הסביבה לשימוש ב-SendGrid.

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

התראות על כשלים באופרטור

מגדירים את email_on_failure ל-True כדי לשלוח התראה באימייל כשמפעיל ב-DAG נכשל. כדי לשלוח התראות באימייל מסביבת Managed Airflow, צריך להגדיר את הסביבה לשימוש ב-SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

הנחיות לתהליכי עבודה מסוג DAG

  • ממקמים את כל ספריות Python בהתאמה אישית בארכיון ZIP של DAG בספרייה מקוננת. אין למקם ספריות ברמה העליונה של ספריית ה-DAG.

    כש-Airflow סורק את התיקייה dags/, הוא בודק רק אם יש DAG במודולי Python שנמצאים ברמה העליונה של תיקיית ה-DAG וברמה העליונה של ארכיון ZIP שנמצא גם הוא ברמה העליונה של התיקייה dags/. אם Airflow נתקל במודול Python בארכיון ZIP שלא מכיל את מחרוזות המשנה airflow ו-DAG, הוא מפסיק את העיבוד של ארכיון ה-ZIP. ‫Airflow מחזיר רק את ה-DAG שנמצאו עד לנקודה הזו.

  • משתמשים ב-Airflow 2 במקום ב-Airflow 1.

    קהילת Airflow לא מפרסמת יותר גרסאות משניות חדשות או גרסאות תיקון ל-Airflow 1.

  • כדי להבטיח עמידות בפני תקלות, אל תגדירו כמה אובייקטים של DAG באותו מודול Python.

  • אל תשתמשו ב-SubDAGs. במקום זאת, מקבצים משימות בתוך DAGs.

  • ממקמים את הקבצים הנדרשים בזמן הניתוח של DAG בתיקייה dags/, ולא בתיקייה data/.

  • הטמעה של בדיקות יחידה ל-DAG

  • מומלץ לבדוק DAG שפותח או שונה לפי ההוראות לבדיקת DAG.

  • מוודאים שפיתוח של DAG לא גורם לעלייה גדולה מדי בזמני הניתוח של DAG.

  • יכולות להיות כמה סיבות לכשל במשימות של Airflow. כדי למנוע כשלים בהפעלות של DAG שלם, מומלץ להפעיל ניסיונות חוזרים של משימות. הגדרת מספר הניסיונות החוזרים המקסימלי לערך 0 פירושה שלא יתבצעו ניסיונות חוזרים.

    מומלץ לשנות את האפשרות default_task_retries עם ערך אחר של ניסיונות חוזרים למשימה, ולא 0. בנוסף, אפשר להגדיר את הפרמטר retries ברמת המשימה.

  • אם רוצים להשתמש ב-GPU במשימות Airflow, צריך ליצור אשכול GKE נפרד שמבוסס על צמתים באמצעות מכונות עם GPU. משתמשים ב-GKEStartPodOperator כדי להריץ את המשימות.

  • מומלץ להימנע מהרצת משימות שדורשות הרבה משאבים מהמעבד ומהזיכרון במאגר הצמתים של האשכול, שבו פועלים רכיבי Airflow אחרים (מתזמנים, עובדים, שרתי אינטרנט). במקום זאת, אפשר להשתמש ב-KubernetesPodOperator או ב-GKEStartPodOperator.

  • כשפורסים DAG בסביבה, מעלים לתיקייה /dags רק את הקבצים שנדרשים לפרשנות ולביצוע של DAG.

  • הגבלת מספר קובצי ה-DAG בתיקייה /dags.

    מערכת Airflow מנתחת באופן רציף קובצי DAG בתיקייה /dags. הניתוח הוא תהליך שחוזר על עצמו בתיקיית ה-DAG, ומספר הקבצים שצריך לטעון (עם התלות שלהם) משפיע על הביצועים של ניתוח ה-DAG ותזמון המשימות. יעיל הרבה יותר להשתמש ב-100 קבצים עם 100 DAG בכל אחד מאשר ב-10,000 קבצים עם DAG אחד בכל אחד, ולכן מומלץ לבצע אופטימיזציה כזו. האופטימיזציה הזו היא איזון בין זמן הניתוח לבין היעילות של יצירה וניהול של DAG.

    לדוגמה, אם רוצים לפרוס 10,000 קובצי DAG, אפשר ליצור 100 קובצי ZIP, שכל אחד מהם מכיל 100 קובצי DAG.

    בנוסף להצעות שלמעלה, אם יש לכם יותר מ-10,000 קובצי DAG, יכול להיות שכדאי ליצור DAG באופן פרוגרמטי. לדוגמה, אפשר להטמיע קובץ Python DAG יחיד שיוצר מספר מסוים של אובייקטים של DAG (לדוגמה, 20 או 100 אובייקטים של DAG).

  • לא מומלץ להשתמש באופרטורים של Airflow שהוצאו משימוש. במקום זאת, משתמשים בחלופות העדכניות שלהם.

שאלות נפוצות בנושא כתיבת DAG

איך אפשר לצמצם את כפילויות הקוד אם רוצים להריץ את אותן משימות או משימות דומות בכמה DAG?

מומלץ להגדיר ספריות ועטיפות כדי לצמצם את כפילות הקוד.

איך אפשר לעשות שימוש חוזר בקוד בין קובצי DAG?

מכניסים את פונקציות השירות לספריית Python מקומית ומייבאים את הפונקציות. אפשר להפנות לפונקציות בכל DAG שנמצא בתיקייה dags/ בדלי של הסביבה.

איך אפשר לצמצם את הסיכון להגדרות שונות?

לדוגמה, יש לכם שני צוותים שרוצים לצבור נתונים גולמיים למדדי הכנסות. הצוותים כותבים שתי משימות שונות במקצת שמשיגות את אותו הדבר. הגדרת ספריות כדי לעבוד עם נתוני ההכנסות, כך שהמטמיעים של DAG צריכים להבהיר את ההגדרה של ההכנסות שמצטברות.

איך מגדירים תלות בין DAG?

ההגדרה תלויה באופן שבו רוצים להגדיר את התלות.

אם יש לכם שני DAG‏ (DAG A ו-DAG B) ואתם רוצים ש-DAG B יופעל אחרי DAG A, אתם יכולים להוסיף TriggerDagRunOperator בסוף DAG A.

אם DAG B תלוי רק בארטיפקט שנוצר על ידי DAG A, כמו הודעת Pub/Sub, יכול להיות שחיישן יתאים יותר.

אם DAG B משולב בצורה הדוקה עם DAG A, יכול להיות שאפשר למזג את שני ה-DAG ל-DAG אחד.

איך מעבירים מזהי הפעלה ייחודיים ל-DAG ולמשימות שלו?

לדוגמה, אתם רוצים להעביר שמות של אשכולות Dataproc ונתיבי קבצים.

כדי ליצור מזהה ייחודי אקראי, מחזירים str(uuid.uuid4()) ב-PythonOperator. המזהה יופיע ב-XComs, כך שתוכלו להפנות למזהה באופרטורים אחרים באמצעות שדות של תבניות.

לפני שמפיקים uuid, כדאי לשקול אם מזהה ספציפי ל-DagRun יהיה שימושי יותר. אפשר גם להפנות למזהים האלה בהחלפות של Jinja באמצעות פקודות מאקרו.

איך מפרידים בין משימות ב-DAG?

כל משימה צריכה להיות יחידת עבודה אידמפוטנטית. לכן, מומלץ להימנע מהוספת תהליך עבודה מרובה שלבים בתוך משימה אחת, כמו תוכנית מורכבת שפועלת ב-PythonOperator.

האם כדאי להגדיר כמה משימות ב-DAG אחד כדי לצבור נתונים מכמה מקורות?

לדוגמה, יש לכם כמה טבלאות עם נתונים גולמיים ואתם רוצים ליצור נתונים מצטברים יומיים לכל טבלה. המשימות לא תלויות זו בזו. כדאי ליצור משימה אחת ו-DAG אחד לכל טבלה, או ליצור DAG כללי אחד?

אם אתם מסכימים שכל משימה תשתף את אותם מאפיינים ברמת ה-DAG, כמו schedule, כדאי להגדיר כמה משימות ב-DAG אחד. אחרת, כדי לצמצם את כפילות הקוד, אפשר ליצור כמה DAG ממודול Python יחיד על ידי הצבתם ב-globals() של המודול.

איך מגבילים את מספר המשימות שפועלות בו-זמנית ב-DAG?

לדוגמה, אם אתם רוצים להימנע מחריגה ממכסות או ממגבלות שימוש ב-API, או מריצה של יותר מדי תהליכים בו-זמנית.

אתם יכולים להגדיר מאגרי Airflow בממשק המשתמש האינטרנטי של Airflow ולשייך משימות למאגרים קיימים ב-DAG.

שאלות נפוצות על שימוש באופרטורים

כדאי להשתמש ב-DockerOperator?

אנחנו לא ממליצים להשתמש ב-DockerOperator, אלא אם משתמשים בו להפעלת קונטיינרים בהתקנת Docker מרחוק (לא בתוך אשכול של סביבה). בסביבת Managed Airflow, לאופרטור אין גישה לדמונים של Docker.

במקום זאת, צריך להשתמש ב-KubernetesPodOperator או ב-GKEStartPodOperator. האופרטורים האלה מפעילים pods של Kubernetes באשכולות Kubernetes או GKE, בהתאמה. הערה: אנחנו לא ממליצים להפעיל pods באשכול של סביבה, כי זה עלול להוביל לתחרות על משאבים.

כדאי להשתמש ב-SubDagOperator?

לא מומלץ להשתמש ב-SubDagOperator.

כדאי להשתמש בחלופות כמו שמפורט במאמר קיבוץ משימות.

האם כדאי להריץ קוד Python רק ב-PythonOperators כדי להפריד באופן מלא בין אופרטורים של Python?

בהתאם ליעד שלכם, יש כמה אפשרויות.

אם הדבר היחיד שחשוב לכם הוא לשמור על יחסי תלות נפרדים של Python, אתם יכולים להשתמש ב-PythonVirtualenvOperator.

כדאי להשתמש ב-KubernetesPodOperator. האופרטור הזה מאפשר להגדיר קבוצות Pod של Kubernetes ולהריץ את קבוצות ה-Pod באשכולות אחרים.

איך מוסיפים חבילות בינאריות או חבילות שאינן PyPI בהתאמה אישית?

אתם יכולים להתקין חבילות שמתארחות במאגרי חבילות פרטיים.

איך מעבירים ארגומנטים באופן אחיד ל-DAG ולמשימות שלו?

אתם יכולים להשתמש בתמיכה המובנית של Airflow בתבניות Jinja כדי להעביר ארגומנטים שאפשר להשתמש בהם בשדות של תבניות.

מתי מתבצעת החלפת התבנית?

החלפת התבנית מתבצעת בתהליכי העבודה של Airflow ממש לפני שקוראים לפונקציה pre_execute של אופרטור. בפועל, המשמעות היא שהתבניות לא מוחלפות עד לרגע שלפני הפעלת המשימה.

איך אפשר לדעת אילו ארגומנטים של אופרטור תומכים בהחלפת תבנית?

ארגומנטים של אופרטורים שתומכים בהחלפת תבניות Jinja2 מסומנים במפורש ככאלה.

מחפשים את השדה template_fields בהגדרת האופרטור, שמכיל רשימה של שמות ארגומנטים שעוברים החלפה בתבנית.

לדוגמה, אפשר לעיין ב-BashOperator, שתומך ביצירת תבניות לארגומנטים bash_command ו-env.

אופרטורים של Airflow שהוצאו משימוש והוסרו

האופרטורים של Airflow שמפורטים בטבלה הבאה הוצאו משימוש:

  • אל תשתמשו באופרטורים האלה ב-DAG. במקום זאת, צריך להשתמש באופרטורים העדכניים החלופיים שצוינו.

  • אם אופרטור מופיע כזמין, המשמעות היא שהאופרטור הזה עדיין זמין בגרסה האחרונה של Managed Airflow (1.20.12).

  • חלק מהאופרטורים להחלפה לא נתמכים באף גרסה של Managed Airflow (דור קודם 1). כדי להשתמש בהם, מומלץ לשדרג ל-Managed Airflow (דור 3) או ל-Managed Airflow (דור 2).

אופרטור שהוצא משימוש סטטוס אופרטור החלפה אפשרות החלפה זמינה מ-
CreateAutoMLTextTrainingJobOperator זמין בגרסה 1.20.12 SupervisedFineTuningTrainOperator אופרטור החלפה לא זמין
GKEDeploymentHook זמין בגרסה 1.20.12 GKEKubernetesHook אופרטור החלפה לא זמין
GKECustomResourceHook זמין בגרסה 1.20.12 GKEKubernetesHook אופרטור החלפה לא זמין
GKEPodHook זמין בגרסה 1.20.12 GKEKubernetesHook אופרטור החלפה לא זמין
GKEJobHook זמין בגרסה 1.20.12 GKEKubernetesHook אופרטור החלפה לא זמין
GKEPodAsyncHook זמין בגרסה 1.20.12 GKEKubernetesAsyncHook אופרטור החלפה לא זמין
SecretsManagerHook זמין בגרסה 1.20.12 GoogleCloudSecretManagerHook אופרטור החלפה לא זמין
BigQueryExecuteQueryOperator זמין בגרסה 1.20.12 BigQueryInsertJobOperator זמין בגרסה 1.20.12
BigQueryPatchDatasetOperator זמין בגרסה 1.20.12 BigQueryUpdateDatasetOperator זמין בגרסה 1.20.12
DataflowCreateJavaJobOperator זמין בגרסה 1.20.12 beam.BeamRunJavaPipelineOperator זמין בגרסה 1.20.12
DataflowCreatePythonJobOperator זמין בגרסה 1.20.12 beam.BeamRunPythonPipelineOperator זמין בגרסה 1.20.12
DataprocSubmitPigJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
DataprocSubmitHiveJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
DataprocSubmitSparkSqlJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
DataprocSubmitSparkJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
DataprocSubmitHadoopJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
DataprocSubmitPySparkJobOperator זמין בגרסה 1.20.12 DataprocSubmitJobOperator זמין בגרסה 1.20.12
BigQueryTableExistenceAsyncSensor זמין בגרסה 1.20.12 BigQueryTableExistenceSensor אופרטור החלפה לא זמין
BigQueryTableExistencePartitionAsyncSensor זמין בגרסה 1.20.12 BigQueryTablePartitionExistenceSensor אופרטור החלפה לא זמין
CloudComposerEnvironmentSensor זמין בגרסה 1.20.12 CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator אופרטור החלפה לא זמין
GCSObjectExistenceAsyncSensor זמין בגרסה 1.20.12 GCSObjectExistenceSensor אופרטור החלפה לא זמין
GoogleAnalyticsHook זמין בגרסה 1.20.12 GoogleAnalyticsAdminHook אופרטור החלפה לא זמין
GoogleAnalyticsListAccountsOperator זמין בגרסה 1.20.12 GoogleAnalyticsAdminListAccountsOperator אופרטור החלפה לא זמין
GoogleAnalyticsGetAdsLinkOperator זמין בגרסה 1.20.12 GoogleAnalyticsAdminGetGoogleAdsLinkOperator אופרטור החלפה לא זמין
GoogleAnalyticsRetrieveAdsLinksListOperator זמין בגרסה 1.20.12 GoogleAnalyticsAdminListGoogleAdsLinksOperator אופרטור החלפה לא זמין
GoogleAnalyticsDataImportUploadOperator זמין בגרסה 1.20.12 GoogleAnalyticsAdminCreateDataStreamOperator אופרטור החלפה לא זמין
GoogleAnalyticsDeletePreviousDataUploadsOperator זמין בגרסה 1.20.12 GoogleAnalyticsAdminDeleteDataStreamOperator אופרטור החלפה לא זמין
DataPipelineHook זמין בגרסה 1.20.12 DataflowHook אופרטור החלפה לא זמין
CreateDataPipelineOperator זמין בגרסה 1.20.12 DataflowCreatePipelineOperator אופרטור החלפה לא זמין
RunDataPipelineOperator זמין בגרסה 1.20.12 DataflowRunPipelineOperator אופרטור החלפה לא זמין
AutoMLDatasetLink זמין בגרסה 1.20.12 TranslationLegacyDatasetLink אופרטור החלפה לא זמין
AutoMLDatasetListLink זמין בגרסה 1.20.12 TranslationDatasetListLink אופרטור החלפה לא זמין
AutoMLModelLink זמין בגרסה 1.20.12 TranslationLegacyModelLink אופרטור החלפה לא זמין
AutoMLModelTrainLink זמין בגרסה 1.20.12 TranslationLegacyModelTrainLink אופרטור החלפה לא זמין
AutoMLModelPredictLink זמין בגרסה 1.20.12 TranslationLegacyModelPredictLink אופרטור החלפה לא זמין
AutoMLBatchPredictOperator זמין בגרסה 1.20.12 vertex_ai.batch_prediction_job אופרטור החלפה לא זמין
AutoMLPredictOperator זמין בגרסה 1.20.12 vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator אופרטור החלפה לא זמין
PromptLanguageModelOperator זמין בגרסה 1.20.12 TextGenerationModelPredictOperator אופרטור החלפה לא זמין
GenerateTextEmbeddingsOperator זמין בגרסה 1.20.12 TextEmbeddingModelGetEmbeddingsOperator אופרטור החלפה לא זמין
PromptMultimodalModelOperator זמין בגרסה 1.20.12 GenerativeModelGenerateContentOperator אופרטור החלפה לא זמין
PromptMultimodalModelWithMediaOperator זמין בגרסה 1.20.12 GenerativeModelGenerateContentOperator אופרטור החלפה לא זמין
DataflowStartSqlJobOperator זמין בגרסה 1.20.12 DataflowStartYamlJobOperator אופרטור החלפה לא זמין
LifeSciencesHook זמין בגרסה 1.20.12 ה-hook של Google Cloud Batch Operators יפורסם בקרוב
DataprocScaleClusterOperator זמין בגרסה 1.20.12 DataprocUpdateClusterOperator יפורסם בקרוב
MLEngineStartBatchPredictionJobOperator זמין בגרסה 1.20.12 CreateBatchPredictionJobOperator יפורסם בקרוב
MLEngineManageModelOperator זמין בגרסה 1.20.12 MLEngineCreateModelOperator, MLEngineGetModelOperator יפורסם בקרוב
MLEngineGetModelOperator זמין בגרסה 1.20.12 GetModelOperator יפורסם בקרוב
MLEngineDeleteModelOperator זמין בגרסה 1.20.12 DeleteModelOperator יפורסם בקרוב
MLEngineManageVersionOperator זמין בגרסה 1.20.12 MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion יפורסם בקרוב
MLEngineCreateVersionOperator זמין בגרסה 1.20.12 פרמטר parent_model לאופרטורים של VertexAI יפורסם בקרוב
MLEngineSetDefaultVersionOperator זמין בגרסה 1.20.12 SetDefaultVersionOnModelOperator יפורסם בקרוב
MLEngineListVersionsOperator זמין בגרסה 1.20.12 ListModelVersionsOperator יפורסם בקרוב
MLEngineDeleteVersionOperator זמין בגרסה 1.20.12 DeleteModelVersionOperator יפורסם בקרוב
MLEngineStartTrainingJobOperator זמין בגרסה 1.20.12 CreateCustomPythonPackageTrainingJobOperator יפורסם בקרוב
MLEngineTrainingCancelJobOperator זמין בגרסה 1.20.12 CancelCustomTrainingJobOperator יפורסם בקרוב
LifeSciencesRunPipelineOperator זמין בגרסה 1.20.12 Google Cloud Batch Operators יפורסם בקרוב
MLEngineCreateModelOperator זמין בגרסה 1.20.12 אופרטור VertexAI תואם יפורסם בקרוב

המאמרים הבאים