Ler do Pub/Sub para o Dataflow

Esta página descreve as práticas recomendadas para a leitura a partir do Pub/Sub no Dataflow.

O Apache Beam fornece uma implementação de referência do conetor de E/S do Pub/Sub para utilização por executores que não sejam do Dataflow. No entanto, o executor do Dataflow usa a sua própria implementação personalizada do conector. Esta implementação tira partido das APIs e dos serviços internos da Google Cloud Platform para oferecer marcas de água de baixa latência, elevada precisão das marcas de água e eliminação de duplicados eficiente para o processamento de mensagens exatamente uma vez. O conetor está disponível para Java, Python e Go.

Processamento exatamente uma vez

O Pub/Sub desassocia os publicadores de eventos dos consumidores de eventos. A aplicação publica mensagens num tópico e o Pub/Sub envia as mensagens de forma assíncrona aos subscritores.

O Pub/Sub atribui um ID de mensagem exclusivo a cada mensagem publicada com êxito num tópico. Por predefinição, o Pub/Sub executa a entrega de mensagens pelo menos uma vez. Para alcançar a semântica de, pelo menos, uma vez, se o Pub/Sub não receber a confirmação do subscritor dentro do prazo de confirmação, tenta novamente o envio da mensagem. As novas tentativas também podem ocorrer antes do prazo de confirmação ou depois de uma mensagem ter sido confirmada.

O Dataflow confirma as mensagens depois de serem processadas com êxito pela primeira fase unida e os efeitos secundários desse processamento terem sido escritos no armazenamento persistente. Para reduzir o número de mensagens duplicadas, o Dataflow prolonga continuamente o prazo de confirmação enquanto um lote de mensagens está a ser processado nesta fase.

Uma vez que o Pub/Sub pode voltar a entregar uma mensagem, é possível que cheguem mensagens duplicadas ao pipeline. Se o pipeline do Dataflow usar o modo de streaming exatamente uma vez, o Dataflow remove as duplicados destas mensagens para alcançar a semântica exatamente uma vez.

Se o seu pipeline puder tolerar alguns registos duplicados, considere usar o modo de streaming, pelo menos, uma vez. Este modo pode reduzir significativamente a latência e o custo total do seu pipeline. A desvantagem é que as mensagens duplicadas podem ser processadas duas vezes. Para mais informações, consulte o artigo Escolha o modo de streaming a usar.

Remova duplicados por atributo de mensagem

Por predefinição, o Dataflow remove duplicados com base no ID da mensagem. No entanto, uma aplicação pode enviar o mesmo registo duas vezes como duas mensagens do Pub/Sub distintas. Por exemplo, os dados de origem originais podem conter registos duplicados ou a aplicação pode publicar incorretamente a mesma mensagem duas vezes. Isto pode acontecer devido a novas tentativas, se a confirmação tiver sido ignorada devido a problemas de rede ou outras interrupções. Nestas situações, as mensagens duplicadas têm IDs de mensagens diferentes.

Consoante o seu cenário, os dados podem conter um campo único que pode ser usado para remover duplicados. Por exemplo, os registos podem conter um ID da transação exclusivo. Pode configurar o conector de E/S do Pub/Sub para remover mensagens duplicadas com base no valor de um atributo de mensagem, em vez de usar o ID da mensagem do Pub/Sub. Desde que o publicador defina este atributo de forma consistente durante as novas tentativas, o Dataflow pode detetar os duplicados. As mensagens têm de ser publicadas no Pub/Sub no prazo de 10 minutos umas das outras para a remoção de duplicados.

Para mais informações sobre a utilização de atributos ID, consulte os seguintes tópicos de referência do SDK:

Subscrições

Quando configura o pipeline, especifica um tópico do Pub/Sub ou uma subscrição do Pub/Sub a partir da qual ler. Se especificar uma subscrição, não use a mesma subscrição do Pub/Sub para vários pipelines. Se dois pipelines lerem a partir de uma única subscrição, cada pipeline recebe parte dos dados de forma não determinística, o que pode causar mensagens duplicadas, atraso da marca d'água e escalabilidade automática ineficiente. Em alternativa, crie uma subscrição separada para cada pipeline.

