Características de rendimiento de las canalizaciones de Pub/Sub a BigQuery

En esta página, se describen las características de rendimiento de los trabajos de transmisión de Dataflow que leen datos de Pub/Sub y los escriben en BigQuery. Proporciona resultados de pruebas comparativas para dos tipos de canalizaciones de transmisión:

  • Solo mapa (transformación por mensaje): Son canalizaciones que realizan transformaciones por mensaje, sin hacer un seguimiento del estado ni agrupar elementos en el flujo. Algunos ejemplos son la ETL, la validación de campos y la asignación de esquemas.

  • Agregación con ventanas (GroupByKey): Son canalizaciones que realizan operaciones con estado y agrupan los datos según una clave y una ventana de tiempo. Entre los ejemplos, se incluyen el recuento de eventos, el cálculo de sumas y la recopilación de registros para una sesión del usuario.

La mayoría de las cargas de trabajo para la integración de datos de transmisión se dividen en estas dos categorías. Si tu canalización sigue un patrón similar, puedes usar estas comparativas para evaluar tu trabajo de Dataflow en comparación con una configuración de referencia con buen rendimiento.

Metodología de prueba

Las comparativas se realizaron con los siguientes recursos:

  • Un tema de Pub/Sub aprovisionado previamente con una carga de entrada constante. Los mensajes se generaron con la plantilla Streaming Data Generator.

    • Tasa de mensajes: Aproximadamente 1,000,000 de mensajes por segundo
    • Carga de entrada: 1 GiB/s
    • Formato del mensaje: Texto JSON generado de forma aleatoria con un esquema fijo
    • Tamaño del mensaje: Aproximadamente 1 KiB por mensaje
  • Una tabla de BigQuery estándar.

  • Canalizaciones de transmisión de Dataflow basadas en la plantilla de Pub/Sub a BigQuery. Estas canalizaciones realizan el análisis y la asignación de esquemas mínimos requeridos. No se usó ninguna función definida por el usuario (UDF) personalizada.

Después de que se estabilizó el ajuste de escala horizontal y la canalización alcanzó un estado estable, se permitió que las canalizaciones se ejecutaran durante aproximadamente un día, tras lo cual se recopilaron y analizaron los resultados.

Canalizaciones de Dataflow

Se probaron dos variantes de la canalización:

Canalización solo de mapa. Esta canalización realiza una asignación y conversión simples de mensajes JSON. Para esta prueba, se usó la plantilla de Pub/Sub a BigQuery sin modificaciones.

  • Semántica: La canalización se probó con el modo de exactamente una vez y el modo de al menos una vez. El procesamiento al menos una vez proporciona una mejor capacidad de procesamiento. Sin embargo, solo se debe usar cuando se aceptan registros duplicados o el receptor descendente controla la anulación de duplicación.

Canalización de agregación con ventanas. Esta canalización agrupa los mensajes por una clave específica en ventanas de tamaño fijo y escribe los registros agregados en BigQuery. Para esta prueba, se usó una canalización personalizada de Apache Beam basada en la plantilla de Pub/Sub a BigQuery.

  • Lógica de agregación: Para cada período fijo de 1 minuto sin superposiciones, se recopilaron los mensajes con la misma clave y se escribieron como un solo registro agregado en BigQuery. Este tipo de agregación se usa comúnmente en el procesamiento de registros para combinar eventos relacionados, como la actividad de un usuario, en un solo registro para el análisis posterior.

  • Paralelismo de claves: La comparativa usó 1,000,000 de claves distribuidas de manera uniforme.

  • Semántica: La canalización se probó con el modo de exactamente una vez. Las agregaciones requieren una semántica de exactamente una vez para garantizar la corrección y evitar el doble conteo dentro de un grupo y una ventana.

Configuración del trabajo

En la siguiente tabla, se muestra cómo se configuraron los trabajos de Dataflow.

