Executar cargas de trabalho do Serviço Gerenciado para Apache Spark com o Airflow Gerenciado

Airflow gerenciado (Geração 3) | Airflow gerenciado (Geração 2) | Airflow gerenciado (Geração 1 legada)

Nesta página, descrevemos como usar o Airflow gerenciado (Geração 2) para executar cargas de trabalho do Serviço Gerenciado para Apache Spark em Google Cloud.

Os exemplos nas seções a seguir mostram como usar operadores para gerenciar cargas de trabalho em lote do Serviço Gerenciado para Apache Spark. Esses operadores são usados em DAGs que criam, excluem, listam e recebem uma carga de trabalho em lote do Serviço Gerenciado para Apache Spark:

Antes de começar

  1. Ative a API Dataproc:

    Console

    Ative a API do Serviço Gerenciado para Apache Spark.

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador de uso do serviço (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    Ativar a API

    gcloud

    Ative a API do Serviço Gerenciado para Apache Spark:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage role (roles/serviceusage.serviceUsageAdmin), que contém a serviceusage.services.enable permissão. Saiba como conceder papéis.

    gcloud services enable dataproc.googleapis.com

  2. Selecione o local do arquivo de carga de trabalho em lote. É possível usar qualquer uma das seguintes opções:

    • Crie um bucket do Cloud Storage que armazene esse arquivo.
    • Use o bucket do ambiente. Como não é necessário sincronizar esse arquivo com o Airflow, é possível criar uma subpasta separada fora das pastas /dags ou /data. Por exemplo, /batches.
    • Use um bucket atual.

Configurar arquivos e variáveis do Airflow

Esta seção demonstra como configurar arquivos e variáveis do Airflow para este tutorial.

Fazer upload de um arquivo de carga de trabalho de ML do Serviço Gerenciado para Apache Spark para um bucket

A carga de trabalho neste tutorial executa um script do pyspark:

  1. Salve qualquer script do pyspark em um arquivo local chamado spark-job.py. Por exemplo, é possível usar o script de exemplo do pyspark.

  2. Faça upload do arquivo para o local selecionado em Antes de começar.

Definir variáveis do Airflow

Os exemplos nas seções a seguir usam variáveis do Airflow. Você define valores para essas variáveis no Airflow. Em seguida, o código DAG pode acessar esses valores.

Os exemplos neste tutorial usam as seguintes variáveis do Airflow. É possível defini-las conforme necessário, dependendo do exemplo usado.

Defina as seguintes variáveis do Airflow para uso no código DAG:

Use o Google Cloud console e a interface do Airflow para definir cada variável do Airflow

  1. No Google Cloud console, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no link Airflow do seu ambiente. A interface do Airflow é aberta.

  3. Na interface do Airflow, selecione Admin > Variables.

  4. Clique em Add a new record.

  5. Especifique o nome da variável no campo Key e defina o valor dela no campo Val.

  6. Clique em Salvar.

Criar um servidor de histórico permanente

Use um servidor de histórico permanente (PHS, na sigla em inglês) para conferir os arquivos de histórico do Spark das cargas de trabalho em lote:

  1. Criar um servidor de histórico permanente.
  2. Verifique se você especificou o nome do cluster do PHS na phs_cluster variável do Airflow.

DataprocCreateBatchOperator

O DAG a seguir inicia uma carga de trabalho em lote do Serviço Gerenciado para Apache Spark.

Para mais informações sobre os argumentos DataprocCreateBatchOperator, consulte o código-fonte do operador.

Para mais informações sobre os atributos que podem ser transmitidos no batch parâmetro de DataprocCreateBatchOperator, consulte a descrição da classe Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Usar a imagem do contêiner personalizada com DataprocCreateBatchOperator

O exemplo a seguir mostra como usar uma imagem de contêiner personalizada para executar cargas de trabalho. É possível usar um contêiner personalizado, por exemplo, para adicionar dependências do Python não fornecidas pela imagem de contêiner padrão.

Para usar uma imagem de contêiner personalizada:

  1. Crie uma imagem de contêiner personalizada e faça upload dela para o Container Registry.

  2. Especifique a imagem na image_name variável do Airflow.

  3. Use DataprocCreateBatchOperator com sua imagem personalizada:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Usar o serviço do metastore do Dataproc com DataprocCreateBatchOperator

Para usar um serviço Dataproc Metastore de um DAG:

  1. Verifique se o serviço do metastore já foi iniciado.

    Para saber como iniciar um serviço do metastore, consulte Ativar e desativar o metastore do Dataproc.

    Para informações detalhadas sobre o operador de lote para criar a configuração, consulte PeripheralsConfig.

  2. Depois que o serviço do metastore estiver em funcionamento, especifique o nome dele em a variável metastore_cluster e a região em region_name a variável do Airflow.

  3. Use o serviço do metastore no DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

É possível usar DataprocDeleteBatchOperator para excluir um lote com base no ID da carga de trabalho.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator lista os lotes que existem em um determinado project_id e região.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator busca uma carga de trabalho em lote específica.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

A seguir