Características de desempenho dos pipelines do Pub/Sub para o BigQuery

Esta página descreve as caraterísticas de desempenho dos trabalhos de streaming do Dataflow que leem a partir do Pub/Sub e escrevem no BigQuery. Fornece resultados de testes de referência para dois tipos de pipeline de streaming:

  • Apenas mapeamento (transformação por mensagem): pipelines que realizam transformações por mensagem, sem monitorizar o estado nem agrupar elementos na stream. Os exemplos incluem ETL, validação de campos e mapeamento de esquemas.

  • Agregação em janelas (GroupByKey): pipelines que executam operações com estado e agrupam dados com base numa chave e num intervalo de tempo. Os exemplos incluem a contagem de eventos, o cálculo de somas e a recolha de registos para uma sessão do utilizador.

A maioria dos fluxos de trabalho para a integração de dados de streaming enquadra-se nestas duas categorias. Se o seu pipeline seguir um padrão semelhante, pode usar estas referências para avaliar o seu trabalho do Dataflow em comparação com uma configuração de referência com bom desempenho.

Metodologia de teste

Os testes de referência foram realizados com os seguintes recursos:

  • Um tópico Pub/Sub pré-aprovisionado com uma carga de entrada constante. As mensagens foram geradas através do modelo Gerador de dados de streaming.

    • Taxa de mensagens: aproximadamente 1 000 000 mensagens por segundo
    • Carga de entrada: 1 GiB/s
    • Formato da mensagem: texto JSON gerado aleatoriamente com um esquema fixo
    • Tamanho da mensagem: aproximadamente 1 KiB por mensagem
  • Uma tabela do BigQuery padrão.

  • Pipelines de streaming do Dataflow baseados no modelo do Pub/Sub para o BigQuery. Estes pipelines realizam a análise e o mapeamento de esquemas mínimos necessários. Não foi usada nenhuma função definida pelo utilizador (FDU) personalizada.

Depois de a escalabilidade horizontal se estabilizar e o pipeline atingir o estado estável, os pipelines puderam ser executados durante cerca de um dia. Após este período, os resultados foram recolhidos e analisados.

Pipelines do Dataflow

Foram testadas duas variantes da pipeline:

Map-only pipeline. Este pipeline executa um mapeamento e uma conversão simples de mensagens JSON. Para este teste, foi usado o modelo do Pub/Sub para o BigQuery sem modificações.

  • Semântica: o pipeline foi testado com o modo exatamente uma vez e o modo pelo menos uma vez. O processamento de, pelo menos, uma vez oferece um melhor débito. No entanto, só deve ser usado quando os registos duplicados forem aceitáveis ou o destino a jusante processar a remoção de duplicações.

Pipeline de agregação em janelas. Esta pipeline agrupa mensagens por uma chave específica em janelas de tamanho fixo e escreve os registos agregados no BigQuery. Para este teste, foi usado um pipeline do Apache Beam personalizado com base no modelo Pub/Sub para BigQuery.

  • Lógica de agregação: para cada janela fixa de 1 minuto sem sobreposição, as mensagens com a mesma chave foram recolhidas e escritas como um único registo agregado no BigQuery. Este tipo de agregação é usado frequentemente no processamento de registos para combinar eventos relacionados, como a atividade de um utilizador, num único registo para análise a jusante.

  • Paralelismo de chaves: o teste de referência usou 1 000 000 de chaves distribuídas uniformemente.

  • Semântica: o pipeline foi testado com o modo exatamente uma vez. As agregações requerem uma semântica exatamente uma vez para garantir a correção e evitar a contagem dupla num grupo e numa janela.

Configuração da tarefa

A tabela seguinte mostra como os trabalhos do Dataflow foram configurados.

