Ejecuta un flujo de trabajo que ejecute otros flujos de trabajo en paralelo

En este instructivo, se muestra cómo crear y ejecutar un flujo de trabajo principal que ejecuta varios flujos de trabajo secundarios en paralelo.

En el siguiente diagrama, se invocan cuatro ejecuciones paralelas del flujo de trabajo secundario. Esto permite que el flujo de trabajo principal procese datos en ramas paralelas y reduce el tiempo de ejecución general. El flujo de trabajo principal espera a que finalicen todas las ejecuciones del flujo de trabajo secundario antes de mostrar un resumen de las ejecuciones exitosas y fallidas, lo que simplifica la detección de errores.

Flujo de trabajo principal que invoca iteraciones paralelas de un flujo de trabajo secundario

Crea e implementa un flujo de trabajo secundario

Un flujo de trabajo secundario puede recibir y procesar datos de un flujo de trabajo principal. El flujo de trabajo secundario demuestra esto haciendo lo siguiente:

  • Recibe un número entero como argumento.
  • Suspende la ejecución durante 10 segundos para simular algún procesamiento.
  • Devuelve un indicador (según si el número entero es par o impar) para simular el éxito o el fracaso de la ejecución del flujo de trabajo.

Console

  1. En la consola de Google Cloud , ve a la página Recomendaciones.

    Ir a Workflows

  2. Haz clic en  Crear.

  3. Ingresa el nombre workflow-child para el nuevo flujo de trabajo.

  4. En la lista Región, selecciona us-central1.

  5. Selecciona la cuenta de servicio que creaste anteriormente.

  6. Haz clic en Siguiente.

  7. En el editor de flujos de trabajo, ingresa la siguiente definición para el flujo de trabajo:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  8. Haz clic en Implementar.

gcloud

  1. Crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-child.yaml
  2. Abre el archivo de código fuente en un editor de texto y copia el siguiente flujo de trabajo en el archivo.

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  3. Implementa el flujo de trabajo:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Reemplaza SERVICE_ACCOUNT_NAME por el nombre de la cuenta de servicio que creaste anteriormente.

Crea e implementa el flujo de trabajo principal

El flujo de trabajo principal ejecuta varias ramas del flujo de trabajo secundario con un bucle for paralelo.

  1. Copia el código fuente de la definición del flujo de trabajo. Consta de las siguientes partes:

    1. Se usa un mapa para almacenar los resultados de las ejecuciones del flujo de trabajo secundario. Para obtener más información, consulta Maps.

      main:
        steps:
          - init:
              assign:
                - execution_results: {} # results from each execution
                - execution_results.success: {} # successful executions saved under 'success' key
                - execution_results.failure: {} # failed executions saved under 'failure' key
    2. Se ejecuta un bucle for en paralelo para invocar el flujo de trabajo secundario. Para obtener más información, consulta Pasos paralelos y Iteración.

      - execute_child_workflows:
          parallel:
            shared: [execution_results]
            for:
              value: iteration
              in: [1, 2, 3, 4]
              steps:
                  - iterate:
    3. El flujo de trabajo secundario se invoca con un conector. Cada iteración del flujo de trabajo secundario recibe el argumento iteration. El flujo de trabajo principal espera y almacena el resultado de cada ejecución del flujo de trabajo secundario. Para obtener más información, consulta el conector de la API de Workflows Executions y los argumentos de tiempo de ejecución.

      try:
        steps:
          - execute_child_workflow:
              call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
              args:
                workflow_id: workflow-child
                #location: ...
                #project_id: ...
                argument:
                  iteration: ${iteration}
              result: execution_result
          - save_successful_execution:
              assign:
                - execution_results.success[string(iteration)]: ${execution_result}
      except:
          as: e
          steps:
            - save_failed_execution:
                assign:
                  - execution_results.failure[string(iteration)]: ${e}
    4. Se devuelven los resultados de la ejecución. Para obtener más información, consulta Cómo completar la ejecución de un flujo de trabajo.

      - return_execution_results:
          return: ${execution_results}
  2. Implementa el flujo de trabajo:

    Console

    1. En la consola de Google Cloud , ve a la página Workflows:

      Ir a Workflows

    2. Haz clic en  Crear.

    3. Ingresa el nombre workflow-parent para el nuevo flujo de trabajo.

    4. En la lista Región, selecciona us-central1.

    5. Selecciona la cuenta de servicio que creaste anteriormente.

    6. Haz clic en Siguiente.

    7. En el editor de flujos de trabajo, pega la definición del flujo de trabajo principal.

    8. Haz clic en Implementar.

    gcloud

    1. Crea un archivo de código fuente para tu flujo de trabajo:

      touch workflow-parent.yaml
    2. Abre el archivo de código fuente en un editor de texto y pega la definición del flujo de trabajo principal.

    3. Implementa el flujo de trabajo:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

      Reemplaza SERVICE_ACCOUNT_NAME por el nombre de la cuenta de servicio que creaste anteriormente.

Ejecuta el flujo de trabajo principal

Ejecuta el flujo de trabajo principal para que las invocaciones del flujo de trabajo secundario se ejecuten en paralelo. Las ejecuciones deberían tardar alrededor de 10 segundos en completarse.

Console

  1. En la consola de Google Cloud , ve a la página Workflows:

    Ir a Workflows

  2. En la página Flujos de trabajo, haz clic en el flujo de trabajo workflow-parent para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Haz clic de nuevo en Ejecutar.

  5. Consulta los resultados del flujo de trabajo en el panel Output.

    Los resultados deberían ser similares a los siguientes, lo que indica errores en las iteraciones 2 y 4, y éxito en las iteraciones 1 y 3.

    "failure": {
      "2": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":2}",
          "duration": "10.157992541s",
          "endTime": "2023-07-11T13:13:13.028424329Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 2\"",
    ...
      "4": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":4}",
          "duration": "10.157929734s",
          "endTime": "2023-07-11T13:13:13.061289142Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 4\"",
    ...
    "success": {
      "1": "Hello world1",
      "3": "Hello world3"

gcloud

Ejecuta el flujo de trabajo:

gcloud workflows run workflow-parent \
    --location=us-central1

Los resultados deberían ser similares a los siguientes, lo que indica errores en las iteraciones 2 y 4, y éxito en las iteraciones 1 y 3.

Waiting for execution [06c753e4-6947-4c62-ac0b-2a9d53fb1b8f] to complete...done.
argument: 'null'
duration: 14.065415004s
endTime: '2023-07-11T12:50:43.929023883Z'
name: projects/386837416586/locations/us-central1/workflows/workflow-parent/executions/06c753e4-6947-4c62-ac0b-2a9d53fb1b8f
result: '{"failure":{"2":{"message":"Execution failed or cancelled.","operation":{"argument":"{\"iteration\":2}","duration":"10.143718070s","endTime":"2023-07-11T12:50:40.673209821Z","error":{"context":"RuntimeError:
...
"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"success":{"1":"Hello world1","3":"Hello world3"}}'
startTime: '2023-07-11T12:50:29.863608879Z'
state: SUCCEEDED

Creaste y, luego, implementaste correctamente un flujo de trabajo que invoca un flujo de trabajo secundario, ejecuta cuatro iteraciones del flujo de trabajo secundario en ramas paralelas y devuelve un indicador de éxito o falla para cada ejecución del flujo de trabajo secundario.