Flusso di lavoro con Cloud Composer

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova senza costi.

Prima di iniziare

Configura il progetto

  1. Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Installa Google Cloud CLI.

  6. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  7. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Installa Google Cloud CLI.

  12. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  13. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init

Crea un modello di workflow Dataproc

Copia ed esegui i seguenti comandi in una finestra del terminale locale o in Cloud Shell per creare e definire un modello di workflow.

  1. Crea il modello di workflow sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Aggiungi il job Spark al modello di workflow sparkpi. Il flag "compute" step-id identifica il job SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilizza un cluster gestito, a nodo singolo per eseguire il workflow. Dataproc creerà il cluster, eseguirà il workflow al suo interno e poi lo eliminerà al termine del workflow.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Conferma la creazione del modello di workflow.

    Console

    Fai clic sul nome sparkpi nella pagina Workflow di Dataproc nella console per aprire la pagina Dettagli modello di workflow. Google Cloud Fai clic sul nome del tuo modello di workflow per confermare gli attributi del modello sparkpi.

    Comando g-cloud

    Esegui questo comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Crea e carica un DAG in Cloud Storage

  1. Crea o utilizza un ambiente Cloud Composer esistente.
  2. Imposta le variabili di ambiente.

    UI di Airflow

    1. Nella barra degli strumenti, fai clic su Amministrazione > Variabili.
    2. Fai clic su Crea.
    3. Inserisci le seguenti informazioni:
      • Chiave:project_id
      • Valore: PROJECT_ID — il tuo Google Cloud ID progetto
    4. Fai clic su Salva.

    Comando g-cloud

    Inserisci i seguenti comandi:

    • ENVIRONMENT è il nome dell'ambiente Cloud Composer
    • LOCATION è la regione in cui si trova l'ambiente Cloud Composer
    • PROJECT_ID è l'ID progetto del progetto che contiene l'ambiente Cloud Composer
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copia il seguente codice DAG localmente in un file denominato "composer-dataproc-dag.py", che utilizza il DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Carica il DAG nella cartella dell'ambiente in Cloud Storage. Al termine del caricamento, fai clic sul link Cartella DAG nella pagina dell'ambiente Cloud Composer.

Visualizza lo stato di un'attività

UI di Airflow

  1. Apri l'interfaccia web di Airflow.
  2. Nella pagina DAG, fai clic sul nome del DAG (ad esempio dataproc_workflow_dag).
  3. Nella pagina dei dettagli dei DAG, fai clic su Visualizzazione grafico.
  4. Controlla lo stato:
    • Non riuscito: l'attività ha una casella rossa intorno. Puoi anche tenere il puntatore sopra l'attività e cercare Stato: Non riuscito. L'attività è racchiusa in un riquadro rosso, a indicare che non è riuscita
    • Riuscito: l'attività ha una casella verde intorno. Puoi anche tenere il puntatore sopra l'attività e cercare Stato: Riuscito. L'attività è racchiusa in un riquadro verde, a indicare che è stata completata correttamente

Console

Fai clic sulla scheda Workflows per visualizzare lo stato del workflow.

Comando g-cloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Libera spazio

Per evitare che al tuo Google Cloud account vengano addebitati costi, puoi eliminare le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer.

  2. Elimina il modello di workflow.

Passaggi successivi