Nesta página, descrevemos como executar um job do Dataflow usando um modelo Flex. Com os modelos Flex, é possível empacotar um pipeline do Dataflow para executá-lo sem um ambiente de desenvolvimento do Apache Beam.
Permissões necessárias
Quando você executa um modelo flexível, o Dataflow cria um job para você. Para criar o job, a conta de serviço do Dataflow precisa da seguinte permissão:
dataflow.serviceAgent
Quando você usa o Dataflow pela primeira vez, o serviço atribui esse papel a você para que não precise conceder essa permissão.
Por padrão, a conta de serviço do Compute Engine é usada para VMs de inicializador e de worker. A conta de serviço precisa dos seguintes papéis e capacidades:
- Administrador de objetos do Storage (
roles/storage.objectAdmin) - Visualizador (
roles/viewer) - Worker do Dataflow (
roles/dataflow.worker) - Acesso de leitura e gravação ao bucket de preparo
- Acesso de leitura à imagem do modelo flexível
Para conceder acesso de leitura e gravação ao bucket de preparo, use o papel Administrador de objetos do Storage (roles/storage.objectAdmin). Para mais informações, consulte Papéis do IAM para o Cloud Storage.
Para conceder acesso de leitura à imagem do modelo flexível, use o papel Visualizador de objeto do Storage (roles/storage.objectViewer). Para mais informações, consulte Como configurar o controle de acesso.
Executar um modelo Flex
Para executar um modelo Flex, use o comando
gcloud dataflow flex-template run:
gcloud dataflow flex-template run JOB_ID \ --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \ --region REGION \ --staging-location STAGING_LOCATION \ --temp-location TEMP_LOCATION \ --parameters PARAMETERS \ --additional-user-labels LABELS \
Substitua:
JOB_ID: o ID do jobTEMPLATE_FILE_LOCATION: o local do Cloud Storage do arquivo de modeloREGION: a região em que o job do Dataflow será executado.STAGING_LOCATION: o local do Cloud Storage para preparar arquivos locaisTEMP_LOCATION: o local do Cloud Storage para gravar arquivos temporários. Se não for definido, o padrão será o local de organização temporária.PARAMETERS: parâmetros de pipeline para o job.LABELS: opcional. Rótulos anexados ao seu job, usando o formatoKEY_1=VALUE_1,KEY_2=VALUE_2,....
Durante a etapa de preparo do lançamento de um modelo, o Dataflow grava arquivos no local de preparo. O Dataflow lê esses arquivos preparados para criar o gráfico de job. Durante a etapa de execução, o Dataflow grava arquivos no local temporário.
Definir opções de canal
Para definir opções de pipeline ao executar um modelo Flex, use as seguintes flags
no comando gcloud dataflow flex-template run:
parameters: use esta flag para definir os seguintes tipos de opção de pipeline:Opções de pipeline compatíveis com modelos flexíveis. Para conferir uma lista das opções compatíveis com os modelos flexíveis, consulte Opções de pipeline.
Opções de pipeline declaradas nos metadados do modelo.
additional-pipeline-options: use essa flag para definir outras opções de pipeline do Apache Beam que não são diretamente compatíveis com os modelos Flex.additional-experiments: use essa flag para definir opções experimentais de pipeline (equivalente à opçãoexperiments).
gcloud
Inclua opções de pipeline usando a sinalização
parameters.Inclua experimentos de tempo de execução e opções de pipeline usando as flags
additional-experimentseadditional-pipeline-options.
Ao transmitir parâmetros do tipo List ou Map, talvez seja necessário definir parâmetros em um arquivo YAML e usar a flag flags-file.
API
Inclua opções de pipeline usando o campo
parameters.Inclua experimentos de tempo de execução e opções de pipeline usando os campos
additionalExperimentseadditionalPipelineOptions.
O exemplo a seguir mostra como incluir opções de pipeline, experimentos e outras opções em um corpo de solicitação:
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
Ao usar modelos Flex, é possível configurar algumas opções de pipeline durante a inicialização, mas outras opções não podem ser alteradas. Se os argumentos de linha de comando exigidos pelo modelo Flex forem substituídos, o job pode ignorar, substituir ou descartar as opções de pipeline transmitidas pelo inicializador de modelos. A inicialização do job pode falhar ou um job que não usa o modelo Flex pode ser iniciado. Para mais informações, consulte Falha ao ler o arquivo do job.
Durante a inicialização, não altere as seguintes opções de pipeline:
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Go
runnerprojectjob_nametemplate_locationregion
Bloquear chaves SSH do projeto de VMs que usam chaves SSH baseadas em metadados
Para impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto, bloqueie as chaves SSH do projeto das VMs. Use a sinalização additional-experiments com
a opção de serviço block_project_ssh_keys:
--additional-experiments=block_project_ssh_keys
Para mais informações, consulte Opções de serviço do Dataflow.
Atualizar um job de modelo Flex
O exemplo de solicitação a seguir mostra como atualizar um job de streaming de modelo usando o método projects.locations.flexTemplates.launch. Se você quiser usar a gcloud CLI, consulte Atualizar um pipeline que já existe.
Para atualizar um modelo clássico, use projects.locations.templates.launch.
Siga as etapas para criar um job de streaming com base em um modelo Flex. Envie a seguinte solicitação POST HTTP com os seguintes valores modificados:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }- Substitua
PROJECT_IDpela ID do seu projeto. - Substitua
REGIONpela região do Dataflow do job que você está atualizando. - Substitua
JOB_NAMEpelo nome exato do job que você quer atualizar. - Defina
parameterscomo sua lista de pares de chave/valor. Os parâmetros listados são específicos desse modelo de exemplo. Se você estiver usando um modelo personalizado, modifique os parâmetros conforme necessário. Se você estiver usando o modelo de exemplo, substitua as variáveis a seguir.- Substitua
SUBSCRIPTION_NAMEpelo nome da sua assinatura do Pub/Sub. - Substitua
DATASETpelo nome do conjunto de dados do BigQuery. - Substitua
TABLE_NAMEpelo nome da sua tabela do BigQuery.
- Substitua
- Substitua
STORAGE_PATHpelo local do Cloud Storage do arquivo de modelo. O local precisa começar comgs://.
- Substitua
Use o parâmetro
environmentpara alterar as configurações do ambiente. Para mais informações, consulteFlexTemplateRuntimeEnvironment.Opcional: para enviar sua solicitação usando curl (Linux, macOS ou Cloud Shell), salve a solicitação em um arquivo JSON e execute o seguinte comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launchSubstitua FILE_PATH pelo caminho para o arquivo JSON que contém o corpo da solicitação.
Use a interface de monitoramento do Dataflow para verificar se um novo job com o mesmo nome foi criado. Esse job tem o status Atualizado.
A seguir
- Saiba como criar um modelo Flex para seu pipeline do Apache Beam.
- Para saber mais sobre os modelos clássicos e Flex e os cenários de caso de uso, consulte Modelos do Dataflow.
- Para ver informações sobre a solução de problemas de modelos Flex, consulte Resolver problemas de tempo limite do modelo Flex.
- Para mais arquiteturas de referência, diagramas e práticas recomendadas, confira a Central de arquitetura do Cloud.