Características de desempenho dos pipelines do Pub/Sub para o BigQuery

Esta página descreve as características de desempenho dos jobs de streaming do Dataflow que leem do Pub/Sub e gravam no BigQuery. Ela apresenta os resultados de testes de comparativo para dois tipos de pipeline de streaming:

  • Somente mapa (transformação por mensagem): pipelines que realizam transformações por mensagem, sem rastrear o estado ou agrupar elementos no stream. Exemplos incluem ETL, validação de campo e mapeamento de esquema.

  • Agregação em janelas (GroupByKey): pipelines que realizam operações com estado e agrupam dados com base em uma chave e uma janela de tempo. Exemplos incluem contagem de eventos, cálculo de somas e coleta de registros para uma sessão de usuário.

A maioria das cargas de trabalho para integração de dados de streaming se enquadra nessas duas categorias. Se o pipeline seguir um padrão semelhante, use esses comparativos para avaliar o job do Dataflow em relação a uma configuração de referência com bom desempenho.

Metodologia de teste

Os comparativos foram realizados usando os seguintes recursos:

  • Um tópico do Pub/Sub pré-provisionado com uma carga de entrada constante. As mensagens foram geradas usando o modelo do gerador de dados de streaming.

    • Taxa de mensagens: aproximadamente 1.000.000 de mensagens por segundo
    • Carga de entrada: 1 GiB/s
    • Formato da mensagem: texto JSON gerado aleatoriamente com um esquema fixo
    • Tamanho da mensagem: aproximadamente 1 KiB por mensagem
  • Uma tabela padrão do BigQuery.

  • Pipelines de streaming do Dataflow com base no modelo do Pub/Sub para BigQuery. Esses pipelines realizam a análise e o mapeamento de esquema mínimos necessários. Nenhuma função definida pelo usuário (UDF) personalizada foi usada.

Depois que o escalonamento horizontal se estabilizou e o pipeline atingiu o estado estável, os pipelines foram executados por aproximadamente um dia. Depois disso, os resultados foram coletados e analisados.

Pipelines do Dataflow

Duas variantes de pipeline foram testadas:

Pipeline somente de mapa. Esse pipeline realiza um mapeamento e uma conversão simples de mensagens JSON. Para esse teste, o modelo do Pub/Sub para BigQuery foi usado sem modificação.

  • Semântica: o pipeline foi testado usando o modo "exatamente uma vez" e o modo "pelo menos uma vez". O processamento "pelo menos uma vez" oferece melhor capacidade de processamento. No entanto, ele só deve ser usado quando registros duplicados forem aceitáveis ou quando o coletor downstream processar a remoção de duplicação.

Pipeline de agregação em janelas. Esse pipeline agrupa mensagens por uma chave específica em janelas de tamanho fixo e grava os registros agregados no BigQuery. Para esse teste, um pipeline personalizado do Apache Beam com base no modelo do Pub/Sub para BigQuery foi usado.

  • Lógica de agregação: para cada janela fixa e não sobreposta de 1 minuto, as mensagens com a mesma chave foram coletadas e gravadas como um único registro agregado no BigQuery. Esse tipo de agregação é usado com frequência no processamento de registros para combinar eventos relacionados, como a atividade de um usuário, em um único registro para análise downstream.

  • Paralelismo de chaves: o comparativo usou 1.000.000 de chaves distribuídas uniformemente.

  • Semântica: o pipeline foi testado usando o modo "exatamente uma vez". As agregações exigem semântica "exatamente uma vez" para garantir a correção e evitar a contagem dupla em um grupo e janela.

Configuração do job

A tabela a seguir mostra como os jobs do Dataflow foram configurados.

