Esta página fornece dicas de resolução de problemas e estratégias de depuração que podem ser úteis se tiver problemas ao criar ou executar o seu pipeline do Dataflow. Estas informações podem ajudar a detetar uma falha no pipeline, determinar o motivo de uma execução do pipeline com falha e sugerir algumas medidas para corrigir o problema.
O diagrama seguinte mostra o fluxo de trabalho de resolução de problemas do Dataflow descrito nesta página.
O Dataflow fornece feedback em tempo real sobre a sua tarefa e existe um conjunto básico de passos que pode usar para verificar as mensagens de erro, os registos e as condições, como o progresso da tarefa ter parado.
Para orientações sobre erros comuns que pode encontrar ao executar a tarefa do Dataflow, consulte o artigo Resolva problemas de erros do Dataflow. Para monitorizar e resolver problemas de desempenho do pipeline, consulte o artigo Monitorize o desempenho do pipeline.
Práticas recomendadas para pipelines
Seguem-se as práticas recomendadas para pipelines Java, Python e Go.
- Para todos os pipelines, certifique-se de que define as localizações de preparação e temporárias para localizações diferentes.
- A utilização de um contentor de preparação separado pode acelerar o relançamento do pipeline, porque os artefactos podem ser reutilizados em vez de serem transferidos novamente.
- Os ficheiros preparados armazenam artefactos gerados na hora de início da tarefa, como o código do pipeline, e podem ser reutilizados ao longo da duração de uma tarefa.
- Para tarefas de streaming ou em lote concluídas, pode eliminar ficheiros organizados depois de a tarefa terminar.
- Para tarefas de streaming ou em lote em curso, não elimine ficheiros organizados, mesmo após uma atualização da pipeline.
- Os ficheiros temporários não são limpos automaticamente. Se não tiver uma política para limpar ficheiros antigos, como um tempo de vida (TTL), tem de os remover manualmente. Para evitar custos de armazenamento, configure uma política de tempo de vida (TTL) para os seus contentores temporários e de preparação.
- Para o contentor temporário, defina um TTL ligeiramente superior à duração da tarefa de execução mais longa. Por exemplo, um TTL de 7 dias é um ponto de partida razoável.
- Para o contentor de preparação, defina um TTL mais longo para permitir a reutilização de artefactos em execuções de tarefas subsequentes, o que pode acelerar o arranque das tarefas. Por exemplo, um TTL de 6 meses é um ponto de partida razoável.
- Recomendamos que desative a eliminação temporária para os contentores temporários e de preparação para evitar custos de armazenamento desnecessários.
Verifique o estado do pipeline
Pode detetar erros nas execuções do pipeline através da interface de monitorização do Dataflow.
- Aceda à Google Cloud consola.
- Selecione o seu projeto da Google Cloud Platform na lista de projetos.
- No menu de navegação, em Big Data, clique em Dataflow. É apresentada uma lista de tarefas em execução no painel do lado direito.
- Selecione a tarefa de pipeline que quer ver. Pode ver o estado das tarefas rapidamente no campo Estado: "Em execução", "Concluído" ou "Com falhas".
Encontre informações sobre falhas de pipelines
Se uma das tarefas do pipeline falhar, pode selecionar a tarefa para ver informações mais detalhadas sobre os erros e os resultados da execução. Quando seleciona uma tarefa, pode ver os principais gráficos do pipeline, o gráfico de execução, o painel Informações da tarefa e o painel Registos com os separadores Registos de tarefas, Registos de trabalhadores, Diagnósticos e Recomendações.
Verifique as mensagens de erro de tarefas
Para ver os registos de tarefas gerados pelo código do pipeline e pelo serviço Dataflow, no painel Registos, clique em segmentMostrar.
Pode filtrar as mensagens apresentadas nos Registos de tarefas clicando em Informaçõesarrow_drop_down e filter_listFiltrar. Para apresentar apenas mensagens de erro, clique em Informaçõesarrow_drop_down e selecione Erro.
Para expandir uma mensagem de erro, clique na secção expansível arrow_right.
Em alternativa, pode clicar no separador Diagnósticos. Este separador mostra onde ocorreram erros ao longo da cronologia escolhida, uma contagem de todos os erros registados e possíveis recomendações para o seu pipeline.
Veja os registos de passos da sua tarefa
Quando seleciona um passo no gráfico do pipeline, o painel de registos alterna entre a apresentação dos registos de tarefas gerados pelo serviço Dataflow e a apresentação dos registos das instâncias do Compute Engine que executam o passo do pipeline.
O Cloud Logging combina todos os registos recolhidos das instâncias do Compute Engine do seu projeto num único local. Consulte o artigo Registar mensagens de pipeline para mais informações sobre a utilização das várias capacidades de registo do Dataflow.
Resolva a rejeição de pipelines automatizados
Em alguns casos, o serviço Dataflow identifica que o seu pipeline pode acionar problemas conhecidos do SDK. Para evitar o envio de pipelines que provavelmente vão ter problemas, o Dataflow rejeita automaticamente o pipeline e apresenta a seguinte mensagem:
The workflow was automatically rejected by the service because it might trigger an identified bug in the SDK (details below). If you think this identification is in error, and would like to override this automated rejection, please re-submit this workflow with the following override flag: [OVERRIDE FLAG]. Bug details: [BUG DETAILS]. Contact Google Cloud Support for further help. Please use this identifier in your communication: [BUG ID].
Depois de ler as ressalvas nos detalhes do erro indicados no link, se quiser tentar executar o pipeline na mesma, pode substituir a rejeição automática. Adicione a flag
--experiments=<override-flag> e reenvie o pipeline.
Determine a causa de uma falha no pipeline
Normalmente, a falha na execução de um pipeline do Apache Beam pode ser atribuída a uma das seguintes causas:
- Erros de construção de gráficos ou pipelines. Estes erros ocorrem quando o Dataflow encontra um problema ao criar o gráfico de passos que compõem o seu pipeline, conforme descrito pelo seu pipeline do Apache Beam.
- Erros na validação de tarefas. O serviço Dataflow valida qualquer tarefa de pipeline que iniciar. Os erros no processo de validação podem impedir a criação ou a execução bem-sucedida da tarefa. Os erros de validação podem incluir problemas com o contentor do Cloud Storage do seu projeto ou com as autorizações do seu projeto. Google Cloud
- Exceções no código do trabalhador. Estes erros ocorrem quando existem erros ou erros no código fornecido pelo utilizador que o Dataflow distribui para trabalhadores paralelos, como as instâncias
DoFnde uma transformaçãoParDo. - Erros causados por falhas temporárias noutros Google Cloud serviços. O seu pipeline pode falhar devido a uma indisponibilidade temporária ou a outro problema nos Google Cloud serviços dos quais o Dataflow depende, como o Compute Engine ou o Cloud Storage.
Detetar erros de construção de gráficos ou pipelines
Pode ocorrer um erro de construção de um gráfico quando o Dataflow está a criar o gráfico de execução para o seu pipeline a partir do código no seu programa do Dataflow. Durante o tempo de construção do gráfico, o Dataflow verifica a existência de operações ilegais.
Se o Dataflow detetar um erro na construção do gráfico, tenha em atenção que não é criado nenhum trabalho no serviço Dataflow. Assim, não vê feedback na interface de monitorização do fluxo de dados. Em alternativa, é apresentada uma mensagem de erro semelhante à seguinte na consola ou na janela do terminal onde executou o pipeline do Apache Beam:
Java
Por exemplo, se o seu pipeline tentar realizar uma agregação como
GroupByKey num PCollection não acionado, sem limites e com janelas globais, é apresentada uma mensagem de erro semelhante à seguinte:
... ... Exception in thread "main" java.lang.IllegalStateException: ... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. ... Use a Window.into or Window.triggering transform prior to GroupByKey ...
Python
Por exemplo, se o seu pipeline usar sugestões de tipo e o tipo de argumento numa das transformações não for o esperado, é apresentada uma mensagem de erro semelhante à seguinte:
... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>
Ir
Por exemplo, se o seu pipeline usar um `DoFn` que não recebe nenhuma entrada, ocorre uma mensagem de erro semelhante à seguinte:
... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required. ... Full error: ... inserting ParDo in scope root/CountWords ... graph.AsDoFn: for Fn named main.extractFn ... ProcessElement method has no main inputs ... goroutine 1 [running]: ... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...) ... (more stacktrace)
Se encontrar um erro deste tipo, verifique o código do pipeline para garantir que as operações do pipeline são legais.
Detete erros na validação de tarefas do Dataflow
Assim que o serviço Dataflow receber o gráfico do pipeline, o serviço tenta validar a tarefa. Esta validação inclui o seguinte:
- Certificar-se de que o serviço pode aceder aos contentores do Cloud Storage associados ao seu trabalho para preparação de ficheiros e saída temporária.
- A verificar as autorizações necessárias no seu Google Cloud projeto.
- Garantir que o serviço consegue aceder a origens de entrada e saída, como ficheiros.
Se a tarefa falhar no processo de validação, é apresentada uma mensagem de erro na interface de monitorização do fluxo de dados, bem como na janela da consola ou do terminal se estiver a usar a execução de bloqueio. A mensagem de erro é semelhante à seguinte:
Java
INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)
Python
INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477] ... Checking required Cloud APIs are enabled. ... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING. ... Combiner lifting skipped for step group: GroupByKey not followed by a combiner. ... Expanding GroupByKey operations into optimizable parts. ... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns ... Annotating graph with Autotuner information. ... Fusing adjacent ParDo, Read, Write, and Flatten operations ... Fusing consumer split into read ... ... Starting 1 workers... ... ... Executing operation read+split+pair_with_one+group/Reify+group/Write ... Executing failure step failure14 ... Workflow failed. Causes: ... read+split+pair_with_one+group/Reify+group/Write failed. Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt. ... Cleaning up. ... Tearing down pending resources... INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.
Ir
A validação de tarefas descrita nesta secção não é atualmente suportada para o Go. Os erros devido a estes problemas aparecem como exceções de trabalhadores.
Detete uma exceção no código do trabalhador
Enquanto a tarefa está em execução, pode encontrar erros ou exceções no código do trabalhador. Geralmente, estes erros significam que os DoFns no seu código de pipeline geraram exceções não processadas, o que resulta em tarefas falhadas no seu trabalho do Dataflow.
As exceções no código do utilizador (por exemplo, as suas instâncias DoFn) são comunicadas na
interface de monitorização do fluxo de dados.
Se executar o pipeline com a execução de bloqueio, as mensagens de erro são apresentadas na janela da consola ou do terminal, como as seguintes:
Java
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.
Python
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. ... INFO:root:... Expanding GroupByKey operations into optimizable parts. INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns INFO:root:... Annotating graph with Autotuner information. INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations ... INFO:root:...: Starting 1 workers... INFO:root:...: Executing operation group/Create INFO:root:...: Value "group/Session" materialized. INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: ...: Workers have started successfully. INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: Traceback (most recent call last): File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task) ... File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda> ValueError: invalid literal for int() with base 10: 'www'
Ir
... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction ... process_bundle-4031463614776698457-2 using plan s02-6 : while executing ... Process for Plan[s02-6] failed: Oh no! This is an error message!
Considere proteger-se contra erros no seu código adicionando processadores de exceções. Por exemplo, se quiser ignorar elementos que falham alguma validação de entrada personalizada feita num ParDo, processe a exceção no seu DoFn e ignore o elemento.
Também pode acompanhar os elementos com falhas de várias formas:
- Pode registar os elementos com falhas e verificar o resultado através do Cloud Logging.
- Pode verificar os registos de início do worker e do worker do Dataflow para ver avisos ou erros seguindo as instruções em Ver registos.
- Pode fazer com que o
ParDoescreva os elementos com falhas num resultado adicional para inspeção posterior.
Para acompanhar as propriedades de um pipeline em execução, pode usar a classe Metrics, conforme mostrado no exemplo seguinte:
Java
final Counter counter = Metrics.counter("stats", "even-items"); PCollection<Integer> input = pipeline.apply(...); ... input.apply(ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element() % 2 == 0) { counter.inc(); } });
Python
class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regex.""" def __init__(self, pattern): self.pattern = pattern # A custom metric can track values in your pipeline as it runs. Create # custom metrics to count unmatched words, and know the distribution of # word lengths in the input PCollection. self.word_len_dist = Metrics.distribution(self.__class__, 'word_len_dist') self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words') def process(self, element): word = element self.word_len_dist.update(len(word)) if re.match(self.pattern, word): yield element else: self.unmatched_words.inc() filtered_words = ( words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))
Ir
func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection { return beam.ParDo(s, &MyMetricsDoFn{}, input) } func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) { pr, err := beam.Run(ctx, runner, p) if err != nil { return metrics.QueryResults{}, err } // Request the metric called "counter1" in namespace called "namespace" ms := pr.Metrics().Query(func(r beam.MetricResult) bool { return r.Namespace() == "namespace" && r.Name() == "counter1" }) // Print the metric value - there should be only one line because there is // only one metric called "counter1" in the namespace called "namespace" for _, c := range ms.Counters() { fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed) } return ms, nil } type MyMetricsDoFn struct { counter beam.Counter } func init() { beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil))) } func (fn *MyMetricsDoFn) Setup() { // While metrics can be defined in package scope or dynamically // it's most efficient to include them in the DoFn. fn.counter = beam.NewCounter("namespace", "counter1") } func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) { // count the elements fn.counter.Inc(ctx, 1) emit(v) }
Resolva problemas de pipelines de execução lenta ou falta de saída
Consulte as seguintes páginas:
- Resolva problemas de trabalhos de streaming lentos ou bloqueados.
- Resolva problemas de trabalhos em lote lentos ou bloqueados.
Erros comuns e cursos de ação
Quando souber o erro que causou a falha do pipeline, consulte a página Resolva problemas de erros do Dataflow para obter orientações sobre a resolução de problemas de erros.