Fazer upgrade de um pipeline de streaming

Nesta página, você encontra orientações e recomendações para fazer um upgrade dos pipelines de streaming. Por exemplo, talvez seja necessário fazer um upgrade para uma versão mais recente do SDK do Apache Beam ou atualizar o código do pipeline. Diferentes opções são fornecidas para atender a diferentes cenários.

Ao contrário dos pipelines em lote, que são interrompidos quando o job é concluído, pipelines de streaming costumam ser executados continuamente para fornecer um processamento ininterrupto. Portanto, ao fazer o upgrade de pipelines de streaming, você precisa considerar o seguinte:

  • Talvez seja necessário minimizar ou evitar a interrupção do pipeline. Em alguns casos, é possível tolerar uma interrupção temporária no processamento enquanto uma nova versão de um pipeline é implantada. Em outros casos, o aplicativo pode não tolerar qualquer interrupção.
  • Os processos de atualização de pipeline precisam lidar com as alterações de esquema de uma maneira que minimize a interrupção do processamento de mensagens e para outros sistemas anexados. Por exemplo, se o esquema de mensagens em um pipeline de processamento de eventos for alterado, as alterações de esquema também poderão ser necessárias em coletores de dados downstream.

É possível usar um dos métodos a seguir para atualizar pipelines de streaming, dependendo do pipeline e dos requisitos de atualização:

Para mais informações sobre problemas que podem ser encontrados durante uma atualização e como evitá-los, consulte Validar um job substituto e Verificação de compatibilidade do job.

Práticas recomendadas

  • Faça o upgrade da versão do SDK do Apache Beam separadamente de qualquer alteração no código do pipeline.
  • Teste o pipeline após cada alteração antes de fazer outras atualizações.
  • Faça o upgrade regularmente da versão do SDK do Apache Beam usado pelo pipeline.
  • Use métodos automatizados sempre que possível, como atualizações em andamento ou atualizações automatizadas de pipeline paralelo.
  • Use E/S gerenciada sempre que possível para aproveitar os benefícios dos upgrades automáticos das versões do conector.

Realizar atualizações em andamento

É possível atualizar alguns pipelines de streaming em andamento sem interromper o job. Esse cenário é chamado de atualização de job em andamento. As atualizações de jobs em andamento estão disponíveis apenas em circunstâncias limitadas:

  • O job deve usar o Streaming Engine.
  • O job precisa estar no estado de execução.
  • Você está alterando apenas o número de workers usados pelo job

Para mais informações, consulte Definir o intervalo de escalonamento automático na página "Escalonamento automático horizontal".

Para instruções sobre como executar uma atualização de job em andamento, consulte Atualizar um pipeline atual.

Criação ou atualização automática (upsert) de modelos

Ao iniciar pipelines usando um modelo (modelos clássicos, modelos flexíveis, Terraform ou Config Connector), é possível usar o experimento create_or_update_job para usar a funcionalidade de criação ou atualização (upsert).

Quando você especifica create_or_update_job no parâmetro additional_experiments ou na flag additional-experiments:

  • Se já existir um job em execução ou em drenagem com o nome especificado, o serviço de modelos vai iniciar automaticamente o novo job como uma atualização do job atual.
  • Se não houver um job ativo com esse nome, o serviço de modelos vai iniciar o novo job como uma nova criação.

Esse experimento elimina a necessidade de determinar programaticamente se é preciso usar a ação de API de criação ou atualização ao iniciar um modelo.

Para exemplos de código do Terraform e do Config Connector que usam esse experimento, consulte as seções a seguir:

Iniciar um job de substituição

Se o job atualizado for compatível com o job atual, será possível atualizar o pipeline usando a opção update. Quando você substitui um job atual, um novo job executa o código atualizado do pipeline. O serviço do Dataflow retém o nome do job, mas executa o substituto com um código da tarefa atualizado. Esse processo pode causar inatividade enquanto o job atual é interrompido, a verificação de compatibilidade é executada e o novo job é iniciado. Para mais detalhes, consulte Os efeitos da substituição de um job.

O Dataflow executa uma verificação de compatibilidade para garantir que o código atualizado do pipeline possa ser implantado com segurança no pipeline em execução. Certas mudanças no código causam falhas na verificação de compatibilidade, como quando entradas secundárias são adicionadas ou removidas de uma etapa existente. Quando a verificação de compatibilidade falha, não é possível atualizar um job no local.

