En esta página se describen las características de rendimiento de las tareas de streaming de Dataflow que leen datos de Pub/Sub y los escriben en BigQuery. Proporciona resultados de pruebas de rendimiento de dos tipos de canalizaciones de streaming:
Solo de asignación (transformación por mensaje): son las canalizaciones que realizan transformaciones por mensaje sin hacer un seguimiento del estado ni agrupar elementos en el flujo. Por ejemplo, ETL, validación de campos y asignación de esquemas.
Agregación con ventana (
GroupByKey): canalizaciones que realizan operaciones con estado y agrupan datos en función de una clave y una ventana temporal. Por ejemplo, contar eventos, calcular sumas y recoger registros de una sesión de usuario.
La mayoría de las cargas de trabajo de integración de datos de streaming se incluyen en estas dos categorías. Si tu flujo de procesamiento sigue un patrón similar, puedes usar estas métricas para evaluar tu trabajo de Dataflow en comparación con una configuración de referencia con buen rendimiento.
Metodología de prueba
Las comparativas se han realizado con los siguientes recursos:
Un tema de Pub/Sub aprovisionado previamente con una carga de entrada constante. Los mensajes se han generado mediante la plantilla Generador de datos de streaming.
- Velocidad de mensajes: aproximadamente 1.000.000 de mensajes por segundo
- Carga de entrada: 1 GiB/s
- Formato del mensaje: texto JSON generado aleatoriamente con un esquema fijo
- Tamaño del mensaje: aproximadamente 1 KiB por mensaje
Una tabla de BigQuery estándar.
Flujos de procesamiento en streaming de Dataflow basados en la plantilla de Pub/Sub a BigQuery. Estas canalizaciones realizan el análisis y la asignación de esquemas mínimos necesarios. No se ha usado ninguna función definida por el usuario (UDF) personalizada.
Una vez que se estabilizó el escalado horizontal y la canalización alcanzó un estado estable, se permitió que las canalizaciones se ejecutaran durante aproximadamente un día, tras lo cual se recogieron y analizaron los resultados.
Pipelines de Dataflow
Se probaron dos variantes de la canalización:
Pipeline solo de mapa. Esta canalización realiza una asignación y conversión sencillas de mensajes JSON. En esta prueba, se ha usado la plantilla de Pub/Sub a BigQuery sin modificarla.
- Semántica: la canalización se ha probado con el modo de ejecución exactamente una vez y el modo de ejecución al menos una vez. El procesamiento al menos una vez ofrece un mejor rendimiento. Sin embargo, solo se debe usar cuando se acepten registros duplicados o cuando el receptor de nivel inferior gestione la deduplicación.
Pipeline de agregación por ventanas. Este flujo de procesamiento agrupa los mensajes por una clave específica en ventanas de tamaño fijo y escribe los registros agregados en BigQuery. Para esta prueba, se ha usado un flujo de procesamiento de Apache Beam personalizado basado en la plantilla de Pub/Sub a BigQuery.
Lógica de agregación: en cada ventana de 1 minuto fija y sin solapamiento, se recogieron los mensajes con la misma clave y se escribieron como un único registro agregado en BigQuery. Este tipo de agregación se usa habitualmente en el procesamiento de registros para combinar eventos relacionados, como la actividad de un usuario, en un solo registro para el análisis posterior.
Paralelismo de claves: la prueba de rendimiento usó 1.000.000 de claves distribuidas de forma uniforme.
Semántica: la canalización se ha probado con el modo de ejecución exactamente una vez. Las agregaciones requieren una semántica de exactamente una vez para asegurar la corrección y evitar que se cuenten dos veces los elementos de un grupo y una ventana.
Configuración de tarea
En la siguiente tabla se muestra cómo se configuraron los trabajos de Dataflow.
| Ajuste | Solo mapa, exactamente una vez | Asignación solo una vez como mínimo | Agregación por ventanas, exactamente una vez |
|---|---|---|---|
| Tipo de máquina de trabajador | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| vCPUs de la máquina de trabajador | 2 | 2 | 2 |
| RAM de la máquina de trabajo | 7,5 GiB | 7,5 GiB | 7,5 GiB |
| Persistent Disk de la máquina de trabajo | Disco persistente estándar (HDD) de 30 GB | Disco persistente estándar (HDD) de 30 GB | Disco persistente estándar (HDD) de 30 GB |
| Trabajadores iniciales | 70 | 30 | 180 |
| Número máximo de trabajadores | 100 | 100 | 250 |
| Streaming Engine | Sí | Sí | Sí |
| Autoescalado horizontal | Sí | Sí | Sí |
| Modelo de facturación | Facturación basada en recursos | Facturación basada en recursos | Facturación basada en recursos |
| ¿Está habilitada la API Storage Write? | Sí | Sí | Sí |
| Streams de la API Storage Write | 200 | No aplicable | 500 |
| Frecuencia de activación de la API Storage Write | 5 segundos | No aplicable | 5 segundos |
La API Storage Write de BigQuery es la opción recomendada para las canalizaciones de streaming. Cuando usas el modo de entrega única con la API Storage Write, puedes ajustar los siguientes ajustes:
Número de flujos de escritura. Para asegurar un paralelismo de claves suficiente en la fase de escritura, asigna a la API Storage Write un número de flujos superior al número de CPUs de los trabajadores, manteniendo un nivel razonable de rendimiento de flujo de escritura de BigQuery.
Frecuencia de activación. Un valor de segundos de un solo dígito es adecuado para pipelines de alto rendimiento.
Para obtener más información, consulta el artículo sobre cómo escribir datos de Dataflow en BigQuery.
Resultados de las comparativas
En esta sección se describen los resultados de las pruebas de rendimiento.
Rendimiento y uso de recursos
En la siguiente tabla se muestran los resultados de las pruebas de rendimiento de la canalización y uso de recursos.
| Resultado | Solo mapa, exactamente una vez | Asignación solo una vez como mínimo | Agregación por ventanas, exactamente una vez |
|---|---|---|---|
| Rendimiento de entrada por trabajador | Media: 17 MB/s, n=3 | Media: 21 MB/s, n=3 | Media: 6 MB/s, n=3 |
| Uso medio de CPU en todos los trabajadores | Media: 65%, n=3 | Media: 69%, n=3 | Media: 80%, n=3 |
| Número de nodos de trabajo | Media: 57, n=3 | Media: 48, n=3 | Media: 169, n=3 |
| Unidades de computación de Streaming Engine por hora | Media: 125, n=3 | Media: 46, n=3 | Media: 354, n=3 |
El algoritmo de autoescalado puede afectar al nivel de uso de CPU objetivo. Para conseguir un uso de CPU objetivo más alto o más bajo, puedes definir el intervalo de autoescalado o la sugerencia de uso de los trabajadores. Los objetivos de utilización más altos pueden reducir los costes, pero también empeorar la latencia de cola, sobre todo en cargas variables.
En una canalización de agregación de ventanas, el tipo de agregación, el tamaño de la ventana y el paralelismo de claves pueden influir considerablemente en el uso de recursos.
Latencia
En la siguiente tabla se muestran los resultados de la prueba de latencia de la canalización.
| Latencia total de extremo a extremo de la fase | Solo mapa, exactamente una vez | Asignación solo una vez como mínimo | Agregación por ventanas, exactamente una vez |
|---|---|---|---|
| P50 | Media: 800 ms, n=3 | Media: 160 ms, n=3 | Media: 3400 ms, n=3 |
| P95 | Media: 2000 ms, n=3 | Media: 250 ms, n=3 | Media: 13.000 ms, n=3 |
| P99 | Media: 2800 ms, n=3 | Media: 410 ms, n=3 | Media: 25.000 ms, n=3 |
Las pruebas midieron la latencia completa por fase (la métrica job/streaming_engine/stage_end_to_end_latencies) en tres ejecuciones de pruebas de larga duración. Esta métrica mide el tiempo que el Streaming Engine dedica a cada fase de la canalización. Abarca todos los pasos internos de la canalización, como los siguientes:
- Barajar y poner en cola mensajes para procesarlos
- El tiempo de procesamiento real; por ejemplo, convertir mensajes en objetos de fila
- Escribir el estado persistente, así como el tiempo empleado en la cola para escribir el estado persistente
Otra métrica de latencia es la actualización de los datos. Sin embargo, la actualización de los datos se ve afectada por factores como las ventanas definidas por el usuario y los retrasos en la fuente. La latencia del sistema proporciona una base más objetiva para la eficiencia del procesamiento interno y el estado de una canalización bajo carga.
Los datos se midieron durante aproximadamente un día por ejecución. Los periodos de inicio iniciales se descartaron para reflejar un rendimiento estable. Los resultados muestran dos factores que introducen latencia adicional:
Modo de procesamiento exacto. Para conseguir una semántica de exactamente una vez, es necesario que se produzca una aleatorización determinista y que se realicen búsquedas de estado persistente para eliminar duplicados. El modo "Al menos una vez" es mucho más rápido porque omite estos pasos.
Agregación por ventanas. Los mensajes deben barajarse, almacenarse en búfer y escribirse por completo en el estado persistente antes de cerrar la ventana, lo que aumenta la latencia de extremo a extremo.
Los valores de referencia que se muestran aquí representan una base. La latencia es muy sensible a la complejidad de la canalización. Las funciones definidas por el usuario personalizadas, las transformaciones adicionales y la lógica de ventanas compleja pueden aumentar la latencia. Las agregaciones sencillas que reducen mucho los datos, como las funciones sum y count, suelen dar lugar a una latencia menor que las operaciones que requieren mucho estado, como la recopilación de elementos en una lista.
Calcular los costes
Puedes estimar el coste base de tu propia canalización comparable con la facturación basada en recursos mediante la calculadora de precios de Google Cloud Platform, de la siguiente manera:
- Abre la calculadora de precios.
- Haz clic en Añadir a la estimación.
- Selecciona Dataflow.
- En Tipo de servicio, selecciona "Dataflow Classic".
- Selecciona Configuración avanzada para ver todas las opciones.
- Elige la ubicación en la que se ejecutará el trabajo.
- En Tipo de trabajo, selecciona "Streaming".
- Selecciona Habilitar Streaming Engine.
- Introduce información sobre las horas de ejecución de la tarea, los nodos de trabajador, las máquinas de trabajador y el almacenamiento de disco persistente.
- Introduce el número estimado de unidades de computación de Streaming Engine.
El uso de recursos y el coste se escalan de forma aproximadamente lineal con el rendimiento de entrada, aunque, en el caso de las tareas pequeñas con solo unos pocos trabajadores, el coste total está dominado por los costes fijos. Como punto de partida, puedes extrapolar el número de nodos de trabajo y el consumo de recursos a partir de los resultados de la prueba comparativa.
Por ejemplo, supongamos que ejecutas una canalización solo de asignación en el modo de ejecución exactamente una vez, con una velocidad de datos de entrada de 100 MiB/s. Según los resultados de la prueba de rendimiento de una canalización de 1 GiB/s, puede estimar los requisitos de recursos de la siguiente manera:
- Factor de escala: (100 MiB/s) / (1 GiB/s) = 0,1
- Nodos de trabajador proyectados: 57 trabajadores × 0,1 = 5,7 trabajadores
- Número previsto de unidades de computación de Streaming Engine por hora: 125 × 0,1 = 12,5 unidades por hora
Este valor solo debe usarse como estimación inicial. El rendimiento y el coste reales pueden variar significativamente en función de factores como el tipo de máquina, la distribución del tamaño de los mensajes, el código de usuario, el tipo de agregación, el paralelismo de las claves y el tamaño de la ventana. Para obtener más información, consulta las prácticas recomendadas para optimizar los costes de Dataflow.
Ejecutar una canalización de prueba
En esta sección se muestran los comandos de gcloud dataflow flex-template run
que se han usado para ejecutar la canalización solo de mapa.
Modo de procesamiento exacto
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Modo "Al menos una vez"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Haz los cambios siguientes:
JOB_ID: el ID de la tarea de DataflowPROJECT_ID: el ID del proyectoSUBSCRIPTION_NAME: el nombre de la suscripción de Pub/SubDATASET: el nombre del conjunto de datos de BigQueryTABLE_NAME: el nombre de la tabla de BigQuery
Generar datos de prueba
Para generar datos de prueba, usa el siguiente comando para ejecutar la plantilla Generador de datos de streaming:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Haz los cambios siguientes:
JOB_ID: el ID de la tarea de DataflowPROJECT_ID: el ID del proyectoTOPIC_NAME: el nombre del tema de Pub/SubSCHEMA_LOCATION: la ruta a un archivo de esquema en Cloud Storage.
La plantilla Generador de datos de streaming usa un archivo JSON Data Generator para definir el esquema de mensajes. En las pruebas de rendimiento se usó un esquema de mensajes similar al siguiente:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Pasos siguientes
- Usar la interfaz de monitorización de trabajos de Dataflow
- Prácticas recomendadas para optimizar los costes de Dataflow
- Solucionar problemas de trabajos de streaming lentos o bloqueados
- Leer de Pub/Sub a Dataflow
- Escribir desde Dataflow en BigQuery