Se especificar um tópico, o conetor cria uma nova subscrição temporária. Esta subscrição é exclusiva por pipeline.

Indicações de tempo e marcas de água

Todas as mensagens do Pub/Sub têm uma data/hora, que representa a hora em que o Pub/Sub recebe a mensagem. Os seus dados também podem ter uma data/hora do evento, que é a hora em que o registo foi gerado pela origem.

Pode configurar o conetor para ler a data/hora do evento a partir de um atributo na mensagem do Pub/Sub. Nesse caso, o conetor usa a data/hora do evento para a marca d'água. Caso contrário, usa por predefinição a data/hora da mensagem do Pub/Sub.

Para mais informações sobre a utilização de datas/horas dos eventos, consulte os seguintes tópicos de referência do SDK:

O conetor do Pub/Sub tem acesso à API privada do Pub/Sub que indica a antiguidade da mensagem não reconhecida mais antiga numa subscrição. Esta API oferece uma latência inferior à disponível no Cloud Monitoring. Permite que o Dataflow avance as marcas cronológicas dos pipelines e emita resultados de computação em janelas com latências baixas.

Se configurar o conetor para usar as datas/horas dos eventos, o Dataflow cria uma segunda subscrição do Pub/Sub, denominada subscrição de acompanhamento. O fluxo de dados usa a subscrição de acompanhamento para inspecionar as horas dos eventos das mensagens que ainda estão na fila de espera. Esta abordagem permite que o Dataflow estime o atraso de tempo do evento com precisão. A conta de serviço do trabalhador tem de ter, pelo menos, as seguintes autorizações no projeto que contém a subscrição de acompanhamento:

  • pubsub.subscriptions.create
  • pubsub.subscription.consume
  • pubsub.subscription.delete

Além disso, precisa da autorização pubsub.topics.attachSubscription no tópico Pub/Sub. Recomendamos que crie uma função de gestão de identidade e de acesso personalizada que contenha apenas estas autorizações.

Para mais informações sobre marcas de água, consulte a página do StackOverflow que aborda como o Dataflow calcula as marcas de água do Pub/Sub.

Se um pipeline tiver várias origens do Pub/Sub e uma delas tiver um volume muito baixo ou estiver inativa, atrasa o avanço da marca d'água completa, o que aumenta a latência geral do pipeline. Se existirem temporizadores ou agregações de janelas no pipeline com base na marca de água, estes também são atrasados.

Pub/Sub Seek

O Pub/Sub Seek permite que os utilizadores reproduzam mensagens reconhecidas anteriormente. Pode usar o Pub/Sub Seek com o Dataflow para voltar a processar mensagens num pipeline.

No entanto, não é recomendável usar o Pub/Sub Seek num pipeline em execução. A procura para trás numa pipeline em execução pode originar mensagens duplicadas ou mensagens ignoradas. Também invalida a lógica de marca d'água do Dataflow e entra em conflito com o estado de um pipeline que incorpora dados processados.

Para voltar a processar mensagens através da funcionalidade Seek do Pub/Sub, recomendamos o seguinte fluxo de trabalho:

  1. Crie um instantâneo da subscrição.
  2. Crie uma nova subscrição para o tópico Pub/Sub. A nova subscrição herda o resumo.
  3. Esvazie ou cancele a tarefa do Dataflow atual.
  4. Reenvie o pipeline com a nova subscrição.

Para mais informações, consulte o artigo Reprocessamento de mensagens com a funcionalidade Snapshot e Seek do Pub/Sub.

Paralelismo da origem do Pub/Sub

A origem do Pub/Sub atribui a cada mensagem uma chave determinística para processamento e usa essas chaves para misturar as mensagens. Para tarefas do Streaming Engine, são usadas 1024 chaves para a mistura. Para tarefas do Streaming Engine, o número de chaves é a potência de 2 mais baixa superior a (4 * maximum workers).

Para substituir o número predefinido de chaves de aleatorização, defina a num_pubsub_keys opção de serviço:

Java

--dataflowServiceOptions=num_pubsub_keys=NUMBER_OF_KEYS

Python

--dataflow_service_options=num_pubsub_keys=NUMBER_OF_KEYS