Para instruções explicando como iniciar um job substituto, consulte Iniciar um job substituto.

Se a atualização do pipeline for incompatível com o job atual, interrompa e substitua o pipeline. Se o pipeline não puder tolerar a inatividade, execute pipelines paralelos.

Interrupção e substituição manual

Para fazer uma interrupção e substituição manual, cancele ou drene o pipeline e substitua-o pelo pipeline atualizado. O cancelamento de um pipeline faz com que o Dataflow interrompa imediatamente o processamento e desligue os recursos o mais rápido possível, o que pode causar perda de dados em processamento, conhecidos como dados em trânsito. Para evitar a perda de dados, na maioria dos casos, a drenagem é preferível. Também é possível usar snapshots do Dataflow para salvar o estado de um pipeline de streaming, o que permite iniciar uma nova versão do job do Dataflow sem perder o estado. Para mais informações, consulte Usar snapshots do Dataflow.

A drenagem de um pipeline imediatamente fecha qualquer janela em processamento e aciona todos os gatilhos (links em inglês). Embora os dados em trânsito não sejam perdidos, a drenagem pode fazer com que as janelas tenham dados incompletos. Se isso acontecer, as janelas em processo emitem resultados parciais ou incompletos. Para mais informações, consulte Efeitos da drenagem de um job. Depois que o job atual for concluído, inicie um novo job de streaming que contenha o código atualizado do pipeline, permitindo a retomada do processamento.

Com esse método, há um período de inatividade entre o momento em que o job de streaming atual é interrompido e o momento em que o pipeline substituto está pronto para retomar o processamento de dados. No entanto, cancelar ou drenar um pipeline atual e iniciar um novo job com o pipeline atualizado é menos complicado do que executar pipelines paralelos.

Para mais informações, consulte Drenar um job do Dataflow. Depois de drenar o job atual, inicie um novo com o mesmo nome.

Parada e substituição automatizadas

O Dataflow oferece suporte à API para iniciar uma atualização automatizada de parada e substituição. Esse fluxo de trabalho declarativo elimina as etapas procedimentais manuais. Você declara o job a ser substituído, e o novo job é iniciado e coordena a transição automaticamente.

Ao usar esse fluxo de trabalho, novos recursos de job são provisionados enquanto o job antigo ainda está em execução. O job antigo recebe automaticamente um sinal de drenagem. Depois que o job antigo termina a drenagem ou atinge um tempo limite especificado pelo usuário, o novo job começa imediatamente a processar dados. Use esse fluxo de trabalho para pipelines que não podem tolerar dados duplicados ou agregações parciais, mas podem aceitar uma breve pausa no processamento enquanto o job antigo é drenado.

Enviar um pedido de atualização automática de parada e substituição

Para usar esse fluxo de trabalho:

  • É necessário definir a opção parallel_replace_job_max_stop_duration.
  • Não defina a opção parallel_replace_job_min_parallel_pipelines_duration. A definição de uma duração paralela aciona o fluxo de trabalho de atualizações automáticas de pipeline paralelo.

Envie uma solicitação de atualização automática de parada e substituição usando as seguintes opções de serviço:

Java

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Para realizar uma atualização automatizada de parada e substituição usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local, use update_strategy_in_place_update.

Opção 2: atualizar usando um nome de job diferente

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: desativar o cancelamento automático

O cancelamento automático é ativado por padrão quando você especifica a opção parallel_replace_job_max_stop_duration. Para desativar o cancelamento automático, defina a opção parallel_replace_job_cancel_on_drain_timeout como false.

--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=false"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Python

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Para realizar uma atualização automatizada de parada e substituição usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local, use update_strategy_in_place_update.

Opção 2: atualizar usando um nome de job diferente

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: desativar o cancelamento automático

O cancelamento automático é ativado por padrão quando você especifica a opção parallel_replace_job_max_stop_duration. Para desativar o cancelamento automático, defina a opção parallel_replace_job_cancel_on_drain_timeout como false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Go

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Para realizar uma atualização automatizada de parada e substituição usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local, use update_strategy_in_place_update.

Opção 2: atualizar usando um nome de job diferente

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_max_stop_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: desativar o cancelamento automático

O cancelamento automático é ativado por padrão quando você especifica a opção parallel_replace_job_max_stop_duration. Para desativar o cancelamento automático, defina a opção parallel_replace_job_cancel_on_drain_timeout como false.

