Execute modelos flexíveis no Dataflow

Esta página descreve como executar uma tarefa do Dataflow através de um modelo flexível. Os modelos flexíveis permitem-lhe agrupar um pipeline do Dataflow para que possa executar o pipeline sem ter um ambiente de desenvolvimento do Apache Beam.

Autorizações necessárias

Quando executa um modelo flexível, o Dataflow cria uma tarefa para si. Para criar a tarefa, a conta de serviço do Dataflow precisa da seguinte autorização:

  • dataflow.serviceAgent

Quando usa o Dataflow pela primeira vez, o serviço atribui-lhe esta função, pelo que não precisa de conceder esta autorização.

Por predefinição, a conta de serviço do Compute Engine é usada para VMs iniciadoras e VMs de trabalho. A conta de serviço precisa das seguintes funções e capacidades:

  • Administrador de objetos de armazenamento (roles/storage.objectAdmin)
  • Leitor (roles/viewer)
  • Trabalhador do Dataflow (roles/dataflow.worker)
  • Acesso de leitura e escrita ao contentor de preparação
  • Acesso de leitura à imagem do modelo flexível

Para conceder acesso de leitura e escrita ao contentor de preparação, pode usar a função Storage Object Admin (roles/storage.objectAdmin). Para mais informações, consulte Funções de IAM para o Cloud Storage.

Para conceder acesso de leitura à imagem do modelo flexível, pode usar a função Storage Object Viewer (roles/storage.objectViewer). Para mais informações, consulte Configurar o controlo de acesso.

Execute um modelo flexível

Para executar um modelo flexível, use o comando gcloud dataflow flex-template run:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Substitua o seguinte:

  • JOB_ID: o ID do seu trabalho

  • TEMPLATE_FILE_LOCATION: a localização do ficheiro de modelo no Cloud Storage

  • REGION: a região na qual executar a tarefa do Dataflow

  • STAGING_LOCATION: a localização do Cloud Storage para preparar ficheiros locais

  • TEMP_LOCATION: a localização do Cloud Storage para escrever ficheiros temporários. Se não for definido, a predefinição é a localização de preparação.

  • PARAMETERS: parâmetros do pipeline para a tarefa

  • LABELS: opcional. Etiquetas anexadas à sua tarefa, usando o formato KEY_1=VALUE_1,KEY_2=VALUE_2,....

Durante o passo de preparação do lançamento de um modelo, o Dataflow escreve ficheiros na localização de preparação. O fluxo de dados lê estes ficheiros preparados para criar o gráfico de tarefas. Durante o passo de execução, o Dataflow escreve ficheiros na localização temporária.

Defina opções de tubagens

Para definir opções de pipeline quando executa um modelo flexível, use as seguintes flags no comando gcloud dataflow flex-template run:

gcloud

Quando transmite parâmetros do tipo List ou Map, pode ter de definir parâmetros num ficheiro YAML e usar a flag flags-file.

API

O exemplo seguinte mostra como incluir opções de pipeline, experiências e opções adicionais num corpo do pedido:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Quando usa modelos flexíveis, pode configurar algumas opções de pipeline durante a inicialização do pipeline, mas não é possível alterar outras opções de pipeline. Se os argumentos da linha de comandos exigidos pelo modelo flexível forem substituídos, a tarefa pode ignorar, substituir ou rejeitar as opções do pipeline transmitidas pelo iniciador de modelos. A tarefa pode não ser iniciada ou pode ser iniciada uma tarefa que não use o modelo flexível. Para mais informações, consulte o artigo Falha ao ler o ficheiro de tarefa.

Durante a inicialização do pipeline, não altere as seguintes opções do pipeline:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Ir

  • runner
  • project
  • job_name
  • template_location
  • region

Bloqueie as chaves SSH do projeto em VMs que usam chaves SSH baseadas em metadados

Pode impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto bloqueando as chaves SSH do projeto nas VMs. Use o sinalizador additional-experiments com a opção de serviço block_project_ssh_keys:

--additional-experiments=block_project_ssh_keys

Para mais informações, consulte as opções de serviço do Dataflow.

Atualize uma tarefa de modelo flexível

O pedido de exemplo seguinte mostra como atualizar uma tarefa de streaming de modelos usando o método projects.locations.flexTemplates.launch. Se quiser usar a CLI gcloud, consulte o artigo Atualize um pipeline existente.

Se quiser atualizar um modelo clássico, use projects.locations.templates.launch em alternativa.

  1. Siga os passos para criar uma tarefa de streaming a partir de um modelo flexível. Envie o seguinte pedido HTTP POST com os valores modificados:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Substitua PROJECT_ID pelo ID do seu projeto.
    • Substitua REGION pela região do Dataflow do trabalho que está a atualizar.
    • Substitua JOB_NAME pelo nome exato da tarefa que quer atualizar.
    • Defina parameters para a sua lista de pares de chave-valor. Os parâmetros indicados são específicos deste exemplo de modelo. Se estiver a usar um modelo personalizado, modifique os parâmetros conforme necessário. Se estiver a usar o modelo de exemplo, substitua as seguintes variáveis.
      • Substitua SUBSCRIPTION_NAME pelo nome da sua subscrição do Pub/Sub.
      • Substitua DATASET pelo nome do seu conjunto de dados do BigQuery.
      • Substitua TABLE_NAME pelo nome da tabela do BigQuery.
    • Substitua STORAGE_PATH pela localização do ficheiro de modelo no Cloud Storage. A localização deve começar com gs://.
  2. Use o parâmetro environment para alterar as definições do ambiente. Para mais informações, consulte FlexTemplateRuntimeEnvironment.

  3. Opcional: para enviar o seu pedido através do curl (Linux, macOS ou Cloud Shell), guarde o pedido num ficheiro JSON e, em seguida, execute o seguinte comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Substitua FILE_PATH pelo caminho para o ficheiro JSON que contém o corpo do pedido.

  4. Use a interface de monitorização do fluxo de dados para verificar se foi criada uma nova tarefa com o mesmo nome. Esta tarefa tem o estado Atualizado.

O que se segue?