Esta página descreve como executar uma tarefa do Dataflow através de um modelo flexível. Os modelos flexíveis permitem-lhe agrupar um pipeline do Dataflow para que possa executar o pipeline sem ter um ambiente de desenvolvimento do Apache Beam.
Autorizações necessárias
Quando executa um modelo flexível, o Dataflow cria uma tarefa para si. Para criar a tarefa, a conta de serviço do Dataflow precisa da seguinte autorização:
dataflow.serviceAgent
Quando usa o Dataflow pela primeira vez, o serviço atribui-lhe esta função, pelo que não precisa de conceder esta autorização.
Por predefinição, a conta de serviço do Compute Engine é usada para VMs iniciadoras e VMs de trabalho. A conta de serviço precisa das seguintes funções e capacidades:
- Administrador de objetos de armazenamento (
roles/storage.objectAdmin) - Leitor (
roles/viewer) - Trabalhador do Dataflow (
roles/dataflow.worker) - Acesso de leitura e escrita ao contentor de preparação
- Acesso de leitura à imagem do modelo flexível
Para conceder acesso de leitura e escrita ao contentor de preparação, pode usar a função Storage Object Admin (roles/storage.objectAdmin). Para mais informações, consulte Funções de IAM para o Cloud Storage.
Para conceder acesso de leitura à imagem do modelo flexível, pode usar a função Storage Object Viewer (roles/storage.objectViewer). Para mais informações, consulte Configurar o controlo de acesso.
Execute um modelo flexível
Para executar um modelo flexível, use o comando
gcloud dataflow flex-template run:
gcloud dataflow flex-template run JOB_ID \ --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \ --region REGION \ --staging-location STAGING_LOCATION \ --temp-location TEMP_LOCATION \ --parameters PARAMETERS \ --additional-user-labels LABELS \
Substitua o seguinte:
JOB_ID: o ID do seu trabalhoTEMPLATE_FILE_LOCATION: a localização do ficheiro de modelo no Cloud StorageREGION: a região na qual executar a tarefa do DataflowSTAGING_LOCATION: a localização do Cloud Storage para preparar ficheiros locaisTEMP_LOCATION: a localização do Cloud Storage para escrever ficheiros temporários. Se não for definido, a predefinição é a localização de preparação.PARAMETERS: parâmetros do pipeline para a tarefaLABELS: opcional. Etiquetas anexadas à sua tarefa, usando o formatoKEY_1=VALUE_1,KEY_2=VALUE_2,....
Durante o passo de preparação do lançamento de um modelo, o Dataflow escreve ficheiros na localização de preparação. O fluxo de dados lê estes ficheiros preparados para criar o gráfico de tarefas. Durante o passo de execução, o Dataflow escreve ficheiros na localização temporária.
Defina opções de tubagens
Para definir opções de pipeline quando executa um modelo flexível, use as seguintes flags no comando gcloud dataflow flex-template run:
parameters: Use esta flag para definir os seguintes tipos de opção de pipeline:Opções de pipeline suportadas por modelos flexíveis. Para ver uma lista das opções suportadas pelos modelos flexíveis, consulte o artigo Opções de pipeline.
Opções de pipeline declaradas nos metadados do modelo.
additional-pipeline-options: Use esta flag para definir outras opções de pipeline do Apache Beam que não são suportadas diretamente pelos modelos flexíveis.additional-experiments: Use esta flag para definir opções de pipeline experimentais (equivalente à opçãoexperiments).
gcloud
Inclua opções de pipeline através da flag
parameters.Inclua experiências de tempo de execução e opções de pipeline através das flags
additional-experimentseadditional-pipeline-options.
Quando transmite parâmetros do tipo List ou Map, pode ter de definir
parâmetros num ficheiro YAML e usar a flag flags-file.
API
Inclua opções de pipeline através do campo
parameters.Inclua experiências de tempo de execução e opções de pipeline através dos campos
additionalExperimentseadditionalPipelineOptions.
O exemplo seguinte mostra como incluir opções de pipeline, experiências e opções adicionais num corpo do pedido:
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
Quando usa modelos flexíveis, pode configurar algumas opções de pipeline durante a inicialização do pipeline, mas não é possível alterar outras opções de pipeline. Se os argumentos da linha de comandos exigidos pelo modelo flexível forem substituídos, a tarefa pode ignorar, substituir ou rejeitar as opções do pipeline transmitidas pelo iniciador de modelos. A tarefa pode não ser iniciada ou pode ser iniciada uma tarefa que não use o modelo flexível. Para mais informações, consulte o artigo Falha ao ler o ficheiro de tarefa.
Durante a inicialização do pipeline, não altere as seguintes opções do pipeline:
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Ir
runnerprojectjob_nametemplate_locationregion
Bloqueie as chaves SSH do projeto em VMs que usam chaves SSH baseadas em metadados
Pode impedir que as VMs aceitem chaves SSH armazenadas nos metadados do projeto
bloqueando as chaves SSH do projeto nas VMs. Use o sinalizador additional-experiments com a opção de serviço block_project_ssh_keys:
--additional-experiments=block_project_ssh_keys
Para mais informações, consulte as opções de serviço do Dataflow.
Atualize uma tarefa de modelo flexível
O pedido de exemplo seguinte mostra como atualizar uma tarefa de streaming de modelos usando o método projects.locations.flexTemplates.launch. Se quiser usar a CLI gcloud, consulte o artigo Atualize um pipeline existente.
Se quiser atualizar um modelo clássico, use projects.locations.templates.launch em alternativa.
Siga os passos para criar uma tarefa de streaming a partir de um modelo flexível. Envie o seguinte pedido HTTP POST com os valores modificados:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }- Substitua
PROJECT_IDpelo ID do seu projeto. - Substitua
REGIONpela região do Dataflow do trabalho que está a atualizar. - Substitua
JOB_NAMEpelo nome exato da tarefa que quer atualizar. - Defina
parameterspara a sua lista de pares de chave-valor. Os parâmetros indicados são específicos deste exemplo de modelo. Se estiver a usar um modelo personalizado, modifique os parâmetros conforme necessário. Se estiver a usar o modelo de exemplo, substitua as seguintes variáveis.- Substitua
SUBSCRIPTION_NAMEpelo nome da sua subscrição do Pub/Sub. - Substitua
DATASETpelo nome do seu conjunto de dados do BigQuery. - Substitua
TABLE_NAMEpelo nome da tabela do BigQuery.
- Substitua
- Substitua
STORAGE_PATHpela localização do ficheiro de modelo no Cloud Storage. A localização deve começar comgs://.
- Substitua
Use o parâmetro
environmentpara alterar as definições do ambiente. Para mais informações, consulteFlexTemplateRuntimeEnvironment.Opcional: para enviar o seu pedido através do curl (Linux, macOS ou Cloud Shell), guarde o pedido num ficheiro JSON e, em seguida, execute o seguinte comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launchSubstitua FILE_PATH pelo caminho para o ficheiro JSON que contém o corpo do pedido.
Use a interface de monitorização do fluxo de dados para verificar se foi criada uma nova tarefa com o mesmo nome. Esta tarefa tem o estado Atualizado.
O que se segue?
- Saiba como criar um modelo flexível para o seu pipeline do Apache Beam.
- Para saber mais sobre os modelos clássicos, os modelos flexíveis e os respetivos cenários de utilização, consulte Modelos do Dataflow.
- Para informações de resolução de problemas de modelos flexíveis, consulte o artigo Resolva problemas de tempo limite de modelos flexíveis.
- Para ver mais arquiteturas de referência, diagramas e práticas recomendadas, explore o Centro de arquitetura na nuvem.