--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=false"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

gcloud

Opção 1: atualizar usando o mesmo nome de job

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Para realizar uma atualização automatizada de parada e substituição usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local, use update_strategy_in_place_update.

Opção 2: atualizar usando um nome de job diferente

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_max_stop_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: desativar o cancelamento automático

O cancelamento automático é ativado por padrão quando você especifica a opção parallel_replace_job_max_stop_duration. Para desativar o cancelamento automático, defina a opção parallel_replace_job_cancel_on_drain_timeout como false.

--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=false"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Opcional: upsert (criar ou atualizar job)

Para ativar o comportamento de upsert (criar ou atualizar job):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_max_stop_duration=DURATION",
  "parallel_replace_job_cancel_on_drain_timeout=true",
  "update_strategy_parallel_job_update",
  "parallel_replace_job_preallocate_compute_resources=true",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_max_stop_duration=DURATION"
    - "parallel_replace_job_cancel_on_drain_timeout=true"
    - "update_strategy_parallel_job_update"
    - "parallel_replace_job_preallocate_compute_resources=true"
    - "create_or_update_job"

Substitua as seguintes variáveis:

  • É necessário fornecer parallel_replace_job_name ou parallel_replace_job_id para identificar o job a ser substituído:
    • OLD_JOB_NAME: o nome do job a ser substituído.
    • OLD_JOB_ID: o ID do job a ser substituído.
  • Você precisa fornecer o valor parallel_replace_job_max_stop_duration para ativar a interrupção e substituição automáticas:
    • DURATION: o tempo máximo que o novo job aguarda a conclusão do job antigo. A duração precisa ser formatada como uma string que termina em s, m ou h (por exemplo, 30m, 1h).
  • Não defina a opção parallel_replace_job_min_parallel_pipelines_duration ao usar esse fluxo de trabalho. Definir essa opção aciona o fluxo de trabalho de atualizações automáticas de pipeline paralelo.
  • Opcional: configure a opção parallel_replace_job_cancel_on_drain_timeout. Como o cancelamento automático é ativado (o padrão é true) por padrão quando a opção parallel_replace_job_max_stop_duration é definida, não é necessário configurar explicitamente essa opção para ativá-la.
    • Para manter o comportamento padrão, omita essa opção ou defina como true.
    • Para desativar o cancelamento automático, defina essa opção como false. Se você definir essa opção como false e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.
  • Opcional: defina a configuração parallel_replace_job_preallocate_compute_resources:
    • Especifica se os workers são provisionados com antecedência para o novo job enquanto o antigo é esvaziado. Valores: true (padrão) ou false. Para o Terraform e o Config Connector, é recomendável definir essa opção como true para evitar tempos limite de provisionamento de recursos. Quando parallel_replace_job_preallocate_compute_resources é definido como false, o novo job permanece em um estado pendente até que o antigo seja esvaziado.

Reprocessamento de mensagens usando o Pub/Sub Snapshot e Seek

Em algumas situações, depois de substituir ou cancelar um pipeline drenado, talvez seja necessário reprocessar as mensagens do Pub/Sub entregues anteriormente. Por exemplo, talvez seja necessário usar a lógica de negócios atualizada para reprocessar dados. O Pub/Sub Seek é um recurso que permite reproduzir mensagens em um snapshot do Pub/Sub. Use o Pub/Sub Seek com o Dataflow para reprocessar mensagens a partir do momento em que o snapshot da assinatura é criado.

Durante o desenvolvimento e o teste, também é possível usar o Pub/Sub Seek para reproduzir as mensagens conhecidas repetidamente para verificar a saída do pipeline. Ao usar a busca do Pub/Sub Seek, não pesquise um snapshot de assinatura quando a assinatura estiver sendo consumida por um pipeline. Se você fizer isso, ele poderá invalidar a lógica de marca d'água do Dataflow e afetar exatamente o processamento de mensagens do Pub/Sub.

Veja a seguir um fluxo de trabalho recomendado da CLI gcloud para usar o Pub/Sub Seek com pipelines do Dataflow em uma janela de terminal:

  1. Para criar um snapshot da assinatura, use o comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Para drenar ou cancelar o pipeline, use o comando gcloud dataflow jobs drain ou gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    ou

    gcloud dataflow jobs cancel JOB_ID
    
  3. Para buscar o snapshot, use o comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Implante um novo pipeline que consuma a assinatura.