Definição Mapeamento apenas, exatamente uma vez Mapeamento apenas, pelo menos uma vez Agregação em janelas, exatamente uma vez
Tipo de máquina de trabalho n1-standard-2 n1-standard-2 n1-standard-2
CPUs virtuais da máquina de trabalho 2 2 2
RAM da máquina do trabalhador 7,5 GiB 7,5 GiB 7,5 GiB
Disco persistente da máquina de trabalho Disco persistente padrão (HDD), 30 GB Disco persistente padrão (HDD), 30 GB Disco persistente padrão (HDD), 30 GB
Trabalhadores iniciais 70 30 180
Número máximo de trabalhadores 100 100 250
Streaming Engine Sim Sim Sim
Escala automática horizontal Sim Sim Sim
Modelo de faturação Faturação baseada em recursos Faturação baseada em recursos Faturação baseada em recursos
A API Storage Write está ativada? Sim Sim Sim
Streams da API Storage Write 200 Não aplicável 500
Frequência de acionamento da API Storage Write 5 segundos Não aplicável 5 segundos

Recomendamos a API Storage Write do BigQuery para pipelines de streaming. Quando usa o modo de execução única com a API Storage Write, pode ajustar as seguintes definições:

  • Número de streams de gravação. Para garantir um paralelismo de chaves suficiente na fase de escrita, defina o número de streams da API Storage Write para um valor superior ao número de CPUs de trabalho, mantendo um nível razoável de débito da stream de escrita do BigQuery.

  • Frequência de acionamento. Um valor de segundos de um único dígito é adequado para pipelines de elevado débito.

Para mais informações, consulte o artigo Escreva do Dataflow para o BigQuery.

Resultados de testes de referência

Esta secção descreve os resultados dos testes de referência.

Débito e utilização de recursos

A tabela seguinte mostra os resultados dos testes para o débito da conduta e a utilização de recursos.

Resultado Mapeamento apenas, exatamente uma vez Mapeamento apenas, pelo menos uma vez Agregação em janelas, exatamente uma vez
Processamento de entrada por trabalhador Média: 17 MBps, n=3 Média: 21 MBps, n=3 Média: 6 MBps, n=3
Utilização média da CPU em todos os trabalhadores Média: 65%, n=3 Média: 69%, n=3 Média: 80%, n=3
Número de nós trabalhadores Média: 57, n=3 Média: 48, n=3 Média: 169, n=3
Unidades de computação do Streaming Engine por hora Média: 125, n=3 Média: 46, n=3 Média: 354, n=3

O algoritmo de dimensionamento automático pode afetar o nível de utilização da CPU de destino. Para alcançar uma utilização da CPU alvo mais elevada ou mais baixa, pode definir o intervalo de escalamento automático ou a sugestão de utilização do trabalhador. Os alvos de utilização mais elevados podem levar a custos mais baixos, mas também a uma latência de cauda pior, especialmente para cargas variáveis.

Para um pipeline de agregação de janelas, o tipo de agregação, o tamanho da janela e o paralelismo das chaves podem ter um grande impacto na utilização de recursos.

Latência

A tabela seguinte mostra os resultados de testes de referência para a latência do pipeline.

Latência ponto a ponto total do palco Mapeamento apenas, exatamente uma vez Mapeamento apenas, pelo menos uma vez Agregação em janelas, exatamente uma vez
P50 Média: 800 ms, n=3 Média: 160 ms, n=3 Média: 3400 ms, n=3
P95 Média: 2000 ms, n=3 Média: 250 ms, n=3 Média: 13 000 ms, n=3
P99 Média: 2800 ms, n=3 Média: 410 ms, n=3 Média: 25 000 ms, n=3

Os testes mediram a latência ponto a ponto por fase (a métrica job/streaming_engine/stage_end_to_end_latencies) em três execuções de testes de longa duração. Esta métrica mede o tempo que o Streaming Engine passa em cada fase do pipeline. Abrange todos os passos internos do pipeline, como:

  • Organizar e colocar mensagens em fila para processamento
  • O tempo de processamento real; por exemplo, converter mensagens em objetos de linhas
  • Escrever o estado persistente, bem como o tempo gasto na fila para escrever o estado persistente

Outra métrica de latência é a atualidade dos dados. No entanto, a atualização dos dados é afetada por fatores como a segmentação por período definida pelo utilizador e os atrasos a montante na origem. A latência do sistema oferece uma base mais objetiva para a eficiência de processamento interno e o estado de funcionamento de um pipeline sob carga.