Configuração Somente mapa, exatamente uma vez Somente mapa, pelo menos uma vez Agregação em janelas, exatamente uma vez
Tipo de máquina do worker n1-standard-2 n1-standard-2 n1-standard-2
vCPUs da máquina do worker 2 2 2
RAM da máquina do worker 7,5 GiB 7,5 GiB 7,5 GiB
Persistent Disk da máquina do worker Disco permanente padrão (HDD), 30 GB Disco permanente padrão (HDD), 30 GB Disco permanente padrão (HDD), 30 GB
Workers iniciais 70 30 180
Número máximo de workers 100 100 250
Streaming Engine Sim Sim Sim
Escalonamento automático horizontal Sim Sim Sim
Modelo de faturamento Faturamento com base em recursos Faturamento com base em recursos Faturamento com base em recursos
A API Storage Write está ativada? Sim Sim Sim
Fluxos da API Storage Write 200 Não relevante 500
Frequência de acionamento da API Storage Write 5 segundos Não relevante 5 segundos

A API BigQuery Storage Write é recomendada para pipelines de streaming. Ao usar o modo "exatamente uma vez" com a API Storage Write, é possível ajustar as seguintes configurações:

  • Número de fluxos de gravação. Para garantir paralelismo de chaves suficiente no estágio de gravação, defina o número de fluxos da API Storage Write como um valor maior que o número de CPUs do worker, mantendo um nível razoável de capacidade de processamento do fluxo de gravação do BigQuery .

  • Frequência de acionamento. Um valor de segundo de um único dígito é adequado para pipelines de alta capacidade de processamento.

Para mais informações, consulte Gravar do Dataflow para o BigQuery.

Resultados da comparação

Esta seção descreve os resultados dos testes de comparativo.

Capacidade de processamento e uso de recursos

A tabela a seguir mostra os resultados dos testes de capacidade de processamento e uso de recursos do pipeline.

Resultado Somente mapa, exatamente uma vez Somente mapa, pelo menos uma vez Agregação em janelas, exatamente uma vez
Capacidade de processamento de entrada por worker Média: 17 MBps, n=3 Média: 21 MBps, n=3 Média: 6 MBps, n=3
Uso médio da CPU em todos os workers Média: 65%, n=3 Média: 69%, n=3 Média: 80%, n=3
Número de nós de trabalho Média: 57, n=3 Média: 48, n=3 Média: 169, n=3
Unidades de computação do Streaming Engine por hora Média: 125, n=3 Média: 46, n=3 Média: 354, n=3

O algoritmo de escalonamento automático pode afetar o nível de uso da CPU de destino. Para atingir um uso maior ou menor da CPU de destino, defina o intervalo de escalonamento automático ou a dica de uso do worker. Metas de uso mais altas podem levar a custos mais baixos, mas também a uma latência de cauda pior, especialmente para cargas variáveis.

Para um pipeline de agregação em janelas, o tipo de agregação, o tamanho da janela e o paralelismo de chaves podem ter um grande impacto no uso de recursos.

Latência

A tabela a seguir mostra os resultados do comparativo de latência do pipeline.

Latência total de ponta a ponta do estágio Somente mapa, exatamente uma vez Somente mapa, pelo menos uma vez Agregação em janelas, exatamente uma vez
P50 Média: 800 ms, n=3 Média: 160 ms, n=3 Média: 3.400 ms, n=3
P95 Média: 2.000 ms, n=3 Média: 250 ms, n=3 Média: 13.000 ms, n=3
P99 Média: 2.800 ms, n=3 Média: 410 ms, n=3 Média: 25.000 ms, n=3

Os testes mediram a latência de ponta a ponta por estágio (a job/streaming_engine/stage_end_to_end_latencies métrica) em três execuções de teste de longa duração. Essa métrica mede quanto tempo o Streaming Engine gasta em cada estágio do pipeline. Ela abrange todas as etapas internas do pipeline, como:

  • Embaralhamento e enfileiramento de mensagens para processamento
  • O tempo de processamento real; por exemplo, converter mensagens em objetos de linha
  • Gravação de estado persistente, bem como tempo gasto no enfileiramento para gravar o estado persistente

Outra métrica de latência é a atualização de dados. No entanto, a atualização de dados é afetada por fatores como janelas definidas pelo usuário e atrasos upstream na origem. A latência do sistema fornece uma linha de base mais objetiva para a eficiência e integridade do processamento interno de um pipeline sob carga.