Executar pipelines paralelos

Se for necessário evitar a interrupção do pipeline de streaming durante uma atualização, você pode executar pipelines paralelos. Essa abordagem permite iniciar um novo job de streaming com o código de pipeline atualizado e executá-lo em paralelo com o job atual. Você pode usar o fluxo de trabalho de implantação de atualização de pipeline paralelo automatizado do Dataflow ou realizar as etapas manualmente.

Visão geral dos pipelines paralelos

Ao criar o novo pipeline, use a mesma estratégia de janelamento usada para o pipeline atual. Para o fluxo de trabalho manual, deixe o pipeline atual continuar em execução até que a marca d'água exceda o carimbo de data/hora da primeira janela completa processada pelo pipeline atualizado. Em seguida, esvazie ou cancele o pipeline existente. Se você estiver usando o fluxo de trabalho automatizado, esse trabalho será feito para você. O pipeline atualizado continua em execução no lugar e assume o processamento de maneira efetiva por conta própria.

O diagrama a seguir ilustra esse processo:

O pipeline B se sobrepõe ao pipeline B por uma janela de cinco minutos.

No diagrama, o pipeline B é o job atualizado que assume o pipeline A. O valor t é o carimbo de data/hora da janela completa mais antiga processada pelo pipeline B. O valor w é a marca d'água do pipeline A. Para simplificar, uma marca d'água perfeita é presumida sem dados atrasados. O processamento e o tempo decorrido são representados no eixo horizontal. Os dois pipelines usam janelas fixas de cinco minutos. Os resultados são acionados depois que a marca d'água passa o fim de cada janela.

Como a saída simultânea ocorre durante o período em que os dois pipelines se sobrepõem, configure-os para gravar resultados em destinos diferentes. Os sistemas downstream podem usar uma abstração sobre os dois coletores de destino, como uma visualização do banco de dados, para consultar os resultados combinados. Esses sistemas também podem usar a abstração para eliminar a duplicação dos resultados do período sobreposto. Para mais informações, consulte Como lidar com saídas duplicadas.

Limitações

O uso de atualizações de pipeline paralelas automáticas ou manuais tem as seguintes limitações:

  • Somente atualizações automáticas: o novo job paralelo precisa ser um job do Streaming Engine.
  • Não é permitido ter jobs simultâneos com o mesmo nome. No entanto, ao realizar uma atualização automática de parada e substituição ou de pipeline paralelo usando o mesmo nome de job, é possível reutilizar o nome. Nesse caso, o novo job precisa começar pelo menos dois minutos após o início do job anterior. Essa restrição evita várias atualizações paralelas de novas tentativas repetidas da biblioteca de cliente ou chamadas de procedimento remoto desatualizadas.
  • Executar dois pipelines em paralelo na mesma entrada pode levar a dados duplicados, agregações parciais e possíveis problemas de ordenação quando os dados são inseridos no coletor. O sistema downstream precisa ser projetado para antecipar e gerenciar esses resultados.
  • Ao ler de uma origem do Pub/Sub, o uso da mesma assinatura para vários pipelines não é recomendado e pode levar a problemas de correção. No entanto, em alguns casos de uso, como pipelines de extração, transformação e carregamento (ETL), o uso da mesma assinatura em dois pipelines pode reduzir a duplicação. Problemas com o escalonamento automático provavelmente vão ocorrer sempre que você fornecer um valor diferente de zero para a duração da sobreposição. Isso pode ser atenuado usando o recurso de atualização de jobs em andamento. Para mais informações, consulte Ajustar o escalonamento automático para seus pipelines de streaming do Pub/Sub.
  • Para o Apache Kafka, é possível minimizar duplicatas ativando a confirmação de deslocamento no Kafka. Para ativar a confirmação de deslocamento no Kafka, consulte Confirmação de volta ao Kafka.

Atualizações automáticas de pipeline paralelo

O Dataflow oferece suporte à API para iniciar um job de substituição paralela. Essa API de estilo declarativo abstrai o trabalho manual de execução de etapas procedimentais. Você declara o job que quer atualizar, e um novo job é executado em paralelo com o antigo. Depois que o novo job é executado pelo período especificado, o job antigo é esgotado. Esse recurso elimina pausas no processamento durante as atualizações e reduz o esforço operacional necessário para atualizar pipelines incompatíveis.

