Usar una cola de Cloud Tasks para almacenar en búfer las ejecuciones de tu flujo de trabajo

En este tutorial se explica cómo crear una cola de Cloud Tasks que pueda regular la frecuencia de las ejecuciones de flujos de trabajo.

Hay un número máximo de ejecuciones de flujo de trabajo activas que pueden producirse simultáneamente. Una vez que se agota esta cuota, y si la acumulación de ejecuciones está inhabilitada o se alcanza la cuota de ejecuciones acumuladas, las nuevas ejecuciones fallarán y se devolverá el código de estado HTTP 429 Too many requests. Si habilitas una cola de Cloud Tasks para que ejecute flujos de trabajo secundarios a la velocidad que definas, puedes evitar problemas relacionados con las cuotas de Workflows y conseguir una mejor velocidad de ejecución.

Ten en cuenta que Cloud Tasks se ha diseñado para ofrecer una entrega "al menos una vez". Sin embargo, Workflows no garantiza el procesamiento exactamente una vez de las solicitudes duplicadas de Cloud Tasks.

En el siguiente diagrama, un flujo de trabajo principal invoca flujos de trabajo secundarios regulados por una cola de Cloud Tasks a la que se le ha aplicado una tasa de envío.

Flujo de trabajo principal que invoca iteraciones de un flujo de trabajo secundario a través de la cola de tareas de Cloud

Crear una cola de Cloud Tasks

Crea una cola de Cloud Tasks que puedas usar en el flujo de trabajo principal y que te permita regular la frecuencia de las ejecuciones del flujo de trabajo.

Consola

  1. En la Google Cloud consola, ve a la página Cloud Tasks:

    Ir a Cloud Tasks

  2. Haz clic en Crear cola push.

  3. Introduce el nombre de la cola queue-workflow-child.

  4. En la lista Región, selecciona us-central1 (Iowa).

  5. Haz clic en Crear.

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

Crear y desplegar un flujo de trabajo secundario

Un flujo de trabajo secundario puede recibir y procesar datos de un flujo de trabajo principal. Crea y despliega un flujo de trabajo secundario que haga lo siguiente:

  • Recibe un iteration como argumento.
  • Espera 10 segundos para simular algún procesamiento
  • Devuelve una cadena si la ejecución se realiza correctamente.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo.

    Ir a Workflows

  2. Haz clic en Crear.

  3. Introduce el nombre, workflow-child, del nuevo flujo de trabajo.

  4. En la lista Región, selecciona us-central1 (Iowa).

  5. En la lista Cuenta de servicio, selecciona Cuenta de servicio predeterminada de Compute Engine.

  6. Haz clic en Siguiente.

  7. En el editor del flujo de trabajo, introduce la siguiente definición para tu flujo de trabajo:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. Haz clic en Desplegar.

gcloud

  1. Crea un archivo de código fuente para tu flujo de trabajo:

    touch workflow-child.yaml
  2. Abre el archivo de código fuente en un editor de texto y copia el siguiente flujo de trabajo en el archivo.

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. Despliega el flujo de trabajo:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Crear y desplegar el flujo de trabajo principal

El flujo de trabajo principal ejecuta varias ramas del flujo de trabajo secundario mediante un bucle for.

  1. Copia el código fuente que define el flujo de trabajo principal:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    El flujo de trabajo consta de las siguientes partes:

    • Mapa que se usa para asignar constantes que hacen referencia al flujo de trabajo secundario y al nombre de la cola de Cloud Tasks. Para obtener más información, consulta Maps.

    • Un bucle for que se ejecuta para invocar el flujo de trabajo secundario de forma iterativa. Para obtener más información, consulta Iteración.

    • Un paso de flujo de trabajo que crea y añade un gran número de tareas a la cola de Cloud Tasks para ejecutar el flujo de trabajo secundario. Para obtener más información, consulta el artículo sobre el conector de la API Cloud Tasks.

  2. Despliega el flujo de trabajo:

    Consola

    1. En la Google Cloud consola, ve a la página Flujos de trabajo:

      Ir a Workflows

    2. Haz clic en Crear.

    3. Introduce el nombre, workflow-parent, del nuevo flujo de trabajo.

    4. En la lista Región, selecciona us-central1 (Iowa).

    5. En la lista Cuenta de servicio, selecciona Cuenta de servicio predeterminada de Compute Engine.

    6. Haz clic en Siguiente.

    7. En el editor de flujos de trabajo, pega la definición del flujo de trabajo principal.

    8. Haz clic en Desplegar.

    gcloud

    1. Crea un archivo de código fuente para tu flujo de trabajo:

      touch workflow-parent.yaml
    2. Abre el archivo de código fuente en un editor de texto y pega la definición del flujo de trabajo principal.

    3. Despliega el flujo de trabajo:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Ejecutar el flujo de trabajo principal sin límites de frecuencia

