Esta página oferece orientações e recomendações para atualizar os seus pipelines de streaming. Por exemplo, pode ter de atualizar para uma versão mais recente do SDK Apache Beam ou querer atualizar o código do pipeline. São disponibilizadas diferentes opções para se adequarem a diferentes cenários.
Enquanto os pipelines em lote param quando a tarefa é concluída, os pipelines de streaming são frequentemente executados continuamente para oferecer um processamento ininterrupto. Por conseguinte, quando atualiza os pipelines de streaming, tem de ter em conta as seguintes considerações:
- Pode ter de minimizar ou evitar a interrupção do pipeline. Em alguns casos, pode tolerar uma interrupção temporária do processamento enquanto é implementada uma nova versão de um pipeline. Noutros casos, a sua aplicação pode não conseguir tolerar nenhuma interrupção.
- Os processos de atualização do pipeline têm de processar as alterações ao esquema de forma a minimizar a interrupção do processamento de mensagens e de outros sistemas anexos. Por exemplo, se o esquema das mensagens num pipeline de processamento de eventos mudar, também podem ser necessárias alterações ao esquema nos destinos de dados a jusante.
Pode usar um dos seguintes métodos para atualizar pipelines de streaming, consoante os requisitos do pipeline e de atualização:
Para mais informações sobre problemas que pode encontrar durante uma atualização e como evitá-los, consulte os artigos Validar uma tarefa de substituição e Verificação da compatibilidade da tarefa.
Práticas recomendadas
- Atualize a versão do SDK do Apache Beam separadamente de quaisquer alterações ao código do pipeline.
- Teste o pipeline após cada alteração antes de fazer atualizações adicionais.
- Atualize regularmente a versão do SDK do Apache Beam que a sua pipeline usa.
- Use métodos automáticos sempre que possível, como atualizações em tempo real ou atualizações automáticas de pipelines paralelos.
Realize atualizações em voo
Pode atualizar alguns pipelines de streaming em curso sem parar a tarefa. Este cenário é denominado atualização de tarefa em curso. As atualizações de tarefas em voo só estão disponíveis em circunstâncias limitadas:
- A tarefa tem de usar o Streaming Engine.
- A tarefa tem de estar no estado de execução.
- Só está a alterar o número de trabalhadores que a tarefa usa.
Para mais informações, consulte o artigo Defina o intervalo de dimensionamento automático na página Dimensionamento automático horizontal.
Para ver instruções que explicam como fazer uma atualização de tarefa em curso, consulte o artigo Atualize um pipeline existente.
Inicie uma tarefa de substituição
Se a tarefa atualizada for compatível com a tarefa existente, pode atualizar o pipeline através da opção update. Quando substitui uma tarefa existente, uma nova tarefa executa o código do pipeline atualizado.
O serviço Dataflow retém o nome da tarefa, mas executa a tarefa de substituição com um ID da tarefa atualizado. Este processo pode causar tempo de inatividade enquanto a tarefa existente é interrompida, a verificação de compatibilidade é executada e a nova tarefa é iniciada. Para mais detalhes, consulte o artigo
Efeitos da substituição de uma tarefa.
O Dataflow realiza uma verificação de compatibilidade para garantir que o código do pipeline atualizado pode ser implementado em segurança no pipeline em execução. Determinadas alterações de código fazem com que a verificação de compatibilidade falhe, como quando as entradas laterais são adicionadas ou removidas de um passo existente. Quando a verificação de compatibilidade falha, não pode fazer uma atualização do trabalho no local.
Para ver instruções que explicam como iniciar uma tarefa de substituição, consulte o artigo Inicie uma tarefa de substituição.
Se a atualização do pipeline for incompatível com a tarefa atual, tem de parar e substituir o pipeline. Se o seu pipeline não tolerar tempo de inatividade, execute pipelines paralelos.
Pare e substitua condutas
Se puder interromper temporariamente o processamento, pode cancelar ou esvaziar o pipeline e, em seguida, substituí-lo pelo pipeline atualizado. O cancelamento de um pipeline faz com que o Dataflow pare imediatamente o processamento e encerre os recursos o mais rapidamente possível, o que pode causar alguma perda de dados que estão a ser processados, conhecidos como dados em trânsito. Para evitar a perda de dados, na maioria dos casos, a ação preferencial é a eliminação gradual. Também pode usar as capturas instantâneas do Dataflow para guardar o estado de um pipeline de streaming, o que lhe permite iniciar uma nova versão da tarefa do Dataflow sem perder o estado. Para mais informações, consulte o artigo Use instantâneos do Dataflow.
A drenagem de um pipeline fecha imediatamente todas as janelas em processamento e aciona todos os desencadeadores. Embora os dados em curso não sejam perdidos, a eliminação pode fazer com que as janelas tenham dados incompletos. Se isto acontecer, as janelas em processamento emitem resultados parciais ou incompletos. Para mais informações, consulte o artigo Efeitos da anulação de um trabalho. Após a conclusão da tarefa existente, inicie uma nova tarefa de streaming que contenha o código do pipeline atualizado, o que permite a retoma do processamento.
Com este método, incorre em algum tempo de inatividade entre o momento em que a tarefa de streaming existente para e o momento em que o pipeline de substituição está pronto para retomar o processamento de dados. No entanto, cancelar ou esgotar um pipeline existente e, em seguida, iniciar uma nova tarefa com o pipeline atualizado é menos complicado do que executar pipelines paralelos.
Para instruções mais detalhadas, consulte o artigo Drene uma tarefa do Dataflow. Depois de esgotar a tarefa atual, inicie uma nova tarefa com o mesmo nome.
Reprocessamento de mensagens com o instantâneo e a procura do Pub/Sub
Em algumas situações, depois de substituir ou cancelar um pipeline esgotado, pode ter de voltar a processar mensagens do Pub/Sub entregues anteriormente. Por exemplo, pode ter de usar uma lógica empresarial atualizada para voltar a processar os dados. O Pub/Sub Seek é uma funcionalidade que lhe permite repetir mensagens a partir de um instantâneo do Pub/Sub. Pode usar o Pub/Sub Seek com o Dataflow para voltar a processar mensagens a partir do momento em que a captura instantânea da subscrição é criada.
Durante o desenvolvimento e os testes, também pode usar o Pub/Sub Seek para repetir as mensagens conhecidas várias vezes para validar o resultado do seu pipeline. Quando usar o Pub/Sub Seek, não procure uma captura instantânea de subscrição quando a subscrição estiver a ser consumida por um pipeline. Se o fizer, a procura pode invalidar a lógica de marca d'água do Dataflow e pode afetar o processamento exatamente uma vez das mensagens do Pub/Sub.
Um fluxo de trabalho recomendado da CLI gcloud para usar o Pub/Sub Seek com pipelines do Dataflow numa janela do terminal é o seguinte:
Para criar uma captura instantânea da subscrição, use o comando
gcloud pubsub snapshots create:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Para esvaziar ou cancelar o pipeline, use o comando
gcloud dataflow jobs drainou o comandogcloud dataflow jobs cancel:gcloud dataflow jobs drain JOB_ID
ou
gcloud dataflow jobs cancel JOB_ID
Para procurar o momento da captura de ecrã, use o comando
gcloud pubsub subscriptions seek:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Implemente um novo pipeline que consuma a subscrição.
Execute pipelines em paralelo
Se precisar de evitar interrupções no pipeline de streaming durante uma atualização, pode executar pipelines paralelos. Esta abordagem permite-lhe iniciar uma nova tarefa de streaming com o código do pipeline atualizado e executá-la em paralelo com a tarefa existente. Pode usar o fluxo de trabalho de implementação de atualização de pipeline paralelo automatizado do Dataflow ou executar os passos manualmente.
Vista geral dos pipelines paralelos
Quando criar o novo pipeline, use a mesma estratégia de janelas que usou para o pipeline existente. Para o fluxo de trabalho manual, deixe o pipeline existente continuar a ser executado até que a respetiva marca de água exceda a data/hora da janela completa mais antiga processada pelo pipeline atualizado. Em seguida, esvazie ou cancele o pipeline existente. Se usar o fluxo de trabalho automatizado, este trabalho é feito por si. O pipeline atualizado continua a ser executado no seu lugar e assume o processamento de forma eficaz.
O diagrama seguinte ilustra este processo.
No diagrama, Pipeline B é a tarefa atualizada que substitui Pipeline A. O valor t é a data/hora da janela completa mais antiga processada pelo pipeline B. O valor w é a marca de água para o Pipeline A. Para simplificar, assume-se uma marca cronológica perfeita sem dados tardios. O processamento e o tempo real são representados no eixo horizontal. Ambas as pipelines usam janelas fixas (rotativas) de 5 minutos. Os resultados são acionados depois de a marca de água passar o fim de cada janela.
Uma vez que a saída simultânea ocorre durante o período em que os dois pipelines se sobrepõem, configure os dois pipelines para escrever resultados em destinos diferentes. Os sistemas a jusante podem, então, usar uma abstração sobre os dois destinos de dados, como uma vista de base de dados, para consultar os resultados combinados. Estes sistemas também podem usar a abstração para remover resultados duplicados do período sobreposto. Para mais informações, consulte o artigo Processe a saída duplicada.
Limitações
A utilização de atualizações de pipelines paralelas automáticas ou manuais tem as seguintes limitações:
- Apenas atualizações automáticas: a nova tarefa paralela tem de ser uma tarefa do Streaming Engine.
- Os nomes das tarefas antigas e novas têm de ser diferentes porque não são permitidas tarefas concorrentes com o mesmo nome.
- A execução de dois pipelines em paralelo na mesma entrada pode originar dados duplicados, agregações parciais e potenciais problemas de ordenação quando os dados são inseridos no destino. O sistema a jusante tem de ser concebido para antecipar e gerir estes resultados.
- Quando lê a partir de uma origem do Pub/Sub, não é recomendável usar a mesma subscrição para vários pipelines, pois pode levar a problemas de correção. No entanto, em alguns exemplos de utilização, como pipelines de extração, transformação e carregamento (ETL), a utilização da mesma subscrição em dois pipelines pode reduzir a duplicação. É provável que existam problemas com o dimensionamento automático sempre que fornecer um valor diferente de zero para a duração sobreposta. Pode mitigar esta situação usando a funcionalidade de atualização de tarefas em curso. Para mais informações, consulte o artigo Ajuste a escala automática para os pipelines de streaming do Pub/Sub.
- Para o Apache Kafka, pode minimizar os duplicados ativando a confirmação de deslocamentos no Kafka. Para ativar a confirmação de desvio no Kafka, consulte o artigo Confirme novamente no Kafka.
Atualizações automatizadas de pipelines paralelos
O Dataflow oferece suporte de API para iniciar uma tarefa de substituição paralela. Esta API de estilo declarativo abstrai o trabalho manual de executar passos processuais. Declara a tarefa que quer atualizar e, em seguida, é executada uma nova tarefa em paralelo com a tarefa antiga. Depois de a nova tarefa ser executada durante o período especificado, a tarefa antiga é esgotada. Esta funcionalidade elimina as pausas de processamento durante as atualizações e reduz o esforço operacional necessário para atualizar pipelines incompatíveis.
Este método de atualização é mais adequado para pipelines que podem tolerar alguns duplicados ou agregações parciais e não requerem uma ordenação rigorosa durante a inserção de dados. É adequado para pipelines ETL, bem como pipelines que usam o modo de streaming, pelo menos, uma vez e a transformação Redistribute com a opção "Permitir duplicados" definida como true.
Envie um pedido de atualização de pipeline paralelo automatizado
Para usar o fluxo de trabalho automatizado, inicie uma nova tarefa de streaming com as seguintes opções de serviço. Tem de iniciar a nova tarefa com um nome diferente do da tarefa antiga.
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Em alternativa, pode especificar o ID da tarefa antiga:
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Em alternativa, pode especificar o ID da tarefa antiga:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Go
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Em alternativa, pode especificar o ID da tarefa antiga:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Em alternativa, pode especificar o ID da tarefa antiga:
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Substitua as seguintes variáveis:
- Tem de fornecer
parallel_replace_job_nameouparallel_replace_job_idpara identificar a tarefa a substituir.OLD_JOB_NAME: se usarparallel_replace_job_name, o nome da tarefa a substituir.OLD_JOB_ID: se usarparallel_replace_job_id, o ID da tarefa a substituir.
Tem de fornecer um valor.
parallel_replace_job_min_parallel_pipelines_durationDURATION: o período mínimo durante o qual os dois pipelines são executados em paralelo como um número inteiro ou de vírgula flutuante. Após este período, é enviado um sinal de esgotamento para a tarefa antiga.A duração tem de estar entre 0 segundos (
0s) e 31 dias (744h). Uses,mehpara especificar segundos, minutos e horas. Por exemplo,10mé 10 minutos.
Quando inicia a nova tarefa, o Dataflow aguarda que todos os trabalhadores sejam aprovisionados antes de começar a processar os dados. Para monitorizar o estado da implementação, verifique os registos de tarefas do Dataflow.
Execute pipelines paralelos manualmente
Para cenários mais complexos ou quando precisa de mais controlo sobre o processo de atualização, pode executar manualmente pipelines paralelos. Permita que o pipeline existente continue a ser executado até que a respetiva marca de água exceda a data/hora da janela completa mais antiga processada pelo pipeline atualizado. Em seguida, esvazie ou cancele o pipeline existente.
Faça a gestão de resultados duplicados
O exemplo seguinte descreve uma abordagem para processar a saída duplicada. Os dois pipelines escrevem a saída em destinos diferentes, usam sistemas a jusante para consultar os resultados e removem duplicados dos resultados do período de sobreposição. Este exemplo usa um pipeline que lê dados de entrada do Pub/Sub, realiza algum processamento e escreve os resultados no BigQuery.
No estado inicial, o pipeline de streaming existente (Pipeline A) está em execução e a ler mensagens de um tópico do Pub/Sub (Tópico) através de uma subscrição (Subscrição A). Os resultados são escritos numa tabela do BigQuery (tabela A). Os resultados são consumidos através de uma vista do BigQuery, que funciona como uma fachada para mascarar as alterações da tabela subjacente. Este processo é uma aplicação de um método de design denominado padrão de fachada. O diagrama seguinte mostra o estado inicial.
Crie uma nova subscrição (Subscrição B) para o pipeline atualizado. Implemente o pipeline atualizado (Pipeline B), que lê a partir do tópico do Pub/Sub (Tópico) através da Subscrição B e escreve numa tabela do BigQuery separada (Tabela B). O diagrama seguinte ilustra este fluxo.
Neste ponto, o pipeline A e o pipeline B estão a ser executados em paralelo e a escrever resultados em tabelas separadas. Regista a hora t como a data/hora da janela completa mais antiga processada pelo pipeline B.
Quando a marca de água do pipeline A exceder o tempo t, esvazie o pipeline A. Quando esvazia o pipeline, todas as janelas abertas são fechadas e o processamento dos dados em trânsito é concluído. Se o pipeline contiver janelas e as janelas completas forem importantes (partindo do princípio de que não existem dados atrasados), antes de esvaziar o pipeline A, deixe ambos os pipelines serem executados até ter janelas sobrepostas completas. Parar a tarefa de streaming para Pipeline A depois de todos os dados em trânsito serem processados e escritos na Tabela A. O diagrama seguinte mostra esta fase.
Neste ponto, apenas o Pipeline B está em execução. Pode consultar a partir de uma vista do BigQuery (vista de fachada), que funciona como uma fachada para a tabela A e a tabela B. Para as linhas que têm a mesma data/hora em ambas as tabelas, configure a vista para devolver as linhas da Tabela B ou, se as linhas não existirem na Tabela B, recorra à Tabela A. O diagrama seguinte mostra a vista (vista de fachada) a ler a partir da tabela A e da tabela B.
Neste momento, pode eliminar a Subscrição A.
Quando são detetados problemas com uma nova implementação de pipeline, ter pipelines paralelos pode simplificar o reverter. Neste exemplo, pode querer manter a Linha de processamento A em execução enquanto monitoriza a Linha de processamento B para verificar o funcionamento correto. Se ocorrerem problemas com o Pipeline B, pode reverter para o Pipeline A.
Faça a gestão de mutações de esquemas
Os sistemas de processamento de dados têm frequentemente de se adaptar às mutações do esquema ao longo do tempo, por vezes devido a alterações nos requisitos empresariais e outras vezes por motivos técnicos. Normalmente, a aplicação de atualizações de esquemas requer um planeamento e uma execução cuidadosos para evitar interrupções nos sistemas de informações empresariais.
Considere um pipeline que lê mensagens que contêm payloads JSON de um tópico do Pub/Sub. O pipeline converte cada mensagem numa instância de TableRow e, em seguida, escreve as linhas numa tabela do BigQuery. O esquema da tabela de saída é semelhante ao das mensagens processadas pelo pipeline.
No diagrama seguinte, o esquema é denominado Esquema A.
Ao longo do tempo, o esquema de mensagens pode sofrer mutações de formas não triviais. Por exemplo, os campos são adicionados, removidos ou substituídos. O esquema A evolui para um novo esquema. Na discussão que se segue, o novo esquema é denominado Esquema B. Neste caso, Pipeline A tem de ser atualizado e o esquema da tabela de saída tem de ser compatível com Schema B.
Para a tabela de saída, pode realizar algumas mutações de esquema sem tempo de inatividade.
Por exemplo, pode adicionar novos campos ou relaxar os modos de coluna, como alterar REQUIRED para NULLABLE, sem tempo de inatividade.
Normalmente, estas mutações não afetam as consultas existentes. No entanto, as mutações de esquema que modificam ou removem campos de esquema existentes interrompem as consultas ou resultam noutras interrupções. A seguinte abordagem acomoda as alterações sem
exigir tempo de inatividade.
Separe os dados escritos pelo pipeline numa tabela principal e numa ou mais tabelas de preparação. A tabela principal armazena dados do histórico escritos pelo pipeline. As tabelas de preparação armazenam a saída mais recente do pipeline. Pode definir uma vista de fachada do BigQuery sobre as tabelas principal e de preparação, o que permite aos consumidores consultar dados históricos e atualizados.
O diagrama seguinte revê o fluxo da pipeline anterior para incluir uma tabela de preparação (tabela de preparação A), uma tabela principal e uma vista de fachada.
No fluxo revisto, o pipeline A processa mensagens que usam o esquema A e escreve o resultado na tabela de preparação A, que tem um esquema compatível. A tabela principal contém dados do histórico escritos por versões anteriores do pipeline, bem como resultados que são periodicamente unidos a partir da tabela de preparação. Os consumidores podem consultar dados atualizados, incluindo dados históricos e em tempo real, através da vista de fachada.
Quando o esquema de mensagens muda de Esquema A para Esquema B, pode atualizar o código do pipeline para ser compatível com mensagens que usam o Esquema B. É necessário atualizar o pipeline existente com a nova implementação. Ao executar pipelines paralelos, pode garantir que o processamento de dados de streaming continua sem interrupções. A rescisão e a substituição de pipelines resultam numa interrupção do processamento, porque não é executada nenhuma pipeline durante um período de tempo.
O pipeline atualizado escreve numa tabela de preparação adicional (tabela de preparação B) que usa o esquema B. Pode usar um fluxo de trabalho orquestrado para criar a nova tabela de preparação antes de atualizar o pipeline. Atualizar a vista de fachada para incluir resultados da nova tabela de preparação, possivelmente usando um passo do fluxo de trabalho relacionado.
O diagrama seguinte mostra o fluxo atualizado que apresenta a tabela de preparação B com o esquema B e como a vista de fachada é atualizada para incluir conteúdo da tabela principal e de ambas as tabelas de preparação.
Como um processo separado da atualização do pipeline, pode unir as tabelas de preparação na tabela principal, periodicamente ou conforme necessário. O diagrama seguinte mostra como a tabela de preparação A é intercalada na tabela principal.
O que se segue?
- Encontre passos detalhados para atualizar um pipeline existente.