Usa una cola de Cloud Tasks para almacenar en búfer las ejecuciones de tu flujo de trabajo

En este instructivo, se muestra cómo crear una cola de Cloud Tasks que puede regular la tasa de ejecuciones de flujos de trabajo.

Existe una cantidad máxima de ejecuciones de flujos de trabajo activas que pueden ocurrir de forma simultánea. Una vez que se agota esta cuota, y si la acumulación de ejecuciones está inhabilitada o si se alcanza la cuota para las ejecuciones acumuladas, las ejecuciones nuevas fallan con un código de estado HTTP 429 Too many requests. Si habilitas una cola de Cloud Tasks para que ejecute flujos de trabajo secundarios a una velocidad que definas, puedes evitar problemas relacionados con la cuota de Workflows y lograr una mejor tasa de ejecución.

Ten en cuenta que Cloud Tasks está diseñado para proporcionar una entrega "al menos una vez"; sin embargo, Workflows no garantiza el procesamiento exactamente una vez de las solicitudes duplicadas de Cloud Tasks.

En el siguiente diagrama, un flujo de trabajo principal invoca flujos de trabajo secundarios que están regulados por una cola de Cloud Tasks a la que se le aplica una tasa de envío.

Flujo de trabajo principal que invoca iteraciones de un flujo de trabajo secundario a través de Cloud Tasks Queue

Crea una cola de Cloud Tasks

Crea una cola de Cloud Tasks que puedas usar en el flujo de trabajo principal y que te permita regular la tasa de ejecuciones del flujo de trabajo.

Console

  1. En la consola de Google Cloud , ve a la página Cloud Tasks:

    Ir a Cloud Tasks

  2. Haz clic en Crear cola de envío.

  3. Ingresa el Nombre de la cola, queue-workflow-child.

  4. En la lista Región, selecciona us-central1 (Iowa).

  5. Haz clic en Crear.

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

Crea e implementa un flujo de trabajo secundario

Un flujo de trabajo secundario puede recibir y procesar datos de un flujo de trabajo principal. Crea e implementa un flujo de trabajo secundario que haga lo siguiente:

  • Recibe un iteration como argumento.
  • Suspende la ejecución durante 10 segundos para simular algún procesamiento.
  • Devuelve una cadena si la ejecución es exitosa.

Console

  1. En la consola de Google Cloud , ve a la página Recomendaciones.

    Ir a Workflows

  2. Haz clic en  Crear.

  3. Ingresa el nombre workflow-child para el nuevo flujo de trabajo.

  4. En la lista Región, selecciona us-central1 (Iowa).

  5. En la lista Cuenta de servicio, selecciona la cuenta de servicio predeterminada de Compute Engine.

  6. Haz clic en Siguiente.

  7. En el editor de flujos de trabajo, ingresa la siguiente definición para el flujo de trabajo:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. Haz clic en Implementar.

gcloud

  1. Crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-child.yaml
  2. Abre el archivo de código fuente en un editor de texto y copia el siguiente flujo de trabajo en el archivo.

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. Implementa el flujo de trabajo:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Crea e implementa el flujo de trabajo principal

El flujo de trabajo principal ejecuta varias ramas del flujo de trabajo secundario con un bucle for.

  1. Copia el código fuente que define el flujo de trabajo principal:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    El flujo de trabajo consta de las siguientes partes:

    • Es un mapa que se usa para asignar constantes que hacen referencia al flujo de trabajo secundario y al nombre de la cola de Cloud Tasks. Para obtener más información, consulta Maps.

    • Es un bucle for que se ejecuta para invocar el flujo de trabajo secundario de forma iterativa. Para obtener más información, consulta Iteración.

    • Es un paso del flujo de trabajo que crea y agrega una gran cantidad de tareas a la cola de Cloud Tasks para ejecutar el flujo de trabajo secundario. Para obtener más información, consulta el conector de la API de Cloud Tasks.

  2. Implementa el flujo de trabajo:

    Console

    1. En la consola de Google Cloud , ve a la página Workflows:

      Ir a Workflows

    2. Haz clic en  Crear.

    3. Ingresa el nombre workflow-parent para el nuevo flujo de trabajo.

    4. En la lista Región, selecciona us-central1 (Iowa).

    5. En la lista Cuenta de servicio, selecciona la cuenta de servicio predeterminada de Compute Engine.

    6. Haz clic en Siguiente.

    7. En el editor de flujos de trabajo, pega la definición del flujo de trabajo principal.

    8. Haz clic en Implementar.

    gcloud

    1. Crea un archivo de código fuente para tu flujo de trabajo:

      touch workflow-parent.yaml
    2. Abre el archivo de código fuente en un editor de texto y pega la definición del flujo de trabajo principal.

    3. Implementa el flujo de trabajo:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Ejecuta el flujo de trabajo principal sin límites de frecuencia

