Características de rendimiento de las canalizaciones de Pub/Sub a BigQuery

En esta página se describen las características de rendimiento de las tareas de streaming de Dataflow que leen datos de Pub/Sub y los escriben en BigQuery. Proporciona resultados de pruebas de rendimiento de dos tipos de canalizaciones de streaming:

  • Solo de asignación (transformación por mensaje): son las canalizaciones que realizan transformaciones por mensaje sin hacer un seguimiento del estado ni agrupar elementos en el flujo. Por ejemplo, ETL, validación de campos y asignación de esquemas.

  • Agregación con ventana (GroupByKey): canalizaciones que realizan operaciones con estado y agrupan datos en función de una clave y una ventana temporal. Por ejemplo, contar eventos, calcular sumas y recoger registros de una sesión de usuario.

La mayoría de las cargas de trabajo de integración de datos de streaming se incluyen en estas dos categorías. Si tu flujo de procesamiento sigue un patrón similar, puedes usar estas métricas para evaluar tu trabajo de Dataflow en comparación con una configuración de referencia con buen rendimiento.

Metodología de prueba

Las comparativas se han realizado con los siguientes recursos:

  • Un tema de Pub/Sub aprovisionado previamente con una carga de entrada constante. Los mensajes se han generado mediante la plantilla Generador de datos de streaming.

    • Velocidad de mensajes: aproximadamente 1.000.000 de mensajes por segundo
    • Carga de entrada: 1 GiB/s
    • Formato del mensaje: texto JSON generado aleatoriamente con un esquema fijo
    • Tamaño del mensaje: aproximadamente 1 KiB por mensaje
  • Una tabla de BigQuery estándar.

  • Flujos de procesamiento en streaming de Dataflow basados en la plantilla de Pub/Sub a BigQuery. Estas canalizaciones realizan el análisis y la asignación de esquemas mínimos necesarios. No se ha usado ninguna función definida por el usuario (UDF) personalizada.

Una vez que se estabilizó el escalado horizontal y la canalización alcanzó un estado estable, se permitió que las canalizaciones se ejecutaran durante aproximadamente un día, tras lo cual se recogieron y analizaron los resultados.

Pipelines de Dataflow

Se probaron dos variantes de la canalización:

Pipeline solo de mapa. Esta canalización realiza una asignación y conversión sencillas de mensajes JSON. En esta prueba, se ha usado la plantilla de Pub/Sub a BigQuery sin modificarla.

Pipeline de agregación por ventanas. Este flujo de procesamiento agrupa los mensajes por una clave específica en ventanas de tamaño fijo y escribe los registros agregados en BigQuery. Para esta prueba, se ha usado un flujo de procesamiento de Apache Beam personalizado basado en la plantilla de Pub/Sub a BigQuery.

  • Lógica de agregación: en cada ventana de 1 minuto fija y sin solapamiento, se recogieron los mensajes con la misma clave y se escribieron como un único registro agregado en BigQuery. Este tipo de agregación se usa habitualmente en el procesamiento de registros para combinar eventos relacionados, como la actividad de un usuario, en un solo registro para el análisis posterior.

  • Paralelismo de claves: la prueba de rendimiento usó 1.000.000 de claves distribuidas de forma uniforme.

  • Semántica: la canalización se ha probado con el modo de ejecución exactamente una vez. Las agregaciones requieren una semántica de exactamente una vez para asegurar la corrección y evitar que se cuenten dos veces los elementos de un grupo y una ventana.

Configuración de tarea

En la siguiente tabla se muestra cómo se configuraron los trabajos de Dataflow.

Ajuste Solo mapa, exactamente una vez Asignación solo una vez como mínimo Agregación por ventanas, exactamente una vez
Tipo de máquina de trabajador n1-standard-2 n1-standard-2 n1-standard-2
vCPUs de la máquina de trabajador 2 2 2
RAM de la máquina de trabajo 7,5 GiB 7,5 GiB 7,5 GiB
Persistent Disk de la máquina de trabajo Disco persistente estándar (HDD) de 30 GB Disco persistente estándar (HDD) de 30 GB Disco persistente estándar (HDD) de 30 GB
Trabajadores iniciales 70 30 180
Número máximo de trabajadores 100 100 250
Streaming Engine
Autoescalado horizontal
Modelo de facturación Facturación basada en recursos Facturación basada en recursos Facturación basada en recursos
¿Está habilitada la API Storage Write?
Streams de la API Storage Write 200 No aplicable 500
Frecuencia de activación de la API Storage Write 5 segundos No aplicable 5 segundos

