Ejecutar un flujo de trabajo que ejecute otros flujos de trabajo en paralelo

En este tutorial se explica cómo crear y ejecutar un flujo de trabajo principal que ejecute varios flujos de trabajo secundarios en paralelo.

En el siguiente diagrama, se invocan cuatro ejecuciones paralelas del flujo de trabajo secundario. De esta forma, el flujo de trabajo principal puede procesar datos en ramas paralelas, lo que reduce el tiempo de ejecución general. El flujo de trabajo principal espera a que finalicen todas las ejecuciones del flujo de trabajo secundario antes de devolver un resumen de las ejecuciones correctas y fallidas, lo que simplifica la detección de errores.

Flujo de trabajo principal que invoca iteraciones paralelas de un flujo de trabajo secundario

Crear y desplegar un flujo de trabajo secundario

Un flujo de trabajo secundario puede recibir y procesar datos de un flujo de trabajo principal. El flujo de trabajo secundario lo demuestra haciendo lo siguiente:

  • Recibe un número entero como argumento.
  • Espera 10 segundos para simular algún procesamiento
  • Devuelve un indicador (basado en si el número entero es par o impar) para simular el éxito o el fracaso de la ejecución del flujo de trabajo.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo.

    Ir a Workflows

  2. Haz clic en Crear.

  3. Introduce el nombre, workflow-child, del nuevo flujo de trabajo.

  4. En la lista Región, selecciona us-central1.

  5. Selecciona la cuenta de servicio que has creado anteriormente.

  6. Haz clic en Siguiente.

  7. En el editor del flujo de trabajo, introduce la siguiente definición para tu flujo de trabajo:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  8. Haz clic en Desplegar.

gcloud

  1. Crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-child.yaml
  2. Abre el archivo de código fuente en un editor de texto y copia el siguiente flujo de trabajo en el archivo.

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  3. Despliega el flujo de trabajo:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Sustituye SERVICE_ACCOUNT_NAME por el nombre de la cuenta de servicio que has creado anteriormente.

Crear y desplegar el flujo de trabajo principal

El flujo de trabajo principal ejecuta varias ramas del flujo de trabajo secundario mediante un bucle for paralelo.

  1. Copia el código fuente de la definición del flujo de trabajo. Consta de las siguientes partes:

    1. Se usa un mapa para almacenar los resultados de las ejecuciones del flujo de trabajo secundario. Para obtener más información, consulta Mapas.

      main:
        steps:
          - init:
              assign:
                - execution_results: {} # results from each execution
                - execution_results.success: {} # successful executions saved under 'success' key
                - execution_results.failure: {} # failed executions saved under 'failure' key
    2. Se ejecuta un bucle for en paralelo para invocar el flujo de trabajo secundario. Para obtener más información, consulta Pasos paralelos e Iteración.

      - execute_child_workflows:
          parallel:
            shared: [execution_results]
            for:
              value: iteration
              in: [1, 2, 3, 4]
              steps:
                  - iterate:
    3. El flujo de trabajo secundario se invoca mediante un conector. En cada iteración del flujo de trabajo secundario se pasa el argumento iteration. El flujo de trabajo principal espera y almacena el resultado de cada ejecución del flujo de trabajo secundario. Para obtener más información, consulta el conector de la API Executions de Workflows y los argumentos de tiempo de ejecución.

      try:
        steps:
          - execute_child_workflow:
              call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
              args:
                workflow_id: workflow-child
                #location: ...
                #project_id: ...
                argument:
                  iteration: ${iteration}
              result: execution_result
          - save_successful_execution:
              assign:
                - execution_results.success[string(iteration)]: ${execution_result}
      except:
          as: e
          steps:
            - save_failed_execution:
                assign:
                  - execution_results.failure[string(iteration)]: ${e}
    4. Se devuelven los resultados de la ejecución. Para obtener más información, consulta Completar la ejecución de un flujo de trabajo.

      - return_execution_results:
          return: ${execution_results}
  2. Despliega el flujo de trabajo:

    Consola

    1. En la Google Cloud consola, ve a la página Flujos de trabajo:

      Ir a Workflows

    2. Haz clic en Crear.

    3. Introduce el nombre, workflow-parent, del nuevo flujo de trabajo.

    4. En la lista Región, selecciona us-central1.

    5. Selecciona la cuenta de servicio que has creado anteriormente.

    6. Haz clic en Siguiente.

    7. En el editor de flujos de trabajo, pega la definición del flujo de trabajo principal.

    8. Haz clic en Desplegar.

    gcloud

    1. Crea un archivo de código fuente para tu flujo de trabajo:

      touch workflow-parent.yaml
    2. Abre el archivo de código fuente en un editor de texto y pega la definición del flujo de trabajo principal.

    3. Despliega el flujo de trabajo:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

      Sustituye SERVICE_ACCOUNT_NAME por el nombre de la cuenta de servicio que has creado anteriormente.

Ejecutar el flujo de trabajo principal

Ejecuta el flujo de trabajo principal para que las invocaciones del flujo de trabajo secundario se ejecuten en paralelo. Las ejecuciones deberían tardar unos 10 segundos en completarse.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. En la página Flujos de trabajo, haz clic en el flujo de trabajo workflow-parent para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Vuelve a hacer clic en Ejecutar.

  5. Consulta los resultados del flujo de trabajo en el panel Información.

    Los resultados deberían ser similares a los siguientes, que indican errores en las iteraciones 2 y 4, y que las iteraciones 1 y 3 se han completado correctamente.

    "failure": {
      "2": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":2}",
          "duration": "10.157992541s",
          "endTime": "2023-07-11T13:13:13.028424329Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 2\"",
    ...
      "4": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":4}",
          "duration": "10.157929734s",
          "endTime": "2023-07-11T13:13:13.061289142Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 4\"",
    ...
    "success": {
      "1": "Hello world1",
      "3": "Hello world3"

gcloud

Ejecuta el flujo de trabajo:

gcloud workflows run workflow-parent \
    --location=us-central1

Los resultados deberían ser similares a los siguientes, que indican errores en las iteraciones 2 y 4, y que las iteraciones 1 y 3 se han completado correctamente.

Waiting for execution [06c753e4-6947-4c62-ac0b-2a9d53fb1b8f] to complete...done.
argument: 'null'
duration: 14.065415004s
endTime: '2023-07-11T12:50:43.929023883Z'
name: projects/386837416586/locations/us-central1/workflows/workflow-parent/executions/06c753e4-6947-4c62-ac0b-2a9d53fb1b8f
result: '{"failure":{"2":{"message":"Execution failed or cancelled.","operation":{"argument":"{\"iteration\":2}","duration":"10.143718070s","endTime":"2023-07-11T12:50:40.673209821Z","error":{"context":"RuntimeError:
...
"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"success":{"1":"Hello world1","3":"Hello world3"}}'
startTime: '2023-07-11T12:50:29.863608879Z'
state: SUCCEEDED

Has creado y desplegado correctamente un flujo de trabajo que invoca un flujo de trabajo secundario, ejecuta cuatro iteraciones del flujo de trabajo secundario en ramas paralelas y devuelve un indicador de éxito o error para cada ejecución del flujo de trabajo secundario.