Ejecuta el flujo de trabajo principal para invocar los flujos de trabajo secundarios a través de la cola de Cloud Tasks. Las ejecuciones deberían tardar unos 10 segundos en completarse.

Console

  1. En la consola de Google Cloud , ve a la página Workflows:

    Ir a Workflows

  2. En la página Flujos de trabajo, haz clic en el flujo de trabajo workflow-parent para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Haz clic de nuevo en Ejecutar.

  5. Mientras se ejecuta el flujo de trabajo principal, vuelve a la página Flujos de trabajo y haz clic en el flujo de trabajo workflow-child para ir a su página de detalles.

  6. Haz clic en la pestaña Ejecuciones.

    Deberías ver ejecuciones del flujo de trabajo secundario, que se ejecutan aproximadamente al mismo tiempo, de manera similar a lo siguiente:

    Son los detalles de las ejecuciones del flujo de trabajo secundario que se ejecutan aproximadamente al mismo tiempo.

gcloud

  1. Ejecuta el flujo de trabajo:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. Para verificar que se activó una ejecución de flujo de trabajo, enumera las últimas cuatro ejecuciones:

    gcloud workflows executions list workflow-child --limit=4

    Dado que la cantidad de ejecuciones (100) es inferior al límite de simultaneidad de Workflows, los resultados deberían ser similares a los siguientes. Pueden surgir problemas de cuota si envías miles de ejecuciones al mismo tiempo.

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

Creaste e implementaste un flujo de trabajo que invoca 100 iteraciones del flujo de trabajo secundario.

Ejecuta el flujo de trabajo principal con límites de frecuencia

Aplica un límite de frecuencia de un envío por segundo a la fila de Cloud Tasks y, luego, ejecuta el flujo de trabajo principal.

Console

  1. En la consola de Google Cloud , ve a la página Cloud Tasks:

    Ir a Cloud Tasks

  2. Haz clic en queue-workflow-child, la cola de Cloud Tasks que creaste, y, luego, en Editar cola.

  3. En la sección Límite de frecuencia para envío de tareas, en el campo Max dispatches, escribe 1.

  4. Haz clic en Guardar.

  5. Ve a la página Workflows:

    Ir a Workflows

  6. Haz clic en el flujo de trabajo workflow-parent para ir a su página de detalles.

  7. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  8. Haz clic de nuevo en Ejecutar.

  9. Mientras se ejecuta el flujo de trabajo principal, vuelve a la página Flujos de trabajo y haz clic en el flujo de trabajo workflow-child para ir a su página de detalles.

  10. Haz clic en la pestaña Ejecuciones.

    Deberías ver ejecuciones del flujo de trabajo secundario, que se ejecutan a una solicitud por segundo, de manera similar a lo siguiente:

    Son los detalles del flujo de trabajo secundario que se ejecuta a una solicitud por segundo.

gcloud

  1. Actualiza la cola de Cloud Tasks para aplicar un límite de frecuencia de un envío por segundo:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. Ejecuta el flujo de trabajo:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. Para verificar que se activó una ejecución de flujo de trabajo, enumera las últimas cuatro ejecuciones:

    gcloud workflows executions list workflow-child --limit=4

    Los resultados deberían ser similares a los siguientes, con un flujo de trabajo ejecutado por segundo:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

Implementaste correctamente un flujo de trabajo que invoca 100 iteraciones del flujo de trabajo secundario con una tasa de envío de una ejecución por segundo.