Esse método de atualização é mais adequado para pipelines que podem tolerar alguns duplicados ou agregações parciais e não exigem uma ordenação estrita ao inserir dados. Ele é adequado para pipelines de ETL, bem como pipelines que usam o modo de streaming pelo menos uma vez e a transformação Redistribute com a opção "permitir duplicados" definida como true.

Opções de serviço de pipeline paralelo automatizado

Use as seguintes opções de serviço para atualizações automatizadas de pipeline paralelo:

Opção de serviço Opcional ou obrigatório Descrição Dependências ou exclusões
update_strategy_parallel_job_update Obrigatório (opção 1: atualizar usando o mesmo nome de job) Comando para realizar uma atualização paralela, que executa os dois pipelines simultaneamente para minimizar o tempo de inatividade, ao atualizar com o mesmo nome de job. Precisa ser definido com a flag --update e parallel_replace_job_min_parallel_pipelines_duration.
update_strategy_in_place_update Opcional Alternativa à atualização paralela. Realiza uma atualização padrão no local de um job. Precisa ser definido com a flag --update.

Mutuamente exclusivo com update_strategy_parallel_job_update.

Quando essa opção é definida, outras opções relacionadas a jobs paralelos são ignoradas.

parallel_replace_job_min_parallel_pipelines_duration Obrigatório Especifica a duração mínima em que os dois pipelines são executados simultaneamente. Depois desse período, um sinal de drenagem é enviado ao job antigo. Os valores aceitáveis variam de 0s (recomendado para sobreposição zero) a 744h (31 dias). Precisa ser pareado com uma maneira de segmentar o job antigo. Uma das seguintes opções:
  • Opção 1: usar o mesmo nome de job: update_strategy_parallel_job_update ou
  • Opção 2: usar um nome de job diferente: parallel_replace_job_name (ou, como alternativa, parallel_replace_job_id para identificar o job)
parallel_replace_job_name ou parallel_replace_job_id (escolha uma opção) Obrigatório (opção 2: atualizar usando um nome de job diferente) Identifica o job antigo por nome ou ID para ser substituído durante uma atualização de nome diferente. Exige que parallel_replace_job_min_parallel_pipelines_duration seja definido.

Não use a flag --update ou parallel_replace_job_id com essa opção.

parallel_replace_job_max_stop_duration Opcional A duração máxima permitida para o job antigo antes que o cancelamento automático seja acionado. Por exemplo, 30m ou 1h. Requer a definição de um fluxo de trabalho de atualização paralela (opção 1 ou 2).
parallel_replace_job_cancel_on_drain_timeout Opcional

O padrão é true se uma duração máxima de parada for definida.

Opção booleana que especifica se o job antigo precisa ser cancelado se a duração do despejo exceder parallel_replace_job_max_stop_duration. Usado com parallel_replace_job_max_stop_duration.

Defina como false para desativar o cancelamento automático. Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Enviar uma solicitação de atualização de pipeline paralela automatizada

Para usar o fluxo de trabalho automatizado, inicie um novo job de streaming. É possível atualizar um job usando o mesmo nome ou um nome diferente.

Java

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflowServiceOptions="update_strategy_parallel_job_update" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para fazer uma atualização paralela usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local sem remover opções relacionadas a jobs paralelos, use update_strategy_in_place_update em vez de update_strategy_parallel_job_update.

Opção 2: atualizar usando um nome de job diferente

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: configure o tempo limite de diminuição e o cancelamento automático

É possível anexar as seguintes opções a qualquer uma das configurações para definir um tempo limite de drenagem e cancelar automaticamente o job antigo se ele ficar preso.

--dataflowServiceOptions="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflowServiceOptions="parallel_replace_job_cancel_on_drain_timeout=true"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Python

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para fazer uma atualização paralela usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local sem remover opções relacionadas a jobs paralelos, use update_strategy_in_place_update em vez de update_strategy_parallel_job_update.

Opção 2: atualizar usando um nome de job diferente

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: configure o tempo limite de diminuição e o cancelamento automático

É possível anexar as seguintes opções a qualquer uma das configurações para definir um tempo limite de drenagem e cancelar automaticamente o job antigo se ele ficar preso.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Go

Opção 1: atualizar usando o mesmo nome de job