Ir

--dataflow_service_options=num_pubsub_keys=NUMBER_OF_KEYS

Substitua NUMBER_OF_KEYS pelo número de chaves. É usada a próxima potência de 2 superior ou igual ao valor especificado.

Por exemplo, pode definir esta opção nas seguintes situações:

Se definir esta opção, considere as compensações descritas em Paralelização e distribuição.

Não pode alterar o número de chaves como parte de uma atualização da pipeline. Para alterar o número de chaves de uma tarefa de pipeline existente, tem de iniciar uma nova tarefa.

Funcionalidades do Pub/Sub não suportadas

As seguintes funcionalidades do Pub/Sub não são suportadas na implementação do conector de E/S do Pub/Sub do executor do Dataflow.

Retirada exponencial

Quando cria uma subscrição do Pub/Sub, pode configurá-la para usar uma política de repetição de retirada exponencial. No entanto, a retirada exponencial não funciona com o Dataflow. Em alternativa, crie a subscrição com a política de repetição Tentar novamente imediatamente.

O recuo exponencial é acionado por uma confirmação negativa ou quando o prazo de confirmação expira. No entanto, o Dataflow não envia confirmações negativas quando o código do pipeline falha. Em vez disso, tenta processar a mensagem indefinidamente, enquanto prolonga continuamente o prazo de confirmação da mensagem.

Tópicos de mensagens não entregues

Não use tópicos de mensagens não entregues do Pub/Sub com o Dataflow pelos seguintes motivos:

  • O Dataflow envia reconhecimentos negativos por vários motivos internos (por exemplo, se um trabalhador estiver a ser encerrado). Como resultado, as mensagens podem ser entregues no tópico de mensagens rejeitadas, mesmo quando não ocorrem falhas no código do pipeline.

  • O Dataflow confirma as mensagens depois de um conjunto de mensagens ser processado com êxito pela primeira fase unida. Se o pipeline tiver várias fases unidas e ocorrerem falhas em qualquer ponto após a primeira fase, as mensagens já são confirmadas e não são enviadas para o tópico de mensagens rejeitadas.

Em alternativa, implemente o padrão de mensagens não entregues explicitamente no pipeline, encaminhando as mensagens com falhas para um destino para processamento posterior. Alguns destinos de E/S têm suporte incorporado para filas de mensagens rejeitadas. Os exemplos seguintes implementam padrões de mensagens não entregues:

Entrega exatamente uma vez do Pub/Sub

Uma vez que o Dataflow tem os seus próprios mecanismos para o processamento exatamente uma vez, não é recomendável usar a entrega exatamente uma vez do Pub/Sub com o Dataflow. A ativação da entrega exatamente uma vez do Pub/Sub reduz o desempenho do pipeline, porque limita o número de mensagens que estão disponíveis para processamento paralelo.

Ordenação de mensagens do Pub/Sub

A ordenação de mensagens é uma funcionalidade no Pub/Sub que permite a um subscritor receber mensagens na ordem em que foram publicadas.

Não é recomendado usar a ordenação de mensagens com o Dataflow pelos seguintes motivos:

  • O conetor de E/S do Pub/Sub pode não preservar a ordem das mensagens.
  • O Apache Beam não define diretrizes rigorosas relativamente à ordem em que os elementos são processados. Por conseguinte, a ordem pode não ser preservada nas transformações posteriores.
  • A utilização da ordenação de mensagens do Pub/Sub com o Dataflow pode aumentar a latência e diminuir o desempenho.

Transformações de mensagens únicas do Pub/Sub

As transformações de mensagens únicas (SMTs) permitem-lhe manipular, validar e filtrar mensagens com base nos respetivos atributos ou dados à medida que são transmitidas através do sistema. As subscrições que alimentam o Dataflow não devem usar SMTs que filtrem mensagens, pois podem interferir com o ajuste de escala automático. Isto acontece porque a filtragem SMT de subscrições pode fazer com que o registo pendente pareça maior do que o que é enviado para o Dataflow até que as mensagens filtradas sejam efetivamente processadas pelo SMT. Os SMTs de tópicos que filtram mensagens não causam problemas com o dimensionamento automático.

O que se segue?