Configuración Solo mapa, exactamente una vez Solo mapa, al menos una vez Agregación en ventanas, exactamente una vez
Tipo de máquina del trabajador n1-standard-2 n1-standard-2 n1-standard-2
CPU virtuales de la máquina de trabajo 2 2 2
RAM de la máquina del trabajador 7.5 GiB 7.5 GiB 7.5 GiB
Persistent Disk de la máquina de trabajo Disco persistente estándar (HDD), 30 GB Disco persistente estándar (HDD), 30 GB Disco persistente estándar (HDD), 30 GB
Trabajadores iniciales 70 30 180
Cantidad máxima de trabajadores 100 100 250
Streaming Engine
Ajuste de escala automático horizontal
Modelo de facturación Facturación basada en recursos Facturación basada en recursos Facturación basada en recursos
¿Está habilitada la API de Storage Write?
Transmisiones de la API de Storage Write 200 No aplicable 500
Frecuencia de activación de la API de Storage Write 5 segundos No aplicable 5 segundos

Se recomienda la API de BigQuery Storage Write para las canalizaciones de transmisión. Cuando usas el modo “exactamente una vez” con la API de Storage Write, puedes ajustar los siguientes parámetros de configuración:

  • Cantidad de transmisiones de escritura. Para garantizar un paralelismo de claves suficiente en la etapa de escritura, establece la cantidad de transmisiones de la API de Storage Write en un valor mayor que la cantidad de CPU de trabajadores, y mantén un nivel razonable de capacidad de procesamiento de transmisiones de escritura de BigQuery.

  • Frecuencia de activación. Un valor de segundo de un solo dígito es adecuado para las canalizaciones de alto rendimiento.

Para obtener más información, consulta Escribe desde Dataflow a BigQuery.

Resultados de comparativas

En esta sección, se describen los resultados de las pruebas de comparativas.

Capacidad de procesamiento y uso de recursos

En la siguiente tabla, se muestran los resultados de las pruebas de capacidad de procesamiento de la canalización y uso de recursos.

Resultado Solo mapa, exactamente una vez Solo mapa, al menos una vez Agregación en ventanas, exactamente una vez
Capacidad de procesamiento de entrada por trabajador Media: 17 MB/s, n=3 Media: 21 MB/s, n=3 Media: 6 MB/s, n=3
Uso promedio de CPU en todos los trabajadores Media: 65%, n=3 Media: 69%, n=3 Media: 80%, n=3
Cantidad de nodos trabajadores Media: 57, n=3 Media: 48, n=3 Media: 169, n=3
Unidades de procesamiento de Streaming Engine por hora Media: 125, n=3 Media: 46, n=3 Media: 354, n=3

El algoritmo de ajuste de escala automático puede afectar el nivel de uso objetivo de la CPU. Para lograr un uso de CPU objetivo más alto o más bajo, puedes establecer el rango de ajuste de escala automático o la sugerencia de uso del trabajador. Los objetivos de utilización más altos pueden generar costos más bajos, pero también una peor latencia final, en especial para cargas variables.

En el caso de un canal de agregación de ventanas, el tipo de agregación, el tamaño de la ventana y el paralelismo de claves pueden tener un gran impacto en el uso de recursos.

Latencia

En la siguiente tabla, se muestran los resultados de la comparativa de la latencia de la canalización.

Latencia total de extremo a extremo de la etapa Solo mapa, exactamente una vez Solo mapa, al menos una vez Agregación en ventanas, exactamente una vez
P50 Media: 800 ms, n=3 Media: 160 ms, n=3 Media: 3,400 ms, n=3
P95 Media: 2,000 ms, n=3 Media: 250 ms, n=3 Media: 13,000 ms, n=3
P99 Media: 2,800 ms, n=3 Media: 410 ms, n=3 Media: 25,000 ms, n=3

Las pruebas midieron la latencia de extremo a extremo por etapa (la métrica job/streaming_engine/stage_end_to_end_latencies) en tres ejecuciones de prueba de larga duración. Esta métrica mide cuánto tiempo dedica el motor de transmisión a cada etapa de la canalización. Abarca todos los pasos internos de la canalización, como los siguientes:

  • Mezcla y pone en cola los mensajes para su procesamiento
  • El tiempo de procesamiento real; por ejemplo, la conversión de mensajes en objetos de fila
  • Escritura de estado persistente, así como el tiempo dedicado a poner en cola la escritura de estado persistente

Otra métrica de latencia es la actualidad de los datos. Sin embargo, la actualización de los datos se ve afectada por factores como la ventana definida por el usuario y las demoras ascendentes en la fuente. La latencia del sistema proporciona un valor de referencia más objetivo para la eficiencia y el estado del procesamiento interno de una canalización bajo carga.