--update \
--dataflow_service_options="update_strategy_parallel_job_update" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para fazer uma atualização paralela usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local sem remover opções relacionadas a jobs paralelos, use update_strategy_in_place_update em vez de update_strategy_parallel_job_update.

Opção 2: atualizar usando um nome de job diferente

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que causa um erro.

Opcional: configure o tempo limite de diminuição e o cancelamento automático

É possível anexar as seguintes opções a qualquer uma das configurações para definir um tempo limite de drenagem e cancelar automaticamente o job antigo se ele ficar preso.

--dataflow_service_options="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--dataflow_service_options="parallel_replace_job_cancel_on_drain_timeout=true"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

gcloud

Opção 1: atualizar usando o mesmo nome de job

--update \
--additional-experiments="update_strategy_parallel_job_update" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para fazer uma atualização paralela usando o mesmo nome, use a flag --update e a opção update_strategy_parallel_job_update.
  • Para fazer uma atualização no local sem remover opções relacionadas a jobs paralelos, use update_strategy_in_place_update em vez de update_strategy_parallel_job_update.

Opção 2: atualizar usando um nome de job diferente

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
  • Para especificar o job antigo por ID em vez de nome, use --additional-experiments="parallel_replace_job_id=OLD_JOB_ID".
  • Se você especificar um novo nome de job e usar a flag --update, o Dataflow vai procurar um job com o novo nome, o que vai causar um erro.

Opcional: configure o tempo limite de diminuição e o cancelamento automático

É possível anexar as seguintes opções a qualquer uma das configurações para definir um tempo limite de drenagem e cancelar automaticamente o job antigo se ele ficar preso.

--additional-experiments="parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION" \
--additional-experiments="parallel_replace_job_cancel_on_drain_timeout=true"

Se você desativar o cancelamento automático e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Opcional: upsert (criar ou atualizar job)

Para ativar o comportamento de upsert (criar ou atualizar job):

--additional-experiments="create_or_update_job"

Terraform

additional_experiments = [
  "parallel_replace_job_min_parallel_pipelines_duration=DURATION",
  "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION",
  "update_strategy_parallel_job_update",
  "create_or_update_job"
]

Config Connector

metadata:
  annotations:
    # Force to use only direct controller. Multiple controllers can cause a Dataflow job to enter into a continuous update loop.
    # https://docs.cloud.google.com/config-connector/docs/concepts/controller-types#underlying-controller-types
    alpha.cnrm.cloud.google.com/reconciler: direct
    # Optional but recommended: Dataflow batch jobs do not support the drain operation. But for Dataflow streaming jobs, prefer "drain" over "cancel" as an on-delete option.
    cnrm.cloud.google.com/on-delete: drain
    # Optional but recommended: Set deletion-policy to "abandon" to avoid accidental deletion, this will ignore the on-delete option.
    cnrm.cloud.google.com/deletion-policy: abandon
spec:
  ...
  additionalExperiments:
    - "parallel_replace_job_min_parallel_pipelines_duration=DURATION"
    - "parallel_replace_job_max_stop_duration=DRAIN_TIMEOUT_DURATION"
    - "update_strategy_parallel_job_update"
    - "create_or_update_job"

Substitua as seguintes variáveis:

  • Se você estiver atualizando usando um nome de job diferente (opção 2), forneça parallel_replace_job_name ou parallel_replace_job_id para identificar o job a ser substituído. A atualização usando um nome de job diferente não é compatível com o Terraform ou o Config Connector.
    • OLD_JOB_NAME: o nome do job a ser substituído.
    • OLD_JOB_ID: o ID do job a ser substituído.
  • DURATION: o período mínimo em que os dois pipelines são executados em paralelo como um número inteiro ou de ponto flutuante. Uma duração de 0s é recomendada para sobreposição zero. Depois desse período, o job antigo recebe um sinal de drenagem.

    A duração precisa estar entre 0 segundo (0s) e 31 dias (744h). Use s, m e h para especificar segundos, minutos e horas. Por exemplo, 10m é 10 minutos.

  • DRAIN_TIMEOUT_DURATION: opcional. O tempo máximo que o job antigo tem para ser esgotado antes que o cancelamento automático seja acionado. A duração precisa ser formatada como uma string que termina em s, m ou h (por exemplo, 30m, 1h).

  • parallel_replace_job_cancel_on_drain_timeout: opcional. Se o job anterior precisa ser cancelado caso não termine a drenagem antes da duração máxima de parada. O padrão é true se uma duração de tempo limite de drenagem for fornecida. Para desativar o cancelamento automático, defina essa opção como false. Se você definir essa opção como false e o job antigo ficar preso no estado de drenagem, os jobs antigo e novo vão continuar sendo executados em paralelo.