Os dados foram medidos durante aproximadamente um dia por execução, com os períodos de início iniciais rejeitados para refletir um desempenho estável e em estado estacionário. Os resultados mostram dois fatores que introduzem latência adicional:

  • Modo exatamente uma vez. Para alcançar a semântica exatamente uma vez, são necessárias misturas determinísticas e pesquisas de estado persistentes para a remoção de duplicados. O modo at-least-once tem um desempenho significativamente mais rápido, porque ignora estes passos.

  • Agregação com janelas. As mensagens têm de ser totalmente aleatorizadas, armazenadas em buffer e escritas no estado persistente antes do fecho da janela, o que aumenta a latência ponto a ponto.

Os testes de referência apresentados aqui representam uma base de referência. A latência é altamente sensível à complexidade do pipeline. As UDFs personalizadas, as transformações adicionais e a lógica de janelas complexas podem aumentar a latência. As agregações simples e altamente redutoras, como a soma e a contagem, tendem a resultar numa latência inferior à das operações com muitos estados, como a recolha de elementos numa lista.

Estime os custos

Pode estimar o custo de base da sua própria pipeline comparável com a faturação baseada em recursos através da calculadora de preços da Google Cloud Platform, da seguinte forma:

  1. Abra a calculadora de preços.
  2. Clique em Adicionar à estimativa.
  3. Selecione Fluxo de dados.
  4. Para Tipo de serviço, selecione "Dataflow Classic".
  5. Selecione Definições avançadas para mostrar o conjunto completo de opções.
  6. Escolha a localização onde a tarefa é executada.
  7. Para Tipo de tarefa, selecione "Streaming".
  8. Selecione Ativar motor de streaming.
  9. Introduza informações sobre as horas de execução da tarefa, os nós de trabalho, as máquinas de trabalho e o armazenamento do disco persistente.
  10. Introduza o número estimado de unidades de computação do Streaming Engine.

A utilização de recursos e o custo são aproximadamente lineares com o débito de entrada, embora, para tarefas pequenas com apenas alguns trabalhadores, o custo total seja dominado pelos custos fixos. Como ponto de partida, pode extrapolar o número de nós de trabalho e o consumo de recursos a partir dos resultados de testes de referência.

Por exemplo, suponha que executa um pipeline apenas de mapas no modo exatamente uma vez, com uma taxa de dados de entrada de 100 MiB/s. Com base nos resultados de testes de referência para um pipeline de 1 GiB/s, pode estimar os requisitos de recursos da seguinte forma:

  • Fator de escalabilidade: (100 MiB/s) / (1 GiB/s) = 0,1
  • Nós trabalhadores projetados: 57 trabalhadores × 0,1 = 5,7 trabalhadores
  • Número projetado de unidades de computação do Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora

Este valor deve ser usado apenas como uma estimativa inicial. A taxa de transferência real e o custo podem variar significativamente, com base em fatores como o tipo de máquina, a distribuição do tamanho das mensagens, o código do utilizador, o tipo de agregação, o paralelismo das chaves e o tamanho da janela. Para mais informações, consulte o artigo Práticas recomendadas para a otimização de custos do Dataflow.

Execute um pipeline de teste

Esta secção mostra os comandos gcloud dataflow flex-template run que foram usados para executar o pipeline apenas de mapas.

Modo exatamente uma vez

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

Modo de pelo menos uma vez

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

Substitua o seguinte:

  • JOB_ID: o ID da tarefa do Dataflow
  • PROJECT_ID: o ID do projeto
  • SUBSCRIPTION_NAME: o nome da subscrição do Pub/Sub
  • DATASET: o nome do conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery

Gere dados de teste

Para gerar dados de teste, use o seguinte comando para executar o modelo de gerador de dados de streaming:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

Substitua o seguinte:

  • JOB_ID: o ID da tarefa do Dataflow
  • PROJECT_ID: o ID do projeto
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • SCHEMA_LOCATION: o caminho para um ficheiro de esquema no armazenamento na nuvem

O modelo do gerador de dados de streaming usa um ficheiro do gerador de dados JSON para definir o esquema de mensagens. Os testes de referência usaram um esquema de mensagens semelhante ao seguinte:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Passos seguintes