Os dados foram medidos em aproximadamente um dia por execução, com os períodos de inicialização descartados para refletir o desempenho estável e constante. Os resultados mostram dois fatores que introduzem latência adicional:

  • Modo "Exatamente uma vez". Para conseguir semântica "exatamente uma vez", o embaralhamento determinístico e as pesquisas de estado persistente são necessários para a remoção de duplicação. O modo "Pelo menos uma vez" é executado significativamente mais rápido, porque ignora essas etapas.

  • Agregação em janelas. As mensagens precisam ser totalmente embaralhadas, armazenadas em buffer e gravadas no estado persistente antes do fechamento da janela, o que aumenta a latência de ponta a ponta.

Os comparativos mostrados aqui representam uma linha de base. A latência é altamente sensível à complexidade do pipeline. UDFs personalizadas, transformações adicionais e lógica de janelas complexa podem aumentar a latência. Agregações simples e altamente redutoras, como soma e contagem, tendem a resultar em latência menor do que operações com estado pesado, como coletar elementos em uma lista.

Estimar custos

É possível estimar o custo de linha de base do seu próprio pipeline comparável com o faturamento com base em recursos usando a Google Cloud calculadora de preços, da seguinte maneira:

  1. Abra a calculadora de preços.
  2. Clique em Adicionar à estimativa.
  3. Selecione Dataflow.
  4. Em Tipo de serviço, selecione "Dataflow Classic".
  5. Selecione Configurações avançadas para mostrar o conjunto completo de opções.
  6. Escolha o local em que o job é executado.
  7. Em Tipo de serviço, selecione "Streaming".
  8. Selecione Ativar o Streaming Engine.
  9. Insira informações sobre as horas de execução do job, nós de trabalho, máquinas de worker e armazenamento em Persistent Disk.
  10. Insira o número estimado de unidades de computação do Streaming Engine.

O uso de recursos e o custo são dimensionados de forma aproximadamente linear com a capacidade de processamento de entrada, embora, para jobs pequenos com apenas alguns workers, o custo total seja dominado por custos fixos. Como ponto de partida, é possível extrapolar o número de nós de trabalho e o consumo de recursos dos resultados do comparativo.

Por exemplo, suponha que você execute um pipeline somente de mapa no modo "exatamente uma vez", com uma taxa de dados de entrada de 100 MiB/s. Com base nos resultados do comparativo de um pipeline de 1 GiB/s, é possível estimar os requisitos de recursos da seguinte maneira:

  • Fator de escalonamento: (100 MiB/s) / (1 GiB/s) = 0,1
  • Nós de trabalho projetados: 57 workers × 0,1 = 5,7 workers
  • Número projetado de unidades de computação do Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora

Esse valor só deve ser usado como uma estimativa inicial. A capacidade de processamento e o custo reais podem variar significativamente, com base em fatores como tipo de máquina, distribuição de tamanho de mensagem, código do usuário, tipo de agregação, paralelismo de chaves e tamanho da janela. Para mais informações, consulte Práticas recomendadas para otimização de custos do Dataflow.

Executar um pipeline de teste

Esta seção mostra os gcloud dataflow flex-template run comandos que foram usados para executar o pipeline somente de mapa.

Modo "Exatamente uma vez"

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

Modo "Pelo menos uma vez"

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

Substitua:

  • JOB_ID: o ID do job do Dataflow
  • PROJECT_ID: o ID do projeto
  • SUBSCRIPTION_NAME: o nome da assinatura do Pub/Sub
  • DATASET: o nome do conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Gerar dados de teste

Para gerar dados de teste, use o comando a seguir para executar o modelo do gerador de dados de streaming:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

Substitua:

  • JOB_ID: o ID do job do Dataflow
  • PROJECT_ID: o ID do projeto
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • SCHEMA_LOCATION: o caminho para um arquivo de esquema no Cloud Storage

O modelo do gerador de dados de streaming usa um arquivo do gerador de dados JSON para definir o esquema de mensagens. Os testes de comparativo usaram um esquema de mensagens semelhante ao seguinte:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Próximas etapas