Ao iniciar o novo job, o Dataflow aguarda o provisionamento de todos os workers antes de começar a processar os dados. Para monitorar o status da implantação, verifique os registros de job do Dataflow.

Executar pipelines paralelos manualmente

Para cenários mais complexos ou quando você precisa de mais controle sobre o processo de atualização, é possível executar pipelines paralelos manualmente. Deixe o pipeline atual continuar em execução até que a marca d'água exceda o carimbo de data/hora da primeira janela completa processada pelo pipeline atualizado. Em seguida, esvazie ou cancele o pipeline atual.

Processar saída duplicada

O exemplo a seguir descreve uma abordagem para lidar com saídas duplicadas. Os dois pipelines gravam a saída em destinos diferentes, usam sistemas downstream para consultar resultados e eliminam a duplicação de resultados do período sobreposto. Este exemplo usa um pipeline que lê dados de entrada do Pub/Sub, realiza um pouco do processamento e grava os resultados no BigQuery.

  1. No estado inicial, o pipeline de streaming atual (Pipeline A) está em execução e lendo mensagens de um tópico do Pub/Sub (Tópico) usando uma assinatura. (Assinatura A). Os resultados são gravados em uma tabela do BigQuery (tabela A). Os resultados são consumidos por uma visualização do BigQuery, que atua como uma fachada para mascarar alterações de tabela subjacentes. Esse processo é uma aplicação de um método de design chamado padrão de fachada. O diagrama a seguir mostra o estado inicial.

    Um pipeline com uma assinatura e gravação em uma única tabela do BigQuery.

  2. Você cria uma nova assinatura (assinatura B) para o pipeline atualizado. Implante o pipeline atualizado (pipeline B ), que lê o tópico do Pub/Sub (tópico) usando a assinatura B e grava em uma tabela separada do BigQuery (tabela B). Veja esse fluxo no diagrama a seguir.

    Dois pipelines, cada um com uma assinatura. Cada pipeline grava em uma tabela separada do BigQuery. Uma visualização de fachadas lê as duas tabelas.

    Neste ponto, o pipeline A e o pipeline B estão sendo executados em paralelo e gravando resultados em tabelas separadas. Registre o tempo t como o carimbo de data/hora da janela completa mais antiga processada pelo pipeline B.

  3. Quando a marca d'água do pipeline A exceder o tempo t, drene o pipeline A. Quando você drena o pipeline, todas as janelas abertas são fechadas e o processamento dos dados em trânsito é concluído. Se o pipeline contiver janelas e aquelas completas forem importantes (supondo que não haja dados atrasados), antes de drenar o pipeline A, deixe que os dois pipelines sejam executados até que você tenha janelas sobrepostas completas. Você interrompe o job de streaming do pipeline A depois que todos os dados em trânsito são processados e gravados na tabela A. O diagrama a seguir mostra esse estágio.

    O pipeline A é drenado e não lê mais a assinatura A, além de não enviar mais dados para a tabela A após a conclusão da drenagem. Todo o processamento é feito pelo segundo pipeline.

  4. Neste momento, apenas o pipeline B está em execução. É possível consultar em uma visualização do BigQuery (Visualização de Fachada), que atua como uma fachada para a tabela A e a tabela B. Para linhas com o mesmo carimbo de data/hora em ambas as tabelas, configure a visualização para retornar as linhas da Tabela B ou, se as linhas não existirem na Tabela B , volte para a Tabela A , O diagrama a seguir mostra a visualização (Visualização de Fachada)lendo na Tabela A e na Tabela B.

    O pipeline A desapareceu e apenas o pipeline B é executado.

    Nesse momento, você pode excluir a assinatura A.

Quando problemas são detectados com uma nova implantação de pipeline, ter pipelines paralelos pode simplificar a reversão. No exemplo anterior, convém manter o pipeline A em execução enquanto você monitora o pipeline B para a operação correta. Se ocorrer algum problema com o pipeline B, você pode reverter para o pipeline A.

Gerenciar mutações de esquema