La API Storage Write de BigQuery es la opción recomendada para las canalizaciones de streaming. Cuando usas el modo de entrega única con la API Storage Write, puedes ajustar los siguientes ajustes:

  • Número de flujos de escritura. Para asegurar un paralelismo de claves suficiente en la fase de escritura, asigna a la API Storage Write un número de flujos superior al número de CPUs de los trabajadores, manteniendo un nivel razonable de rendimiento de flujo de escritura de BigQuery.

  • Frecuencia de activación. Un valor de segundos de un solo dígito es adecuado para pipelines de alto rendimiento.

Para obtener más información, consulta el artículo sobre cómo escribir datos de Dataflow en BigQuery.

Resultados de las comparativas

En esta sección se describen los resultados de las pruebas de rendimiento.

Rendimiento y uso de recursos

En la siguiente tabla se muestran los resultados de las pruebas de rendimiento de la canalización y uso de recursos.

Resultado Solo mapa, exactamente una vez Asignación solo una vez como mínimo Agregación por ventanas, exactamente una vez
Rendimiento de entrada por trabajador Media: 17 MB/s, n=3 Media: 21 MB/s, n=3 Media: 6 MB/s, n=3
Uso medio de CPU en todos los trabajadores Media: 65%, n=3 Media: 69%, n=3 Media: 80%, n=3
Número de nodos de trabajo Media: 57, n=3 Media: 48, n=3 Media: 169, n=3
Unidades de computación de Streaming Engine por hora Media: 125, n=3 Media: 46, n=3 Media: 354, n=3

El algoritmo de autoescalado puede afectar al nivel de uso de CPU objetivo. Para conseguir un uso de CPU objetivo más alto o más bajo, puedes definir el intervalo de autoescalado o la sugerencia de uso de los trabajadores. Los objetivos de utilización más altos pueden reducir los costes, pero también empeorar la latencia de cola, sobre todo en cargas variables.

En una canalización de agregación de ventanas, el tipo de agregación, el tamaño de la ventana y el paralelismo de claves pueden influir considerablemente en el uso de recursos.

Latencia

En la siguiente tabla se muestran los resultados de la prueba de latencia de la canalización.

Latencia total de extremo a extremo de la fase Solo mapa, exactamente una vez Asignación solo una vez como mínimo Agregación por ventanas, exactamente una vez
P50 Media: 800 ms, n=3 Media: 160 ms, n=3 Media: 3400 ms, n=3
P95 Media: 2000 ms, n=3 Media: 250 ms, n=3 Media: 13.000 ms, n=3
P99 Media: 2800 ms, n=3 Media: 410 ms, n=3 Media: 25.000 ms, n=3

Las pruebas midieron la latencia completa por fase (la métrica job/streaming_engine/stage_end_to_end_latencies) en tres ejecuciones de pruebas de larga duración. Esta métrica mide el tiempo que el Streaming Engine dedica a cada fase de la canalización. Abarca todos los pasos internos de la canalización, como los siguientes:

  • Barajar y poner en cola mensajes para procesarlos
  • El tiempo de procesamiento real; por ejemplo, convertir mensajes en objetos de fila
  • Escribir el estado persistente, así como el tiempo empleado en la cola para escribir el estado persistente

Otra métrica de latencia es la actualización de los datos. Sin embargo, la actualización de los datos se ve afectada por factores como las ventanas definidas por el usuario y los retrasos en la fuente. La latencia del sistema proporciona una base más objetiva para la eficiencia del procesamiento interno y el estado de una canalización bajo carga.

