Executar modelos flexíveis no Dataflow

Nesta página, descrevemos como executar um job do Dataflow usando um modelo Flex. Com os modelos Flex, é possível empacotar um pipeline do Dataflow para executá-lo sem um ambiente de desenvolvimento do Apache Beam.

Permissões necessárias

Quando você executa um modelo flexível, o Dataflow cria um job para você. Para criar o job, a conta de serviço do Dataflow precisa da seguinte permissão:

  • dataflow.serviceAgent

Quando você usa o Dataflow pela primeira vez, o serviço atribui esse papel a você para que não precise conceder essa permissão.

Por padrão, a conta de serviço do Compute Engine é usada para VMs de inicializador e de worker. A conta de serviço precisa dos seguintes papéis e capacidades:

  • Administrador de objetos do Storage (roles/storage.objectAdmin)
  • Visualizador (roles/viewer)
  • Worker do Dataflow (roles/dataflow.worker)
  • Acesso de leitura e gravação ao bucket de preparo
  • Acesso de leitura à imagem do modelo flexível

Para conceder acesso de leitura e gravação ao bucket de preparo, use o papel Administrador de objetos do Storage (roles/storage.objectAdmin). Para mais informações, consulte Papéis do IAM para o Cloud Storage.

Para conceder acesso de leitura à imagem do modelo flexível, use o papel Visualizador de objeto do Storage (roles/storage.objectViewer). Para mais informações, consulte Como configurar o controle de acesso.

Executar um modelo Flex

Para executar um modelo Flex, use o comando gcloud dataflow flex-template run:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Substitua:

  • JOB_ID: o ID do job

  • TEMPLATE_FILE_LOCATION: o local do Cloud Storage do arquivo de modelo

  • REGION: a região em que o job do Dataflow será executado.

  • STAGING_LOCATION: o local do Cloud Storage para preparar arquivos locais

  • TEMP_LOCATION: o local do Cloud Storage para gravar arquivos temporários. Se não for definido, o padrão será o local de organização temporária.

  • PARAMETERS: parâmetros de pipeline para o job.

  • LABELS: opcional. Rótulos anexados ao seu job, usando o formato KEY_1=VALUE_1,KEY_2=VALUE_2,....

Durante a etapa de preparo do lançamento de um modelo, o Dataflow grava arquivos no local de preparo. O Dataflow lê esses arquivos preparados para criar o gráfico de job. Durante a etapa de execução, o Dataflow grava arquivos no local temporário.

Definir opções de canal

Para definir opções de pipeline ao executar um modelo Flex, use as seguintes flags no comando gcloud dataflow flex-template run:

gcloud

Ao transmitir parâmetros do tipo List ou Map, talvez seja necessário definir parâmetros em um arquivo YAML e usar a flag flags-file.

API

O exemplo a seguir mostra como incluir opções de pipeline, experimentos e outras opções em um corpo de solicitação:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Ao usar modelos Flex, é possível configurar algumas opções de pipeline durante a inicialização, mas outras opções não podem ser alteradas. Se os argumentos de linha de comando exigidos pelo modelo Flex forem substituídos, o job pode ignorar, substituir ou descartar as opções de pipeline transmitidas pelo inicializador de modelos. A inicialização do job pode falhar ou um job que não usa o modelo Flex pode ser iniciado. Para mais informações, consulte Falha ao ler o arquivo do job.

Durante a inicialização, não altere as seguintes opções de pipeline:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

Bloquear chaves SSH do projeto de VMs que usam chaves SSH baseadas em metadados

Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, bloqueie as chaves SSH do projeto das VMs. Use a sinalização additional-experiments com a opção de serviço block_project_ssh_keys:

--additional-experiments=block_project_ssh_keys

Para mais informações, consulte Opções de serviço do Dataflow.

Atualizar um job de modelo Flex

O exemplo de solicitação a seguir mostra como atualizar um job de streaming de modelo usando o método projects.locations.flexTemplates.launch. Se você quiser usar a gcloud CLI, consulte Atualizar um pipeline que já existe.

Para atualizar um modelo clássico, use projects.locations.templates.launch.

  1. Siga as etapas para criar um job de streaming com base em um modelo Flex. Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua REGION pela região do Dataflow do job que você está atualizando.
    • Substitua JOB_NAME pelo nome exato do job que você quer atualizar.
    • Defina parameters como sua lista de pares de chave/valor. Os parâmetros listados são específicos desse modelo de exemplo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.
      • Substitua SUBSCRIPTION_NAME pelo nome da sua assinatura do Pub/Sub.
      • Substitua DATASET pelo nome do conjunto de dados do BigQuery.
      • Substitua TABLE_NAME pelo nome da sua tabela do BigQuery.
    • Substitua STORAGE_PATH pelo local do Cloud Storage do arquivo de modelo. O local precisa começar com gs://.
  2. Use o parâmetro environment para alterar as configurações do ambiente. Para mais informações, consulte FlexTemplateRuntimeEnvironment.

  3. Opcional: para enviar sua solicitação usando curl (Linux, macOS ou Cloud Shell), salve a solicitação em um arquivo JSON e execute o seguinte comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Substitua FILE_PATH pelo caminho para o arquivo JSON que contém o corpo da solicitação.

  4. Use a interface de monitoramento do Dataflow para verificar se um novo job com o mesmo nome foi criado. Esse job tem o status Atualizado.

A seguir