תהליך עבודה באמצעות Managed Airflow

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

  • Managed Service for Apache Spark
  • Compute Engine
  • Managed Airflow

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

לפני שמתחילים

הגדרת הפרויקט

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Managed Airflow APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. התקינו את ה-CLI של Google Cloud.

  6. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  7. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Managed Airflow APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. התקינו את ה-CLI של Google Cloud.

  12. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  13. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init

יצירת תבנית של תהליך עבודה ב-Managed Service for Apache Spark

מעתיקים ומריצים את הפקודות הבאות בחלון טרמינל מקומי או ב-Cloud Shell כדי ליצור ולהגדיר תבנית של זרימת עבודה.

  1. יוצרים את תבנית זרימת העבודה sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. מוסיפים את עבודת ה-Spark לתבנית של תהליך העבודה sparkpi. הדגל step-id compute מזהה את עבודת SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. כדי להפעיל את תהליך העבודה, צריך להשתמש באשכול מנוהל עם צומת יחיד. השירות המנוהל ל-Apache Spark ייצור את האשכול, יפעיל בו את תהליך העבודה ואז ימחק את האשכול כשתהליך העבודה יסתיים.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. מאשרים את יצירת תבנית תהליך העבודה.

    המסוף

    לוחצים על השם של sparkpi בדף Workflows של Managed Service for Apache Spark במסוף Google Cloud כדי לפתוח את הדף Workflow template details. לוחצים על השם של תבנית זרימת העבודה כדי לאשר את sparkpi מאפייני התבנית.

    פקודת gcloud

    מריצים את הפקודה הבאה:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

יצירה והעלאה של DAG ל-Cloud Storage

  1. יוצרים סביבת Managed Airflow או משתמשים בסביבה קיימת.
  2. מגדירים משתני סביבה.

    ממשק המשתמש של Airflow

    1. בסרגל הכלים, לוחצים על Admin > Variables (אדמין > משתנים).
    2. לוחצים על יצירה.
    3. מזינים את הפרטים הבאים:
      • מקש:project_id
      • ‫Val: PROJECT_ID – מזהה הפרויקט ב- Google Cloud
    4. לוחצים על Save.

    פקודת gcloud

    מזינים את הפקודות הבאות:

    • ENVIRONMENT הוא שם סביבת Managed Airflow
    • LOCATION הוא האזור שבו נמצאת סביבת Managed Airflow
    • PROJECT_ID הוא מזהה הפרויקט של הפרויקט שמכיל את סביבת Managed Airflow
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. מעתיקים את קוד ה-DAG הבא באופן מקומי לקובץ בשם composer-dataproc-dag.py, שמשתמש ב-DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. מעלים את ה-DAG לתיקיית הסביבה ב-Cloud Storage. אחרי שההעלאה תסתיים בהצלחה, לוחצים על הקישור DAGs Folder בדף Managed Airflow Environment.

צפייה בסטטוס של משימה

ממשק המשתמש של Airflow

  1. פותחים את ממשק האינטרנט של Airflow.
  2. בדף DAGs, לוחצים על שם ה-DAG (לדוגמה, dataproc_workflow_dag).
  3. בדף הפרטים של DAG, לוחצים על תצוגת גרף.
  4. בדיקת הסטטוס:
    • נכשל: יש תיבה אדומה מסביב למשימה. אפשר גם להעביר את מצביע העכבר מעל המשימה ולחפש את הסטטוס: נכשל. המשימה מוקפת בתיבה אדומה, שמציינת שהיא נכשלה
    • הצלחה: המשימה מוקפת בתיבה ירוקה. אפשר גם להעביר את מצביע העכבר מעל המשימה ולבדוק אם מופיע State: Success (מצב: הצלחה). המשימה מוקפת בתיבה ירוקה, שמציינת שהיא הושלמה בהצלחה

המסוף

לוחצים על הכרטיסייה Workflows (תהליכי עבודה) כדי לראות את הסטטוס של תהליך העבודה.

פקודת gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

הסרת המשאבים

כדי להימנע מחיובים בחשבון Google Cloud , אפשר למחוק את המשאבים שבהם השתמשתם במדריך הזה:

  1. מחיקת סביבת Managed Airflow

  2. מחיקת תבנית של תהליך עבודה

המאמרים הבאים