Use uma fila do Cloud Tasks para armazenar em buffer as execuções do fluxo de trabalho

Este tutorial mostra como criar uma fila do Cloud Tasks que pode regular a taxa de execuções do fluxo de trabalho.

Existe um número máximo de execuções de fluxo de trabalho ativas que podem ocorrer em simultâneo. Quando esta quota se esgota e se a opção Execução em atraso estiver desativada ou se a quota de execuções em atraso for atingida, todas as novas execuções falham com um código de estado HTTP 429 Too many requests. Ao ativar uma fila do Cloud Tasks para executar fluxos de trabalho secundários a uma taxa que define, pode evitar problemas relacionados com a quota dos Workflows e alcançar uma melhor taxa de execução.

Tenha em atenção que o Cloud Tasks foi concebido para fornecer uma entrega "pelo menos uma vez"; no entanto, o Workflows não garante o processamento exatamente uma vez de pedidos duplicados do Cloud Tasks.

No diagrama seguinte, um fluxo de trabalho principal invoca fluxos de trabalho secundários que são regulados por uma fila do Cloud Tasks à qual foi aplicada uma taxa de envio.

Fluxo de trabalho principal que invoca iterações de um fluxo de trabalho secundário através da fila de tarefas do Cloud Tasks

Crie uma fila do Cloud Tasks

Crie uma fila do Cloud Tasks que pode usar no fluxo de trabalho principal e que lhe permite regular a taxa de execuções do fluxo de trabalho.

Consola

  1. Na Google Cloud consola, aceda à página Cloud Tasks:

    Aceda ao Cloud Tasks

  2. Clique em Criar fila de envio.

  3. Introduza o Nome da fila, queue-workflow-child.

  4. Na lista Região, selecione us-central1 (Iowa).

  5. Clique em Criar.

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

Crie e implemente um fluxo de trabalho secundário

Um fluxo de trabalho secundário pode receber e processar dados de um fluxo de trabalho principal. Crie e implemente um fluxo de trabalho secundário que faça o seguinte:

  • Recebe um iteration como argumento
  • Dorme durante 10 segundos para simular algum processamento
  • Devolve uma string após a execução com êxito

Consola

  1. Na Google Cloud consola, aceda à página Fluxos de trabalho.

    Aceda a Fluxos de trabalho

  2. Clique em Criar.

  3. Introduza o nome, workflow-child, do novo fluxo de trabalho.

  4. Na lista Região, selecione us-central1 (Iowa).

  5. Na lista Conta de serviço, selecione a conta de serviço predefinida do Compute Engine.

  6. Clicar em Seguinte.

  7. No editor de fluxos de trabalho, introduza a seguinte definição para o seu fluxo de trabalho:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. Clique em Implementar.

gcloud

  1. Crie um ficheiro de código-fonte para o seu fluxo de trabalho:

    touch workflow-child.yaml
  2. Abra o ficheiro de código-fonte num editor de texto e copie o seguinte fluxo de trabalho para o ficheiro.

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. Implemente o fluxo de trabalho:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Crie e implemente o fluxo de trabalho principal

O fluxo de trabalho principal executa vários ramos do fluxo de trabalho secundário através de um ciclo for.

  1. Copie o código fonte que define o fluxo de trabalho principal:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    O fluxo de trabalho é composto pelas seguintes partes:

    • Um mapa usado para atribuir constantes referentes ao fluxo de trabalho secundário e ao nome da fila do Cloud Tasks. Para mais informações, consulte o artigo Mapas.

    • Um ciclo for que é executado para invocar o fluxo de trabalho secundário, de forma iterativa. Para mais informações, consulte a secção Iteração.

    • Um passo do fluxo de trabalho que cria e adiciona um grande número de tarefas à fila do Cloud Tasks para executar o fluxo de trabalho secundário. Para mais informações, consulte o conetor da API Cloud Tasks.

  2. Implemente o fluxo de trabalho:

    Consola

    1. Na Google Cloud consola, aceda à página Fluxos de trabalho:

      Aceda a Fluxos de trabalho

    2. Clique em Criar.

    3. Introduza o nome, workflow-parent, do novo fluxo de trabalho.

    4. Na lista Região, selecione us-central1 (Iowa).

    5. Na lista Conta de serviço, selecione a conta de serviço predefinida do Compute Engine.

    6. Clicar em Seguinte.

    7. No editor de fluxos de trabalho, cole a definição do fluxo de trabalho principal.

    8. Clique em Implementar.

    gcloud

    1. Crie um ficheiro de código-fonte para o seu fluxo de trabalho:

      touch workflow-parent.yaml
    2. Abra o ficheiro de código fonte num editor de texto e cole a definição do fluxo de trabalho principal.

    3. Implemente o fluxo de trabalho:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