Los datos se midieron durante aproximadamente un día por ejecución. Los periodos de inicio iniciales se descartaron para reflejar un rendimiento estable. Los resultados muestran dos factores que introducen latencia adicional:

  • Modo de procesamiento exacto. Para conseguir una semántica de exactamente una vez, es necesario que se produzca una aleatorización determinista y que se realicen búsquedas de estado persistente para eliminar duplicados. El modo "Al menos una vez" es mucho más rápido porque omite estos pasos.

  • Agregación por ventanas. Los mensajes deben barajarse, almacenarse en búfer y escribirse por completo en el estado persistente antes de cerrar la ventana, lo que aumenta la latencia de extremo a extremo.

Los valores de referencia que se muestran aquí representan una base. La latencia es muy sensible a la complejidad de la canalización. Las funciones definidas por el usuario personalizadas, las transformaciones adicionales y la lógica de ventanas compleja pueden aumentar la latencia. Las agregaciones sencillas que reducen mucho los datos, como las funciones sum y count, suelen dar lugar a una latencia menor que las operaciones que requieren mucho estado, como la recopilación de elementos en una lista.

Calcular los costes

Puedes estimar el coste base de tu propia canalización comparable con la facturación basada en recursos mediante la calculadora de precios de Google Cloud Platform, de la siguiente manera:

  1. Abre la calculadora de precios.
  2. Haz clic en Añadir a la estimación.
  3. Selecciona Dataflow.
  4. En Tipo de servicio, selecciona "Dataflow Classic".
  5. Selecciona Configuración avanzada para ver todas las opciones.
  6. Elige la ubicación en la que se ejecutará el trabajo.
  7. En Tipo de trabajo, selecciona "Streaming".
  8. Selecciona Habilitar Streaming Engine.
  9. Introduce información sobre las horas de ejecución de la tarea, los nodos de trabajador, las máquinas de trabajador y el almacenamiento de disco persistente.
  10. Introduce el número estimado de unidades de computación de Streaming Engine.

El uso de recursos y el coste se escalan de forma aproximadamente lineal con el rendimiento de entrada, aunque, en el caso de las tareas pequeñas con solo unos pocos trabajadores, el coste total está dominado por los costes fijos. Como punto de partida, puedes extrapolar el número de nodos de trabajo y el consumo de recursos a partir de los resultados de la prueba comparativa.

Por ejemplo, supongamos que ejecutas una canalización solo de asignación en el modo de ejecución exactamente una vez, con una velocidad de datos de entrada de 100 MiB/s. Según los resultados de la prueba de rendimiento de una canalización de 1 GiB/s, puede estimar los requisitos de recursos de la siguiente manera:

  • Factor de escala: (100 MiB/s) / (1 GiB/s) = 0,1
  • Nodos de trabajador proyectados: 57 trabajadores × 0,1 = 5,7 trabajadores
  • Número previsto de unidades de computación de Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora

Este valor solo debe usarse como estimación inicial. El rendimiento y el coste reales pueden variar significativamente en función de factores como el tipo de máquina, la distribución del tamaño de los mensajes, el código de usuario, el tipo de agregación, el paralelismo de las claves y el tamaño de la ventana. Para obtener más información, consulta las prácticas recomendadas para optimizar los costes de Dataflow.

Ejecutar una canalización de prueba

En esta sección se muestran los comandos de gcloud dataflow flex-template run que se han usado para ejecutar la canalización solo de mapa.

Modo de procesamiento exacto

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

Modo "Al menos una vez"

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

Haz los cambios siguientes:

  • JOB_ID: el ID de la tarea de Dataflow
  • PROJECT_ID: el ID del proyecto
  • SUBSCRIPTION_NAME: el nombre de la suscripción de Pub/Sub
  • DATASET: el nombre del conjunto de datos de BigQuery
  • TABLE_NAME: el nombre de la tabla de BigQuery

Generar datos de prueba

Para generar datos de prueba, usa el siguiente comando para ejecutar la plantilla Generador de datos de streaming:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

Haz los cambios siguientes:

  • JOB_ID: el ID de la tarea de Dataflow
  • PROJECT_ID: el ID del proyecto
  • TOPIC_NAME: el nombre del tema de Pub/Sub
  • SCHEMA_LOCATION: la ruta a un archivo de esquema en Cloud Storage.

La plantilla Generador de datos de streaming usa un archivo JSON Data Generator para definir el esquema de mensajes. En las pruebas de rendimiento se usó un esquema de mensajes similar al siguiente:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Pasos siguientes