Los datos se midieron durante aproximadamente un día por ejecución, y se descartaron los períodos iniciales de inicio para reflejar un rendimiento estable en estado estacionario. Los resultados muestran dos factores que introducen latencia adicional:

  • Modo exactamente una vez. Para lograr una semántica de una y solo una vez, se requieren búsquedas de estado persistente y aleatorización determinística para la anulación de duplicación. El modo de entrega al menos una vez es mucho más rápido, ya que omite estos pasos.

  • Es la agregación en ventanas. Los mensajes deben mezclarse, almacenarse en búfer y escribirse por completo en el estado persistente antes de que se cierre la ventana, lo que aumenta la latencia de extremo a extremo.

Los comparativos que se muestran aquí representan un valor de referencia. La latencia es muy sensible a la complejidad de la canalización. Las UDF personalizadas, las transformaciones adicionales y la lógica de ventanas compleja pueden aumentar la latencia. Las agregaciones simples y altamente reductoras, como la suma y el recuento, tienden a generar una latencia más baja que las operaciones con mucha carga de estado, como la recopilación de elementos en una lista.

Estimación de costos

Puedes estimar el costo de referencia de tu propia canalización comparable con la facturación basada en recursos usando la calculadora de precios de Google Cloud Platform de la siguiente manera:

  1. Abre la calculadora de precios.
  2. Haz clic en Agregar a la estimación.
  3. Selecciona Dataflow.
  4. En Tipo de servicio, selecciona "Dataflow Classic".
  5. Selecciona Configuración avanzada para mostrar el conjunto completo de opciones.
  6. Elige la ubicación en la que se ejecutará el trabajo.
  7. En Tipo de trabajo, selecciona "Transmisión".
  8. Selecciona Habilitar Streaming Engine.
  9. Ingresa información sobre las horas de ejecución del trabajo, los nodos de trabajador, las máquinas de trabajador y el almacenamiento en Persistent Disk.
  10. Ingresa la cantidad estimada de unidades de procesamiento de Streaming Engine.

El uso de recursos y el costo se ajustan de forma casi lineal con el rendimiento de entrada, aunque, para los trabajos pequeños con solo unos pocos trabajadores, los costos fijos dominan el costo total. Como punto de partida, puedes extrapolar la cantidad de nodos trabajadores y el consumo de recursos a partir de los resultados de la comparativa.

Por ejemplo, supongamos que ejecutas una canalización solo de mapa en modo exactamente una vez, con una tasa de datos de entrada de 100 MiB/s. Según los resultados de la comparativa para una canalización de 1 GiB/s, puedes estimar los requisitos de recursos de la siguiente manera:

  • Factor de ajuste: (100 MiB/s) / (1 GiB/s) = 0.1
  • Nodos trabajadores proyectados: 57 trabajadores × 0.1 = 5.7 trabajadores
  • Cantidad proyectada de unidades de procesamiento de Streaming Engine por hora: 125 × 0.1 = 12.5 unidades por hora

Este valor solo debe usarse como una estimación inicial. El rendimiento y el costo reales pueden variar significativamente según factores como el tipo de máquina, la distribución del tamaño de los mensajes, el código del usuario, el tipo de agregación, el paralelismo de claves y el tamaño de la ventana. Para obtener más información, consulta Prácticas recomendadas para la optimización de costos de Dataflow.

Ejecuta una canalización de prueba

En esta sección, se muestran los comandos de gcloud dataflow flex-template run que se usaron para ejecutar la canalización solo de mapa.

Modo “exactamente una vez”

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

Modo de entrega al menos una vez

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

Reemplaza lo siguiente:

  • JOB_ID: Es el ID del trabajo de Dataflow.
  • PROJECT_ID: Es el ID del proyecto.
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: El nombre del conjunto de datos de BigQuery
  • TABLE_NAME: El nombre de la tabla de BigQuery

Genera datos de prueba

Para generar datos de prueba, usa el siguiente comando para ejecutar la plantilla Streaming Data Generator:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

Reemplaza lo siguiente:

  • JOB_ID: Es el ID del trabajo de Dataflow.
  • PROJECT_ID: Es el ID del proyecto.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • SCHEMA_LOCATION: Es la ruta de acceso a un archivo de esquema en Cloud Storage.

La plantilla Streaming Data Generator usa un archivo JSON Data Generator para definir el esquema del mensaje. Las pruebas comparativas usaron un esquema de mensajes similar al siguiente:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Próximos pasos