Execute o fluxo de trabalho principal sem limites de velocidade

Execute o fluxo de trabalho principal para invocar os fluxos de trabalho secundários através da fila do Cloud Tasks. As execuções devem demorar cerca de 10 segundos a concluir.

Consola

  1. Na Google Cloud consola, aceda à página Fluxos de trabalho:

    Aceda a Fluxos de trabalho

  2. Na página Fluxos de trabalho, clique no fluxo de trabalho workflow-parent para aceder à respetiva página de detalhes.

  3. Na página Detalhes do fluxo de trabalho, clique em Executar.

  4. Clique novamente em Executar.

  5. Enquanto o fluxo de trabalho principal está em execução, regresse à página Fluxos de trabalho e clique no fluxo de trabalho workflow-child para aceder à respetiva página de detalhes.

  6. Clique no separador Execuções.

    Deverá ver execuções do fluxo de trabalho secundário, executadas aproximadamente ao mesmo tempo, semelhantes ao seguinte:

    Detalhes das execuções do fluxo de trabalho secundário em execução à mesma hora.

gcloud

  1. Execute o fluxo de trabalho:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. Para verificar se uma execução do fluxo de trabalho foi acionada, liste as últimas quatro execuções:

    gcloud workflows executions list workflow-child --limit=4

    Uma vez que o número de execuções (100) está abaixo do limite de concorrência dos fluxos de trabalho, os resultados devem ser semelhantes aos seguintes. Podem surgir problemas de quota se enviar milhares de execuções em simultâneo.

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

Criou e implementou um fluxo de trabalho que invoca 100 iterações do fluxo de trabalho filho.

Execute o fluxo de trabalho principal com limites de velocidade

Aplique um limite de taxa de um envio por segundo à fila do Cloud Tasks e, em seguida, execute o fluxo de trabalho principal.

Consola

  1. Na Google Cloud consola, aceda à página Cloud Tasks:

    Aceda ao Cloud Tasks

  2. Clique em queue-workflow-child, a fila do Cloud Tasks que criou, e clique em Editar fila.

  3. Na secção Limites de taxa para envios de tarefas, no campo Envios máximos, escreva 1.

  4. Clique em Guardar.

  5. Aceda à página Fluxos de trabalho:

    Aceda a Fluxos de trabalho

  6. Clique no fluxo de trabalho workflow-parent para aceder à respetiva página de detalhes.

  7. Na página Detalhes do fluxo de trabalho, clique em Executar.

  8. Clique novamente em Executar.

  9. Enquanto o fluxo de trabalho principal está em execução, regresse à página Fluxos de trabalho e clique no fluxo de trabalho workflow-child para aceder à respetiva página de detalhes.

  10. Clique no separador Execuções.

    Deverá ver execuções do fluxo de trabalho secundário, em execução a um pedido por segundo, semelhantes ao seguinte:

    Detalhes do fluxo de trabalho secundário em execução por pedido por segundo.

gcloud

  1. Atualize a fila de tarefas do Cloud para aplicar um limite de taxa de um envio por segundo:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. Execute o fluxo de trabalho:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. Para verificar se uma execução do fluxo de trabalho foi acionada, liste as últimas quatro execuções:

    gcloud workflows executions list workflow-child --limit=4

    Os resultados devem ser semelhantes aos seguintes, com um fluxo de trabalho executado por segundo:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

Implementou com êxito um fluxo de trabalho que invoca 100 iterações do fluxo de trabalho filho com uma taxa de envio de uma execução por segundo.