Esta página descreve as características de performance dos jobs de streaming do Dataflow que leem do Pub/Sub e gravam no BigQuery. Ele fornece resultados de testes comparativos para dois tipos de pipeline de streaming:
Somente mapa (transformação por mensagem): pipelines que realizam transformações por mensagem, sem rastrear o estado ou agrupar elementos no fluxo. Exemplos incluem ETL, validação de campo e mapeamento de esquema.
Agregação em janelas (
GroupByKey): pipelines que executam operações com estado e agrupam dados com base em uma chave e uma janela de tempo. Por exemplo, contagem de eventos, cálculo de somas e coleta de registros para uma sessão do usuário.
A maioria das cargas de trabalho para integração de dados de streaming se enquadra nessas duas categorias. Se o pipeline seguir um padrão semelhante, use esses comparativos para avaliar seu job do Dataflow em relação a uma configuração de referência com bom desempenho.
Metodologia de teste
Os comparativos de mercado foram realizados usando os seguintes recursos:
Um tópico do Pub/Sub pré-provisionado com uma carga de entrada constante. As mensagens foram geradas usando o modelo Gerador de dados de streaming.
- Taxa de mensagens: aproximadamente 1 milhão de mensagens por segundo
- Carga de entrada: 1 GiB/s
- Formato da mensagem: texto JSON gerado aleatoriamente com um esquema fixo
- Tamanho da mensagem: aproximadamente 1 KiB por mensagem
Uma tabela padrão do BigQuery.
Pipelines de streaming do Dataflow com base no modelo do Pub/Sub para o BigQuery. Esses pipelines realizam a análise e o mapeamento de esquema mínimos necessários. Nenhuma função definida pelo usuário (UDF) personalizada foi usada.
Depois que o escalonamento horizontal se estabilizou e o pipeline atingiu o estado estável, os pipelines puderam ser executados por aproximadamente um dia. Depois disso, os resultados foram coletados e analisados.
Pipelines do Dataflow
Duas variantes de pipeline foram testadas:
Pipeline somente de mapa. Esse pipeline realiza um mapeamento e uma conversão simples de mensagens JSON. Para este teste, o modelo do Pub/Sub para BigQuery foi usado sem modificação.
- Semântica: o pipeline foi testado usando o modo exatamente uma vez e o modo pelo menos uma vez. O processamento de pelo menos uma vez oferece uma capacidade de processamento melhor. No entanto, ele só deve ser usado quando registros duplicados são aceitáveis ou o coletor downstream processa a eliminação de duplicação.
Pipeline de agregação em janelas. Esse pipeline agrupa mensagens por uma chave específica em janelas de tamanho fixo e grava os registros agregados no BigQuery. Para este teste, foi usado um pipeline personalizado do Apache Beam baseado no modelo do Pub/Sub para o BigQuery.
Lógica de agregação: para cada janela fixa de um minuto sem sobreposição, as mensagens com a mesma chave foram coletadas e gravadas como um único registro agregado no BigQuery. Esse tipo de agregação é usado com frequência no processamento de registros para combinar eventos relacionados, como a atividade de um usuário, em um único registro para análise downstream.
Paralelismo de chaves: o comparativo de mercado usou 1.000.000 de chaves distribuídas uniformemente.
Semântica: o pipeline foi testado usando o modo "exatamente uma vez". As agregações exigem semântica de execução única para garantir a correção e evitar a contagem dupla em um grupo e uma janela.
Configuração do job
A tabela a seguir mostra como os jobs do Dataflow foram configurados.
| Configuração | Map only, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janela, exatamente uma vez |
|---|---|---|---|
| Tipo de máquina do worker | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| vCPUs da máquina worker | 2 | 2 | 2 |
| RAM da máquina do worker | 7,5 GiB | 7,5 GiB | 7,5 GiB |
| Persistent Disk da máquina worker | Disco permanente padrão (HDD), 30 GB | Disco permanente padrão (HDD), 30 GB | Disco permanente padrão (HDD), 30 GB |
| Workers iniciais | 70 | 30 | 180 |
| Número máximo de workers | 100 | 100 | 250 |
| Streaming Engine | Sim | Sim | Sim |
| Escalonamento automático horizontal | Sim | Sim | Sim |
| Modelo de faturamento | Faturamento baseado em recursos | Faturamento baseado em recursos | Faturamento baseado em recursos |
| A API Storage Write está ativada? | Sim | Sim | Sim |
| Streams da API Storage Write | 200 | Não relevante | 500 |
| Frequência de acionamento da API Storage Write | 5 segundos | Não relevante | 5 segundos |
A API BigQuery Storage Write é recomendada para pipelines de streaming. Ao usar o modo "exatamente uma vez" com a API Storage Write, é possível ajustar as seguintes configurações:
Número de streams de gravação. Para garantir paralelismo de chave suficiente na etapa de gravação, defina o número de fluxos da API Storage Write como um valor maior que o número de CPUs de worker, mantendo um nível razoável de capacidade de gravação de fluxo do BigQuery.
Frequência de acionamento. Um valor de segundo de um único dígito é adequado para pipelines de alta taxa de transferência.
Para mais informações, consulte Gravar do Dataflow para o BigQuery.
Resultados da comparação
Esta seção descreve os resultados dos testes comparativos.
Taxa de transferência e uso de recursos
A tabela a seguir mostra os resultados do teste para capacidade de processamento do pipeline e uso de recursos.
| Resultado | Map only, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janela, exatamente uma vez |
|---|---|---|---|
| Capacidade de entrada por worker | Média: 17 MBps, n=3 | Média: 21 MBps, n=3 | Média: 6 MBps, n=3 |
| Uso médio da CPU em todos os workers | Média: 65%, n=3 | Média: 69%, n=3 | Média: 80%, n=3 |
| Número de nós de trabalho | Média: 57, n=3 | Média: 48, n=3 | Média: 169, n=3 |
| Unidades de computação do Streaming Engine por hora | Média: 125, n=3 | Média: 46, n=3 | Média: 354, n=3 |
O algoritmo de escalonamento automático pode afetar o nível de utilização da CPU desejado. Para atingir uma meta de utilização da CPU maior ou menor, defina o intervalo de escalonamento automático ou a dica de utilização do worker. Metas de utilização mais altas podem levar a custos mais baixos, mas também a uma latência de cauda pior, especialmente para cargas variáveis.
Para um pipeline de agregação de janelas, o tipo de agregação, o tamanho da janela e o paralelismo de chaves podem ter um grande impacto no uso de recursos.
Latência
A tabela a seguir mostra os resultados do comparativo de mercado para a latência do pipeline.
| Latência total de ponta a ponta do estágio | Map only, exatamente uma vez | Somente mapa, pelo menos uma vez | Agregação em janela, exatamente uma vez |
|---|---|---|---|
| P50 | Média: 800 ms, n=3 | Média: 160 ms, n=3 | Média: 3.400 ms, n=3 |
| P95 | Média: 2.000 ms, n=3 | Média: 250 ms, n=3 | Média: 13.000 ms, n=3 |
| P99 | Média: 2.800 ms, n=3 | Média: 410 ms, n=3 | Média: 25.000 ms, n=3 |
Os testes mediram a latência completa por estágio (a métrica job/streaming_engine/stage_end_to_end_latencies) em três execuções de teste de longa duração. Essa métrica mede quanto tempo
o Streaming Engine passa em cada etapa do pipeline. Ele abrange todas as etapas internas do pipeline, como:
- Organizar e enfileirar mensagens para processamento
- O tempo real de processamento, por exemplo, a conversão de mensagens em objetos de linha
- Gravação do estado persistente e tempo gasto na fila para gravar o estado persistente
Outra métrica de latência é a atualização de dados. No entanto, a atualização de dados é afetada por fatores como janelas definidas pelo usuário e atrasos upstream na origem. A latência do sistema fornece um valor de referência mais objetivo para a eficiência e a integridade do processamento interno de um pipeline sob carga.
Os dados foram medidos em aproximadamente um dia por execução, e os períodos iniciais de inicialização foram descartados para refletir uma performance estável e de estado constante. Os resultados mostram dois fatores que introduzem mais latência:
Modo "Exatamente uma vez". Para conseguir semântica exatamente uma vez, é necessário usar embaralhamento determinístico e pesquisas de estado persistente para eliminar a duplicação. O modo "pelo menos uma vez" funciona muito mais rápido porque ignora essas etapas.
Agregação em janela. As mensagens precisam ser totalmente embaralhadas, armazenadas em buffer e gravadas em estado persistente antes do fechamento da janela, aumentando a latência de ponta a ponta.
Os comparativos de mercado mostrados aqui representam um valor de referência. A latência é altamente sensível à complexidade do pipeline. UDFs personalizadas, transformações adicionais e lógica de janela complexa podem aumentar a latência. Agregações simples e altamente redutoras, como soma e contagem, tendem a resultar em menor latência do que operações com muitos estados, como coleta de elementos em uma lista.
Estimar custos
É possível estimar o custo de base do seu próprio pipeline comparável com o faturamento baseado em recursos usando a calculadora de preços do Google Cloud Platform, da seguinte forma:
- Abra a calculadora de preços.
- Clique em Adicionar à estimativa.
- Selecione "Dataflow".
- Em Tipo de serviço, selecione "Dataflow Classic".
- Selecione Configurações avançadas para mostrar todas as opções.
- Escolha o local em que o job será executado.
- Em Tipo de job, selecione "Streaming".
- Selecione Ativar o Streaming Engine.
- Insira informações sobre as horas de execução do job, os nós de trabalho, as máquinas de trabalho e o armazenamento em Persistent Disk.
- Insira o número estimado de unidades de computação do Streaming Engine.
O uso de recursos e o custo são dimensionados de forma aproximadamente linear com a taxa de transferência de entrada. No entanto, para jobs pequenos com apenas alguns workers, o custo total é dominado por custos fixos. Para começar, extrapole o número de nós de trabalho e o consumo de recursos dos resultados do comparativo.
Por exemplo, suponha que você execute um pipeline somente de mapa no modo exatamente uma vez, com uma taxa de dados de entrada de 100 MiB/s. Com base nos resultados do comparativo de mercado para um pipeline de 1 GiB/s, é possível estimar os requisitos de recursos da seguinte forma:
- Fator de escalonamento: (100 MiB/s) / (1 GiB/s) = 0,1
- Nós de trabalho projetados: 57 workers × 0,1 = 5,7 workers
- Número estimado de unidades de computação do Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora
Esse valor só deve ser usado como uma estimativa inicial. A taxa de transferência e o custo reais podem variar muito com base em fatores como tipo de máquina, distribuição de tamanho da mensagem, código do usuário, tipo de agregação, paralelismo de chave e tamanho da janela. Para mais informações, consulte Práticas recomendadas para otimização de custos do Dataflow.
Executar um pipeline de teste
Esta seção mostra os comandos
gcloud dataflow flex-template run
usados para executar o pipeline somente de mapa.
Modo de execução única
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Modo "Pelo menos uma vez"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Substitua:
JOB_ID: o ID do job do DataflowPROJECT_ID: o ID do projeto;SUBSCRIPTION_NAME: o nome da assinatura do Pub/SubDATASET: o nome do conjunto de dados do BigQueryTABLE_NAME: o nome da tabela do BigQuery
Gerar dados de teste
Para gerar dados de teste, use o comando a seguir para executar o modelo Gerador de dados de streaming:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Substitua:
JOB_ID: o ID do job do DataflowPROJECT_ID: o ID do projeto;TOPIC_NAME: o nome do tópico do Pub/SubSCHEMA_LOCATION: o caminho para um arquivo de esquema no Cloud Storage
O modelo "Gerador de dados de streaming" usa um arquivo do gerador de dados JSON para definir o esquema de mensagens. Os testes de comparativo de mercado usaram um esquema de mensagem semelhante a este:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Próximas etapas
- Usar a interface de monitoramento de jobs do Dataflow
- Práticas recomendadas para otimização de custos do Dataflow
- Resolver problemas de jobs de streaming lentos ou travados
- Ler do Pub/Sub para o Dataflow
- Gravar do Dataflow para o BigQuery