Ejecuta el flujo de trabajo principal para invocar los flujos de trabajo secundarios a través de la cola de Cloud Tasks. Las ejecuciones deberían tardar unos 10 segundos en completarse.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. En la página Flujos de trabajo, haz clic en el flujo de trabajo workflow-parent para ir a su página de detalles.

  3. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  4. Vuelve a hacer clic en Ejecutar.

  5. Mientras se ejecuta el flujo de trabajo principal, vuelve a la página Flujos de trabajo y haz clic en el flujo de trabajo workflow-child para ir a su página de detalles.

  6. Haz clic en la pestaña Ejecuciones.

    Deberías ver ejecuciones del flujo de trabajo secundario, que se ejecutan aproximadamente al mismo tiempo, como en el siguiente ejemplo:

    Detalles de las ejecuciones del flujo de trabajo secundario que se ejecutan aproximadamente al mismo tiempo.

gcloud

  1. Ejecuta el flujo de trabajo:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. Para verificar que se ha activado una ejecución de flujo de trabajo, enumera las cuatro últimas ejecuciones:

    gcloud workflows executions list workflow-child --limit=4

    Como el número de ejecuciones (100) es inferior al límite de simultaneidad de los flujos de trabajo, los resultados deberían ser similares a los siguientes. Pueden surgir problemas de cuota si envías miles de ejecuciones al mismo tiempo.

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

Has creado y desplegado un flujo de trabajo que invoca 100 iteraciones del flujo de trabajo secundario.

Ejecutar el flujo de trabajo principal con límites de velocidad

Aplica un límite de frecuencia de un envío por segundo a la cola de Cloud Tasks y, a continuación, ejecuta el flujo de trabajo principal.

Consola

  1. En la Google Cloud consola, ve a la página Cloud Tasks:

    Ir a Cloud Tasks

  2. Haz clic en queue-workflow-child, la cola de Cloud Tasks que has creado, y luego en Editar cola.

  3. En la sección Límites de frecuencia de envíos de tareas, en el campo Máximo de envíos, escribe 1.

  4. Haz clic en Guardar.

  5. Ve a la página Workflows:

    Ir a Workflows

  6. Haz clic en el flujo de trabajo principal para ir a su página de detalles.

  7. En la página Detalles del flujo de trabajo, haz clic en Ejecutar.

  8. Vuelve a hacer clic en Ejecutar.

  9. Mientras se ejecuta el flujo de trabajo principal, vuelve a la página Flujos de trabajo y haz clic en el flujo de trabajo workflow-child para ir a su página de detalles.

  10. Haz clic en la pestaña Ejecuciones.

    Deberías ver ejecuciones del flujo de trabajo secundario, que se ejecutan a una solicitud por segundo, de forma similar a la siguiente:

    Detalles del flujo de trabajo secundario que se ejecuta por solicitud por segundo.

gcloud

  1. Actualiza la cola de Cloud Tasks para aplicar un límite de una distribución por segundo:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. Ejecuta el flujo de trabajo:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. Para verificar que se ha activado una ejecución de flujo de trabajo, enumera las cuatro últimas ejecuciones:

    gcloud workflows executions list workflow-child --limit=4

    Los resultados deberían ser similares a los siguientes, con un flujo de trabajo ejecutado por segundo:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

Has desplegado correctamente un flujo de trabajo que invoca 100 iteraciones del flujo de trabajo secundario con una tasa de envío de una ejecución por segundo.