Os sistemas de gerenciamento de dados geralmente precisam acomodar mutações de esquema ao longo do tempo, às vezes devido a mudanças nos requisitos de negócios e outras vezes por motivos técnicos. A aplicação de atualizações de esquema normalmente requer planejamento e execução cuidadosos para evitar interrupções nos sistemas de informações comerciais.

Considere um pipeline simples que leia mensagens com payloads JSON de um tópico do Pub/Sub. O pipeline converte cada mensagem em uma instância TableRow e depois grava as linhas em uma tabela do BigQuery. O esquema da tabela de saída é semelhante às mensagens processadas pelo pipeline. No diagrama a seguir, o esquema é chamado de esquema A.

Pipeline que lê uma assinatura e grava em uma tabela de saída do BigQuery usando o esquema A.

Com o tempo, o esquema da mensagem pode passar por mudanças não triviais. Por exemplo, os campos são adicionados, removidos ou substituídos. O esquema A evolui para um novo esquema. Na discussão a seguir, o novo esquema é chamado de esquema B. Nesse caso, o pipeline A precisa ser atualizado, e o esquema da tabela de saída precisa ser compatível com o esquema B.

Para a tabela de saída, é possível executar algumas mutações de esquema sem o conteúdo central. Por exemplo, é possível adicionar novos campos ou relaxar modos de colunas, como alterar REQUIRED para NULLABLE, sem inatividade. Essas mutações geralmente não afetam as consultas atuais. No entanto, as mutações de esquema que modificam ou removem campos de esquema atuais interrompem as consultas ou resultam em outras interrupções. A abordagem a seguir acomoda alterações sem exigir tempo de inatividade.

Separe os dados gravados pelo pipeline em uma tabela principal e em uma ou mais tabelas de preparo. A tabela principal armazena dados históricos gravados pelo pipeline. As tabelas de preparo armazenam a última saída do pipeline. É possível definir uma visualização de fachada do BigQuery sobre as tabelas principais e de preparo, o que permite aos consumidores consultar dados históricos e atualizados.

No diagrama a seguir, o fluxo de pipeline anterior é revisado para incluir uma tabela de preparo (tabela de preparo A), uma tabela principal e uma visualização de fachada.

Pipeline que lê uma assinatura e grava em uma tabela de preparo do BigQuery. Uma segunda tabela (principal) tem saída de uma versão anterior do esquema. Uma visualização de fachada faz a leitura tanto da tabela de preparo quanto da tabela principal.

No fluxo revisado, o pipeline A processa mensagens que usam o esquema A e grava a saída na tabela de preparo A, que tem um esquema compatível. A tabela principal contém dados históricos gravados por versões anteriores do pipeline, além de resultados que são periodicamente mesclados na tabela de preparo. Os consumidores podem usar a visualização de fachada para consultar dados atualizados, incluindo dados históricos e em tempo real.

Quando o esquema da mensagem é alterado do esquema A para esquema B, é possível atualizar o código do pipeline para que seja compatível com mensagens que usam o esquema B. O pipeline atual precisa ser atualizado com a nova implementação. Ao executar pipelines paralelos, você garante que o processamento de dados de streaming continue sem interrupções. Encerrar e substituir pipelines resulta em uma interrupção no processamento, porque nenhum pipeline está em execução por um período.

O pipeline atualizado grava em uma tabela de preparo extra (tabela de preparo B) que usa o esquema B. É possível usar um fluxo de trabalho orquestrado para criar a nova tabela de preparo antes de atualizar o pipeline. Atualize a visualização de fachada para incluir os resultados da nova tabela de preparo, possivelmente usando uma etapa relacionada ao fluxo de trabalho.

No diagrama a seguir, mostramos o fluxo atualizado que mostra a tabela de preparo B com o esquema B e como a visualização de fachada foi atualizada para incluir conteúdo da tabela principal e das duas tabelas de preparo.

O pipeline agora usa o Esquema B e grava na Tabela de preparo B. Uma visualização de fachada lendo da tabela principal, da tabela de preparo A e da tabela de preparo B.

Como um processo separado da atualização do pipeline, é possível mesclar as tabelas de preparo na tabela principal, periodicamente ou conforme necessário. O diagrama a seguir mostra como a tabela de preparo A é mesclada na tabela principal.

A tabela de preparo A está mesclada com a tabela principal. A visualização de fachada lê da tabela de preparação B e da tabela principal.

A seguir