Esta página descreve as caraterísticas de desempenho dos trabalhos de streaming do Dataflow que leem a partir do Pub/Sub e escrevem no BigQuery. Fornece resultados de testes de referência para dois tipos de pipeline de streaming:
Apenas mapeamento (transformação por mensagem): pipelines que realizam transformações por mensagem, sem monitorizar o estado nem agrupar elementos na stream. Os exemplos incluem ETL, validação de campos e mapeamento de esquemas.
Agregação em janelas (
GroupByKey): pipelines que executam operações com estado e agrupam dados com base numa chave e num intervalo de tempo. Os exemplos incluem a contagem de eventos, o cálculo de somas e a recolha de registos para uma sessão do utilizador.
A maioria dos fluxos de trabalho para a integração de dados de streaming enquadra-se nestas duas categorias. Se o seu pipeline seguir um padrão semelhante, pode usar estas referências para avaliar o seu trabalho do Dataflow em comparação com uma configuração de referência com bom desempenho.
Metodologia de teste
Os testes de referência foram realizados com os seguintes recursos:
Um tópico Pub/Sub pré-aprovisionado com uma carga de entrada constante. As mensagens foram geradas através do modelo Gerador de dados de streaming.
- Taxa de mensagens: aproximadamente 1 000 000 mensagens por segundo
- Carga de entrada: 1 GiB/s
- Formato da mensagem: texto JSON gerado aleatoriamente com um esquema fixo
- Tamanho da mensagem: aproximadamente 1 KiB por mensagem
Uma tabela do BigQuery padrão.
Pipelines de streaming do Dataflow baseados no modelo do Pub/Sub para o BigQuery. Estes pipelines realizam a análise e o mapeamento de esquemas mínimos necessários. Não foi usada nenhuma função definida pelo utilizador (FDU) personalizada.
Depois de a escalabilidade horizontal se estabilizar e o pipeline atingir o estado estável, os pipelines puderam ser executados durante cerca de um dia. Após este período, os resultados foram recolhidos e analisados.
Pipelines do Dataflow
Foram testadas duas variantes da pipeline:
Map-only pipeline. Este pipeline executa um mapeamento e uma conversão simples de mensagens JSON. Para este teste, foi usado o modelo do Pub/Sub para o BigQuery sem modificações.
- Semântica: o pipeline foi testado com o modo exatamente uma vez e o modo pelo menos uma vez. O processamento de, pelo menos, uma vez oferece um melhor débito. No entanto, só deve ser usado quando os registos duplicados forem aceitáveis ou o destino a jusante processar a remoção de duplicações.
Pipeline de agregação em janelas. Esta pipeline agrupa mensagens por uma chave específica em janelas de tamanho fixo e escreve os registos agregados no BigQuery. Para este teste, foi usado um pipeline do Apache Beam personalizado com base no modelo Pub/Sub para BigQuery.
Lógica de agregação: para cada janela fixa de 1 minuto sem sobreposição, as mensagens com a mesma chave foram recolhidas e escritas como um único registo agregado no BigQuery. Este tipo de agregação é usado frequentemente no processamento de registos para combinar eventos relacionados, como a atividade de um utilizador, num único registo para análise a jusante.
Paralelismo de chaves: o teste de referência usou 1 000 000 de chaves distribuídas uniformemente.
Semântica: o pipeline foi testado com o modo exatamente uma vez. As agregações requerem uma semântica exatamente uma vez para garantir a correção e evitar a contagem dupla num grupo e numa janela.
Configuração da tarefa
A tabela seguinte mostra como os trabalhos do Dataflow foram configurados.
| Definição | Mapeamento apenas, exatamente uma vez | Mapeamento apenas, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| Tipo de máquina de trabalho | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| CPUs virtuais da máquina de trabalho | 2 | 2 | 2 |
| RAM da máquina do trabalhador | 7,5 GiB | 7,5 GiB | 7,5 GiB |
| Disco persistente da máquina de trabalho | Disco persistente padrão (HDD), 30 GB | Disco persistente padrão (HDD), 30 GB | Disco persistente padrão (HDD), 30 GB |
| Trabalhadores iniciais | 70 | 30 | 180 |
| Número máximo de trabalhadores | 100 | 100 | 250 |
| Streaming Engine | Sim | Sim | Sim |
| Escala automática horizontal | Sim | Sim | Sim |
| Modelo de faturação | Faturação baseada em recursos | Faturação baseada em recursos | Faturação baseada em recursos |
| A API Storage Write está ativada? | Sim | Sim | Sim |
| Streams da API Storage Write | 200 | Não aplicável | 500 |
| Frequência de acionamento da API Storage Write | 5 segundos | Não aplicável | 5 segundos |
Recomendamos a API Storage Write do BigQuery para pipelines de streaming. Quando usa o modo de execução única com a API Storage Write, pode ajustar as seguintes definições:
Número de streams de gravação. Para garantir um paralelismo de chaves suficiente na fase de escrita, defina o número de streams da API Storage Write para um valor superior ao número de CPUs de trabalho, mantendo um nível razoável de débito da stream de escrita do BigQuery.
Frequência de acionamento. Um valor de segundos de um único dígito é adequado para pipelines de elevado débito.
Para mais informações, consulte o artigo Escreva do Dataflow para o BigQuery.
Resultados de testes de referência
Esta secção descreve os resultados dos testes de referência.
Débito e utilização de recursos
A tabela seguinte mostra os resultados dos testes para o débito da conduta e a utilização de recursos.
| Resultado | Mapeamento apenas, exatamente uma vez | Mapeamento apenas, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| Processamento de entrada por trabalhador | Média: 17 MBps, n=3 | Média: 21 MBps, n=3 | Média: 6 MBps, n=3 |
| Utilização média da CPU em todos os trabalhadores | Média: 65%, n=3 | Média: 69%, n=3 | Média: 80%, n=3 |
| Número de nós trabalhadores | Média: 57, n=3 | Média: 48, n=3 | Média: 169, n=3 |
| Unidades de computação do Streaming Engine por hora | Média: 125, n=3 | Média: 46, n=3 | Média: 354, n=3 |
O algoritmo de dimensionamento automático pode afetar o nível de utilização da CPU de destino. Para alcançar uma utilização da CPU alvo mais elevada ou mais baixa, pode definir o intervalo de escalamento automático ou a sugestão de utilização do trabalhador. Os alvos de utilização mais elevados podem levar a custos mais baixos, mas também a uma latência de cauda pior, especialmente para cargas variáveis.
Para um pipeline de agregação de janelas, o tipo de agregação, o tamanho da janela e o paralelismo das chaves podem ter um grande impacto na utilização de recursos.
Latência
A tabela seguinte mostra os resultados de testes de referência para a latência do pipeline.
| Latência ponto a ponto total do palco | Mapeamento apenas, exatamente uma vez | Mapeamento apenas, pelo menos uma vez | Agregação em janelas, exatamente uma vez |
|---|---|---|---|
| P50 | Média: 800 ms, n=3 | Média: 160 ms, n=3 | Média: 3400 ms, n=3 |
| P95 | Média: 2000 ms, n=3 | Média: 250 ms, n=3 | Média: 13 000 ms, n=3 |
| P99 | Média: 2800 ms, n=3 | Média: 410 ms, n=3 | Média: 25 000 ms, n=3 |
Os testes mediram a latência ponto a ponto por fase (a métrica job/streaming_engine/stage_end_to_end_latencies) em três execuções de testes de longa duração. Esta métrica mede o tempo que o Streaming Engine passa em cada fase do pipeline. Abrange todos os passos internos do pipeline, como:
- Organizar e colocar mensagens em fila para processamento
- O tempo de processamento real; por exemplo, converter mensagens em objetos de linhas
- Escrever o estado persistente, bem como o tempo gasto na fila para escrever o estado persistente
Outra métrica de latência é a atualidade dos dados. No entanto, a atualização dos dados é afetada por fatores como a segmentação por período definida pelo utilizador e os atrasos a montante na origem. A latência do sistema oferece uma base mais objetiva para a eficiência de processamento interno e o estado de funcionamento de um pipeline sob carga.
Os dados foram medidos durante aproximadamente um dia por execução, com os períodos de início iniciais rejeitados para refletir um desempenho estável e em estado estacionário. Os resultados mostram dois fatores que introduzem latência adicional:
Modo exatamente uma vez. Para alcançar a semântica exatamente uma vez, são necessárias misturas determinísticas e pesquisas de estado persistentes para a remoção de duplicados. O modo at-least-once tem um desempenho significativamente mais rápido, porque ignora estes passos.
Agregação com janelas. As mensagens têm de ser totalmente aleatorizadas, armazenadas em buffer e escritas no estado persistente antes do fecho da janela, o que aumenta a latência ponto a ponto.
Os testes de referência apresentados aqui representam uma base de referência. A latência é altamente sensível à complexidade do pipeline. As UDFs personalizadas, as transformações adicionais e a lógica de janelas complexas podem aumentar a latência. As agregações simples e altamente redutoras, como a soma e a contagem, tendem a resultar numa latência inferior à das operações com muitos estados, como a recolha de elementos numa lista.
Estime os custos
Pode estimar o custo de base da sua própria pipeline comparável com a faturação baseada em recursos através da calculadora de preços da Google Cloud Platform, da seguinte forma:
- Abra a calculadora de preços.
- Clique em Adicionar à estimativa.
- Selecione Fluxo de dados.
- Para Tipo de serviço, selecione "Dataflow Classic".
- Selecione Definições avançadas para mostrar o conjunto completo de opções.
- Escolha a localização onde a tarefa é executada.
- Para Tipo de tarefa, selecione "Streaming".
- Selecione Ativar motor de streaming.
- Introduza informações sobre as horas de execução da tarefa, os nós de trabalho, as máquinas de trabalho e o armazenamento do disco persistente.
- Introduza o número estimado de unidades de computação do Streaming Engine.
A utilização de recursos e o custo são aproximadamente lineares com o débito de entrada, embora, para tarefas pequenas com apenas alguns trabalhadores, o custo total seja dominado pelos custos fixos. Como ponto de partida, pode extrapolar o número de nós de trabalho e o consumo de recursos a partir dos resultados de testes de referência.
Por exemplo, suponha que executa um pipeline apenas de mapas no modo exatamente uma vez, com uma taxa de dados de entrada de 100 MiB/s. Com base nos resultados de testes de referência para um pipeline de 1 GiB/s, pode estimar os requisitos de recursos da seguinte forma:
- Fator de escalabilidade: (100 MiB/s) / (1 GiB/s) = 0,1
- Nós trabalhadores projetados: 57 trabalhadores × 0,1 = 5,7 trabalhadores
- Número projetado de unidades de computação do Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora
Este valor deve ser usado apenas como uma estimativa inicial. A taxa de transferência real e o custo podem variar significativamente, com base em fatores como o tipo de máquina, a distribuição do tamanho das mensagens, o código do utilizador, o tipo de agregação, o paralelismo das chaves e o tamanho da janela. Para mais informações, consulte o artigo Práticas recomendadas para a otimização de custos do Dataflow.
Execute um pipeline de teste
Esta secção mostra os comandos
gcloud dataflow flex-template run
que foram usados para executar o pipeline apenas de mapas.
Modo exatamente uma vez
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Modo de pelo menos uma vez
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Substitua o seguinte:
JOB_ID: o ID da tarefa do DataflowPROJECT_ID: o ID do projetoSUBSCRIPTION_NAME: o nome da subscrição do Pub/SubDATASET: o nome do conjunto de dados do BigQueryTABLE_NAME: o nome da tabela do BigQuery
Gere dados de teste
Para gerar dados de teste, use o seguinte comando para executar o modelo de gerador de dados de streaming:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Substitua o seguinte:
JOB_ID: o ID da tarefa do DataflowPROJECT_ID: o ID do projetoTOPIC_NAME: o nome do tópico do Pub/SubSCHEMA_LOCATION: o caminho para um ficheiro de esquema no armazenamento na nuvem
O modelo do gerador de dados de streaming usa um ficheiro do gerador de dados JSON para definir o esquema de mensagens. Os testes de referência usaram um esquema de mensagens semelhante ao seguinte:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Passos seguintes
- Use a interface de monitorização de tarefas do Dataflow
- Práticas recomendadas para a otimização de custos do Dataflow
- Resolva problemas de tarefas de streaming lentas ou bloqueadas
- Leia do Pub/Sub para o Dataflow
- Escreva do Dataflow para o BigQuery