Workflow using Cloud Composer

In this document, you use the following billable components of Google Cloud:

  • Managed Service for Apache Spark
  • Compute Engine
  • Cloud Composer

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command:

    gcloud init

Create a Managed Service for Apache Spark workflow template

Copy and run the following commands in a local terminal window or in Cloud Shell to create and define a workflow template.

  1. Create the sparkpi workflow template.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Add the spark job to the sparkpi workflow template. The "compute" step-id flag identifies the SparkPi job.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Use a managed, single-node cluster to run the workflow. Managed Service for Apache Spark creates the cluster, runs the workflow on it, then deletes the cluster when the workflow completes.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirm workflow template creation.

    Console

    Click the sparkpi name on the Managed Service for Apache Spark Workflows page in the Google Cloud console to open the Workflow template details page. Click the name of your workflow template to confirm the sparkpi template attributes.

    gcloud command

    Run the following command:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
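
As a rough guide to what the describe output should contain, the following sketch builds a Python dictionary that mirrors the template assembled by the three commands above. The key names are assumed to follow the Dataproc WorkflowTemplate REST resource, and the single-node property is an assumption about how --single-node is represented; the function name is hypothetical. The real output also carries server-generated fields (for example, timestamps and a version) that are omitted here.

```python
import json


def expected_sparkpi_template():
    """Return a dict approximating the sparkpi workflow template.

    Assumption: keys follow the Dataproc WorkflowTemplate REST resource.
    Values are taken from the gcloud commands in this section.
    """
    return {
        "id": "sparkpi",
        "jobs": [
            {
                "stepId": "compute",  # from --step-id=compute
                "sparkJob": {
                    "mainClass": "org.apache.spark.examples.SparkPi",
                    "jarFileUris": [
                        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
                    ],
                    "args": ["1000"],  # the value after the bare "--"
                },
            }
        ],
        "placement": {
            "managedCluster": {
                "clusterName": "sparkpi",  # from --cluster-name=sparkpi
                "config": {
                    "softwareConfig": {
                        "properties": {
                            # Assumed representation of --single-node
                            "dataproc:dataproc.allow.zero.workers": "true"
                        }
                    }
                },
            }
        },
    }


if __name__ == "__main__":
    # Print the structure so it can be compared by eye with the describe output.
    print(json.dumps(expected_sparkpi_template(), indent=2))
```

If the describe output differs in the job class, JAR path, step ID, or cluster name, one of the earlier commands was probably run with different flags.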
        

Create and upload a DAG to Cloud Storage

  1. Create or use an existing Cloud Composer environment.
  2. Set environment variables.

    Airflow UI

    1. In the toolbar, click Admin > Variables.
    2. Click Create.
    3. Enter the following information:
      • Key: project_id
      • Val: PROJECT_ID (your Google Cloud project ID)
    4. Click Save.

    gcloud command

    Enter the following command, where:

    • ENVIRONMENT is the name of the Cloud Composer environment
    • LOCATION is the region where the Cloud Composer environment is located
    • PROJECT_ID is the project ID of the project that contains the Cloud Composer environment
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copy the following DAG code locally into a file named "composer-dataproc-dag.py", which uses the DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Upload your DAG to your environment's folder in Cloud Storage. After the upload completes successfully, click the DAGs Folder link on the Cloud Composer Environment page.
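
The DAG above sets start_date to days_ago(1) with a one-day schedule_interval, and its comment says this makes it run as soon as you upload it. The sketch below illustrates why, using a simplified model in which Airflow triggers a run once that run's schedule interval has ended. It is not Airflow code: the helper names are hypothetical, and midnight_days_ago only imitates days_ago by assuming it returns midnight UTC of the given day.

```python
import datetime


def midnight_days_ago(n, now):
    """Imitate days_ago(n): midnight UTC, n days before `now` (assumption)."""
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return today - datetime.timedelta(days=n)


def first_run_due(start_date, interval, now):
    """Return True when the first schedule interval has already ended."""
    return start_date + interval <= now


if __name__ == "__main__":
    # A fixed, illustrative "upload time" so the result is reproducible.
    now = datetime.datetime(2024, 1, 10, 15, 30, tzinfo=datetime.timezone.utc)
    interval = datetime.timedelta(days=1)

    # start_date one day back: the first interval ended at today's midnight.
    print(first_run_due(midnight_days_ago(1, now), interval, now))

    # start_date of today: the first interval is still open.
    print(first_run_due(midnight_days_ago(0, now), interval, now))
```

Under this model, a start date of one day ago means the first interval closed at the most recent midnight, so a run is due immediately; a start date of today would delay the first run until the next midnight.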

View a task's status

Airflow UI

  1. Open the Airflow web interface.
  2. On the DAGs page, click the DAG name (for example, dataproc_workflow_dag).
  3. On the DAGs Details page, click Graph View.
  4. Check the status:
    • Failed: The task has a red box around it. You can also hold the pointer over the task and look for State: Failed.
    • Success: The task has a green box around it. You can also hold the pointer over the task and check for State: Success.

Console

Click the Workflows tab to see the workflow status.

gcloud command

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
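
If you add --format=json to the command above, the result can be summarized with a short script. The following is a minimal sketch: it assumes each operation carries a name, an optional done flag, an optional error, and a metadata.state field, as in the long-running Operation resource. The sample data and the function name are hypothetical, not real output.

```python
import json

# Hypothetical sample standing in for the JSON printed by the gcloud command.
SAMPLE_OUTPUT = """
[
  {"name": "projects/PROJECT_ID/regions/us-central1/operations/op-1",
   "done": true,
   "metadata": {"template": "sparkpi", "state": "DONE"}},
  {"name": "projects/PROJECT_ID/regions/us-central1/operations/op-2",
   "metadata": {"template": "sparkpi", "state": "RUNNING"}}
]
"""


def summarize_operations(raw_json):
    """Reduce each operation to its ID, workflow state, and outcome flags."""
    summary = []
    for op in json.loads(raw_json):
        metadata = op.get("metadata", {})
        summary.append(
            {
                # Keep only the last path segment of the resource name.
                "id": op.get("name", "").rsplit("/", 1)[-1],
                "state": metadata.get("state", "UNKNOWN"),
                "done": bool(op.get("done", False)),
                # Assumption: a failed operation carries an "error" field.
                "failed": "error" in op,
            }
        )
    return summary


if __name__ == "__main__":
    for row in summarize_operations(SAMPLE_OUTPUT):
        print(row["id"], row["state"], row["done"], row["failed"])
```

In practice you would replace SAMPLE_OUTPUT with the text captured from the command, for example by piping its output into the script.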
    

Clean up

To avoid incurring charges to your Google Cloud account, you can delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment.

  2. Delete the workflow template.
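
The two cleanup steps can also be done from the command line. The sketch below only assembles and prints the gcloud commands so you can review them before running anything; ENVIRONMENT and LOCATION are the same placeholders used earlier, and the function name is hypothetical. It assumes the template name and region from this tutorial.

```python
import shlex


def cleanup_commands(environment, location, template="sparkpi", region="us-central1"):
    """Return the gcloud commands for the two cleanup steps as argument lists."""
    return [
        # Step 1: delete the Cloud Composer environment.
        ["gcloud", "composer", "environments", "delete", environment,
         "--location", location],
        # Step 2: delete the workflow template.
        ["gcloud", "dataproc", "workflow-templates", "delete", template,
         f"--region={region}"],
    ]


if __name__ == "__main__":
    # Print only; pass each list to subprocess.run() if you want to execute it.
    for command in cleanup_commands("ENVIRONMENT", "LOCATION"):
        print(shlex.join(command))
```

Printing rather than executing is a deliberate choice here, because both deletions are irreversible.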

What's next