Managed Airflow (3ª gen.) | Managed Airflow (2ª gen.) | Managed Airflow (1ª gen. heredada)
En esta página, se describe cómo usar Managed Airflow (2ª gen.) para ejecutar cargas de trabajo de Managed Service para Apache Spark en Google Cloud.
En los ejemplos de las siguientes secciones, se muestra cómo usar operadores para administrar las cargas de trabajo por lotes de Managed Service para Apache Spark. Usas estos operadores en DAG que crean, borran, enumeran y obtienen una carga de trabajo por lotes de Managed Service para Apache Spark:
Crea DAG para operadores que funcionan con cargas de trabajo por lotes de Managed Service para Apache Spark:
Crea DAG que usen contenedores personalizados y Dataproc Metastore.
Configura el servidor de historial persistente para estos DAG.
Antes de comenzar
Habilita la API de Dataproc:
Console
Habilita la API de Managed Service para Apache Spark.
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene el permisoserviceusage.services.enable. Obtén más información para otorgar roles.gcloud
Habilita la API de Managed Service para Apache Spark:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (
roles/serviceusage.serviceUsageAdmin), que contiene elserviceusage.services.enablepermiso. Obtén más información para otorgar roles.gcloud services enable dataproc.googleapis.com
Selecciona la ubicación del archivo de carga de trabajo por lotes. Puedes usar cualquiera de las siguientes opciones:
- Crea un bucket de Cloud Storage que almacene este archivo.
- Usa el bucket de tu entorno. Como no necesitas sincronizar este archivo con Airflow, puedes crear una subcarpeta independiente fuera de las carpetas
/dagso/data. Por ejemplo,/batches. - Usa un bucket existente.
Configura archivos y variables de Airflow
En esta sección, se muestra cómo configurar archivos y variables de Airflow para este instructivo.
Sube un archivo de carga de trabajo de ML de Managed Service para Apache Spark a un bucket
La carga de trabajo de este instructivo ejecuta una secuencia de comandos de pyspark:
Guarda cualquier secuencia de comandos de pyspark en un archivo local llamado
spark-job.py. Por ejemplo, puedes usar la secuencia de comandos de pyspark de muestra.Sube el archivo a la ubicación que seleccionaste en Antes de comenzar.
Configura variables de Airflow
En los ejemplos de las siguientes secciones, se usan variables de Airflow. Estableces valores para estas variables en Airflow y, luego, el código DAG puede acceder a estos valores.
En los ejemplos de este instructivo, se usan las siguientes variables de Airflow. Puedes configurarlas según sea necesario, según el ejemplo que uses.
Configura las siguientes variables de Airflow para usarlas en tu código DAG:
project_id: ID del proyecto.bucket_name: Es el URI de un bucket en el que se encuentra el archivo principal de Python de la carga de trabajo (spark-job.py). Seleccionaste esta ubicación en Antes de comenzar.phs_cluster: Es el nombre del clúster del servidor de historial persistente. Configuraste esta variable cuando creaste un servidor de historial persistente.image_name: Es el nombre y la etiqueta de la imagen de contenedor personalizada (image:tag). Configuraste esta variable cuando usaste la imagen de contenedor personalizada con DataprocCreateBatchOperator.metastore_cluster: Es el nombre del servicio de Dataproc Metastore. Configuraste esta variable cuando usaste el servicio de Dataproc Metastore con DataprocCreateBatchOperator.region_name: Es la región en la que se encuentra el servicio de Dataproc Metastore. Configuraste esta variable cuando usaste el servicio de Dataproc Metastore con DataprocCreateBatchOperator.
Usa la Google Cloud consola y la IU de Airflow para configurar cada variable de Airflow
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el vínculo Airflow de tu entorno. Se abrirá la IU de Airflow.
En la IU de Airflow, selecciona Administrador > Variables.
Haz clic en Agregar un registro nuevo.
Especifica el nombre de la variable en el campo Clave y establece su valor en el campo Val.
Haz clic en Guardar.
Crea un servidor de historial persistente
Usa un servidor de historial persistente (PHS) para ver los archivos de historial de Spark de tus cargas de trabajo por lotes:
- Crea un servidor de historial persistente.
- Asegúrate de haber especificado el nombre del clúster de PHS en la
phs_clustervariable de Airflow.
DataprocCreateBatchOperator
El siguiente DAG inicia una carga de trabajo por lotes de Managed Service para Apache Spark.
Para obtener más información sobre los argumentos de DataprocCreateBatchOperator, consulta
el código fuente del operador.
Para obtener más información sobre los atributos que puedes pasar en el batch
parámetro de DataprocCreateBatchOperator, consulta la
descripción de la clase Batch.
Usa una imagen de contenedor personalizada con DataprocCreateBatchOperator
En el siguiente ejemplo, se muestra cómo usar una imagen de contenedor personalizada para ejecutar tus cargas de trabajo. Puedes usar un contenedor personalizado, por ejemplo, para agregar dependencias de Python que no proporcione la imagen de contenedor predeterminada.
Para usar una imagen de contenedor personalizada, haz lo siguiente:
Crea una imagen de contenedor personalizada y súbela a Container Registry.
Especifica la imagen en la
image_namevariable de Airflow.Usa DataprocCreateBatchOperator con tu imagen personalizada:
Usa el servicio de Dataproc Metastore con DataprocCreateBatchOperator
Para usar un servicio de Dataproc Metastore desde un DAG, haz lo siguiente:
Verifica que el servicio de Metastore ya se haya iniciado.
Para obtener información sobre cómo iniciar un servicio de Metastore, consulta Habilita y inhabilita Dataproc Metastore.
Para obtener información detallada sobre el operador Batch para crear la configuración, consulta PeripheralsConfig.
Una vez que el servicio de Metastore esté en funcionamiento, especifica su nombre en la variable
metastore_clustery su región en la variable de Airflowregion_name.Usa el servicio de Metastore en DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Puedes usar DataprocDeleteBatchOperator para borrar un lote según el ID de lote de la carga de trabajo.
DataprocListBatchesOperator
DataprocDeleteBatchOperator enumera los lotes que existen dentro de un project_id y una región determinados.
DataprocGetBatchOperator
DataprocGetBatchOperator recupera una carga de trabajo por lotes en particular.