Esta página descreve as características de desempenho dos jobs de streaming do Dataflow que leem do Pub/Sub e gravam no BigQuery. Ela apresenta os resultados de testes de comparativo para dois tipos de pipeline de streaming:
Somente mapa (transformação por mensagem): pipelines que realizam transformações por mensagem, sem rastrear o estado ou agrupar elementos no stream. Exemplos incluem ETL, validação de campo e mapeamento de esquema.
Agregação em janelas (
GroupByKey): pipelines que realizam operações com estado e agrupam dados com base em uma chave e uma janela de tempo. Exemplos incluem contagem de eventos, cálculo de somas e coleta de registros para uma sessão de usuário.
A maioria das cargas de trabalho para integração de dados de streaming se enquadra nessas duas categorias. Se o pipeline seguir um padrão semelhante, use esses comparativos para avaliar o job do Dataflow em relação a uma configuração de referência com bom desempenho.
Metodologia de teste
Os comparativos foram realizados usando os seguintes recursos:
Um tópico do Pub/Sub pré-provisionado com uma carga de entrada constante. As mensagens foram geradas usando o modelo do gerador de dados de streaming.
- Taxa de mensagens: aproximadamente 1.000.000 de mensagens por segundo
- Carga de entrada: 1 GiB/s
- Formato da mensagem: texto JSON gerado aleatoriamente com um esquema fixo
- Tamanho da mensagem: aproximadamente 1 KiB por mensagem
Uma tabela padrão do BigQuery.
Pipelines de streaming do Dataflow com base no modelo do Pub/Sub para BigQuery. Esses pipelines realizam a análise e o mapeamento de esquema mínimos necessários. Nenhuma função definida pelo usuário (UDF) personalizada foi usada.
Depois que o escalonamento horizontal se estabilizou e o pipeline atingiu o estado estável, os pipelines foram executados por aproximadamente um dia. Depois disso, os resultados foram coletados e analisados.
Pipelines do Dataflow
Duas variantes de pipeline foram testadas:
Pipeline somente de mapa. Esse pipeline realiza um mapeamento e uma conversão simples de mensagens JSON. Para esse teste, o modelo do Pub/Sub para BigQuery foi usado sem modificação.
- Semântica: o pipeline foi testado usando o modo "exatamente uma vez" e o modo "pelo menos uma vez". O processamento "pelo menos uma vez" oferece melhor capacidade de processamento. No entanto, ele só deve ser usado quando registros duplicados forem aceitáveis ou quando o coletor downstream processar a remoção de duplicação.
Pipeline de agregação em janelas. Esse pipeline agrupa mensagens por uma chave específica em janelas de tamanho fixo e grava os registros agregados no BigQuery. Para esse teste, um pipeline personalizado do Apache Beam com base no modelo do Pub/Sub para BigQuery foi usado.
Lógica de agregação: para cada janela fixa e não sobreposta de 1 minuto, as mensagens com a mesma chave foram coletadas e gravadas como um único registro agregado no BigQuery. Esse tipo de agregação é usado com frequência no processamento de registros para combinar eventos relacionados, como a atividade de um usuário, em um único registro para análise downstream.
Paralelismo de chaves: o comparativo usou 1.000.000 de chaves distribuídas uniformemente.
Semântica: o pipeline foi testado usando o modo "exatamente uma vez". As agregações exigem semântica "exatamente uma vez" para garantir a correção e evitar a contagem dupla em um grupo e janela.
Configuração do job
A tabela a seguir mostra como os jobs do Dataflow foram configurados.
| Configuração | Somente mapa, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| Tipo de máquina do worker | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| vCPUs da máquina do worker | 2 | 2 | 2 |
| RAM da máquina do worker | 7,5 GiB | 7,5 GiB | 7,5 GiB |
| Persistent Disk da máquina do worker | Disco permanente padrão (HDD), 30 GB | Disco permanente padrão (HDD), 30 GB | Disco permanente padrão (HDD), 30 GB |
| Workers iniciais | 70 | 30 | 180 |
| Número máximo de workers | 100 | 100 | 250 |
| Streaming Engine | Sim | Sim | Sim |
| Escalonamento automático horizontal | Sim | Sim | Sim |
| Modelo de faturamento | Faturamento com base em recursos | Faturamento com base em recursos | Faturamento com base em recursos |
| A API Storage Write está ativada? | Sim | Sim | Sim |
| Fluxos da API Storage Write | 200 | Não relevante | 500 |
| Frequência de acionamento da API Storage Write | 5 segundos | Não relevante | 5 segundos |
A API BigQuery Storage Write é recomendada para pipelines de streaming. Ao usar o modo "exatamente uma vez" com a API Storage Write, é possível ajustar as seguintes configurações:
Número de fluxos de gravação. Para garantir paralelismo de chaves suficiente no estágio de gravação, defina o número de fluxos da API Storage Write como um valor maior que o número de CPUs do worker, mantendo um nível razoável de capacidade de processamento do fluxo de gravação do BigQuery .
Frequência de acionamento. Um valor de segundo de um único dígito é adequado para pipelines de alta capacidade de processamento.
Para mais informações, consulte Gravar do Dataflow para o BigQuery.
Resultados da comparação
Esta seção descreve os resultados dos testes de comparativo.
Capacidade de processamento e uso de recursos
A tabela a seguir mostra os resultados dos testes de capacidade de processamento e uso de recursos do pipeline.
| Resultado | Somente mapa, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| Capacidade de processamento de entrada por worker | Média: 17 MBps, n=3 | Média: 21 MBps, n=3 | Média: 6 MBps, n=3 |
| Uso médio da CPU em todos os workers | Média: 65%, n=3 | Média: 69%, n=3 | Média: 80%, n=3 |
| Número de nós de trabalho | Média: 57, n=3 | Média: 48, n=3 | Média: 169, n=3 |
| Unidades de computação do Streaming Engine por hora | Média: 125, n=3 | Média: 46, n=3 | Média: 354, n=3 |
O algoritmo de escalonamento automático pode afetar o nível de uso da CPU de destino. Para atingir um uso maior ou menor da CPU de destino, defina o intervalo de escalonamento automático ou a dica de uso do worker. Metas de uso mais altas podem levar a custos mais baixos, mas também a uma latência de cauda pior, especialmente para cargas variáveis.
Para um pipeline de agregação em janelas, o tipo de agregação, o tamanho da janela e o paralelismo de chaves podem ter um grande impacto no uso de recursos.
Latência
A tabela a seguir mostra os resultados do comparativo de latência do pipeline.
| Latência total de ponta a ponta do estágio | Somente mapa, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| P50 | Média: 800 ms, n=3 | Média: 160 ms, n=3 | Média: 3.400 ms, n=3 |
| P95 | Média: 2.000 ms, n=3 | Média: 250 ms, n=3 | Média: 13.000 ms, n=3 |
| P99 | Média: 2.800 ms, n=3 | Média: 410 ms, n=3 | Média: 25.000 ms, n=3 |
Os testes mediram a latência de ponta a ponta por estágio
(a job/streaming_engine/stage_end_to_end_latencies
métrica) em três execuções de teste de longa duração. Essa métrica mede quanto tempo o Streaming Engine gasta em cada estágio do pipeline. Ela abrange todas as etapas internas do pipeline, como:
- Embaralhamento e enfileiramento de mensagens para processamento
- O tempo de processamento real; por exemplo, converter mensagens em objetos de linha
- Gravação de estado persistente, bem como tempo gasto no enfileiramento para gravar o estado persistente
Outra métrica de latência é a atualização de dados. No entanto, a atualização de dados é afetada por fatores como janelas definidas pelo usuário e atrasos upstream na origem. A latência do sistema fornece uma linha de base mais objetiva para a eficiência e integridade do processamento interno de um pipeline sob carga.
Os dados foram medidos em aproximadamente um dia por execução, com os períodos de inicialização descartados para refletir o desempenho estável e constante. Os resultados mostram dois fatores que introduzem latência adicional:
Modo "Exatamente uma vez". Para conseguir semântica "exatamente uma vez", o embaralhamento determinístico e as pesquisas de estado persistente são necessários para a remoção de duplicação. O modo "Pelo menos uma vez" é executado significativamente mais rápido, porque ignora essas etapas.
Agregação em janelas. As mensagens precisam ser totalmente embaralhadas, armazenadas em buffer e gravadas no estado persistente antes do fechamento da janela, o que aumenta a latência de ponta a ponta.
Os comparativos mostrados aqui representam uma linha de base. A latência é altamente sensível à complexidade do pipeline. UDFs personalizadas, transformações adicionais e lógica de janelas complexa podem aumentar a latência. Agregações simples e altamente redutoras, como soma e contagem, tendem a resultar em latência menor do que operações com estado pesado, como coletar elementos em uma lista.
Estimar custos
É possível estimar o custo de linha de base do seu próprio pipeline comparável com o faturamento com base em recursos usando a Google Cloud calculadora de preços, da seguinte maneira:
- Abra a calculadora de preços.
- Clique em Adicionar à estimativa.
- Selecione Dataflow.
- Em Tipo de serviço, selecione "Dataflow Classic".
- Selecione Configurações avançadas para mostrar o conjunto completo de opções.
- Escolha o local em que o job é executado.
- Em Tipo de serviço, selecione "Streaming".
- Selecione Ativar o Streaming Engine.
- Insira informações sobre as horas de execução do job, nós de trabalho, máquinas de worker e armazenamento em Persistent Disk.
- Insira o número estimado de unidades de computação do Streaming Engine.
O uso de recursos e o custo são dimensionados de forma aproximadamente linear com a capacidade de processamento de entrada, embora, para jobs pequenos com apenas alguns workers, o custo total seja dominado por custos fixos. Como ponto de partida, é possível extrapolar o número de nós de trabalho e o consumo de recursos dos resultados do comparativo.
Por exemplo, suponha que você execute um pipeline somente de mapa no modo "exatamente uma vez", com uma taxa de dados de entrada de 100 MiB/s. Com base nos resultados do comparativo de um pipeline de 1 GiB/s, é possível estimar os requisitos de recursos da seguinte maneira:
- Fator de escalonamento: (100 MiB/s) / (1 GiB/s) = 0,1
- Nós de trabalho projetados: 57 workers × 0,1 = 5,7 workers
- Número projetado de unidades de computação do Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora
Esse valor só deve ser usado como uma estimativa inicial. A capacidade de processamento e o custo reais podem variar significativamente, com base em fatores como tipo de máquina, distribuição de tamanho de mensagem, código do usuário, tipo de agregação, paralelismo de chaves e tamanho da janela. Para mais informações, consulte Práticas recomendadas para otimização de custos do Dataflow.
Executar um pipeline de teste
Esta seção mostra os
gcloud dataflow flex-template run
comandos que foram usados para executar o pipeline somente de mapa.
Modo "Exatamente uma vez"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Modo "Pelo menos uma vez"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Substitua:
JOB_ID: o ID do job do DataflowPROJECT_ID: o ID do projetoSUBSCRIPTION_NAME: o nome da assinatura do Pub/SubDATASET: o nome do conjunto de dados do BigQueryTABLE_NAME: o nome da tabela do BigQuery
Gerar dados de teste
Para gerar dados de teste, use o comando a seguir para executar o modelo do gerador de dados de streaming:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Substitua:
JOB_ID: o ID do job do DataflowPROJECT_ID: o ID do projetoTOPIC_NAME: o nome do tópico do Pub/SubSCHEMA_LOCATION: o caminho para um arquivo de esquema no Cloud Storage
O modelo do gerador de dados de streaming usa um arquivo do gerador de dados JSON para definir o esquema de mensagens. Os testes de comparativo usaram um esquema de mensagens semelhante ao seguinte:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Próximas etapas
- Usar a interface de monitoramento de jobs do Dataflow
- Práticas recomendadas para otimização de custos do Dataflow
- Resolver problemas de jobs de streaming lentos ou travados
- Ler do Pub/Sub para o Dataflow
- Gravar do Dataflow para o BigQuery