La E/S administrada permite que Dataflow administre conectores de E/S específicos que se usan en canalizaciones de Apache Beam. La E/S administrada simplifica la administración de las canalizaciones que se integran con las fuentes y los receptores admitidos.
La E/S administrada consta de dos componentes que funcionan en conjunto:
Es una transformación de Apache Beam que proporciona una API común para crear conectores de E/S (fuentes y receptores).
Un servicio de Dataflow que administra estos conectores de E/S en tu nombre, incluida la capacidad de actualizarlos de forma independiente de la versión de Apache Beam
Entre las ventajas de la E/S administrada, se incluyen las siguientes:
Actualizaciones automáticas. Dataflow actualiza automáticamente los conectores de E/S administrados en tu canalización. Esto significa que tu canalización recibe correcciones de seguridad, mejoras de rendimiento y correcciones de errores para estos conectores, sin necesidad de realizar cambios en el código. Para obtener más información, consulta Actualizaciones automáticas.
API coherente Tradicionalmente, los conectores de E/S en Apache Beam tienen APIs distintas, y cada conector se configura de una manera diferente. La E/S administrada proporciona una sola API de configuración que usa propiedades de clave-valor, lo que genera un código de canalización más simple y coherente. Para obtener más información, consulta la API de Configuration.
Requisitos
Los siguientes SDKs admiten E/S administrada:
- SDK de Apache Beam para Java, versión 2.58.0 o posterior
- SDK de Apache Beam para Python, versión 2.61.0 o posterior
El servicio de backend requiere Dataflow Runner v2. Si Runner v2 no está habilitado, tu canalización se seguirá ejecutando, pero no obtendrá los beneficios del servicio de E/S administrado.
Actualizaciones automáticas
Las canalizaciones de Dataflow con conectores de E/S administrados usan automáticamente la versión confiable más reciente del conector. Las actualizaciones automáticas se producen en los siguientes puntos del ciclo de vida del trabajo:
Envío de trabajo Cuando envías un trabajo por lotes o de transmisión, Dataflow usa la versión más reciente del conector de E/S administrado que se probó y funciona bien.
Actualizaciones progresivas. En el caso de los trabajos de transmisión, Dataflow actualiza tus conectores de E/S administrada en las canalizaciones en ejecución a medida que hay nuevas versiones disponibles. No tienes que preocuparte por actualizar manualmente el conector ni la versión de Apache Beam de tu canalización.
De forma predeterminada, las actualizaciones progresivas se realizan en un período de 30 días, es decir, se realizan actualizaciones aproximadamente cada 30 días. Puedes ajustar la ventana o inhabilitar las actualizaciones progresivas para cada trabajo. Para obtener más información, consulta Cómo establecer el período de actualización progresiva.
Una semana antes de la actualización, Dataflow escribe un mensaje de notificación en los registros de mensajes del trabajo.
Trabajos de reemplazo. En el caso de los trabajos de transmisión, Dataflow verifica si hay actualizaciones cada vez que inicias un trabajo de reemplazo y usa automáticamente la versión estable más reciente conocida. Dataflow realiza esta verificación incluso si no cambias ningún código en el trabajo de reemplazo.
En el siguiente diagrama, se muestra el proceso de actualización. El usuario crea una canalización de Apache Beam con la versión X del SDK. Dataflow actualiza la versión de E/S administrada a la versión compatible más reciente. La actualización se produce cuando el usuario envía el trabajo, después de la ventana de actualización continua o cuando el usuario envía un trabajo de reemplazo.

El proceso de actualización agrega aproximadamente dos minutos al tiempo de inicio del primer trabajo (por proyecto) que usa E/S administradas y puede tardar alrededor de medio minuto para los trabajos posteriores. En el caso de las actualizaciones continuas, el servicio de Dataflow inicia un trabajo de reemplazo. Esto puede provocar un tiempo de inactividad temporal en tu canalización, ya que se detiene el grupo de trabajadores existente y se inicia uno nuevo. Para verificar el estado de las operaciones de E/S administradas, busca entradas de registro que incluyan la cadena "Managed Transform(s)".
Cómo establecer el período de actualización progresiva
Para especificar el período de actualización de un trabajo de Dataflow de transmisión, configura la opción de servicio managed_transforms_rolling_upgrade_window igual a la cantidad de días. El valor debe estar entre 10 y 90 días, inclusive.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS
gcloud
Usa el comando gcloud dataflow jobs run con la opción additional-experiments. Si usas una plantilla de Flex que utiliza E/S administrada, usa el comando gcloud dataflow flex-template run.
--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS
Para inhabilitar las actualizaciones progresivas, configura la opción de servicio managed_transforms_rolling_upgrade_window en never. Aún puedes activar una actualización si inicias un trabajo de reemplazo.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
Go
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
gcloud
Usa el comando gcloud dataflow jobs run con la opción additional-experiments. Si usas plantillas de Flex, usa el comando gcloud dataflow flex-template run.
--additional-experiments=managed_transforms_rolling_upgrade_window=never
API de configuración
La E/S administrada es una transformación de Apache Beam lista para usar que proporciona una API coherente para configurar fuentes y receptores.
Java
Para crear cualquier fuente o receptor compatible con E/S administradas, usa la clase Managed. Especifica qué fuente o receptor crear como instancia y pasa un conjunto de parámetros de configuración, de manera similar a lo siguiente:
Map config = ImmutableMap.<String, Object>builder()
.put("config1", "abc")
.put("config2", 1);
pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
.getSinglePCollection();
También puedes pasar parámetros de configuración como un archivo YAML. Para ver un ejemplo de código completo, consulta Lee desde Apache Iceberg.
Python
Importa el módulo apache_beam.transforms.managed y llama al método managed.Read o managed.Write. Especifica qué fuente o receptor crear como instancia y pasa un conjunto de parámetros de configuración, de manera similar a lo siguiente:
pipeline
| beam.managed.Read(
beam.managed.SOURCE, # Example: beam.managed.KAFKA
config={
"config1": "abc",
"config2": 1
}
)
También puedes pasar parámetros de configuración como un archivo YAML. Para ver un ejemplo de código completo, consulta Lee desde Apache Kafka.
Destinos dinámicos
En algunos receptores, el conector de E/S administrado puede seleccionar de forma dinámica un destino según los valores de los campos en los registros entrantes.
Para usar destinos dinámicos, proporciona una cadena de plantilla para el destino. La cadena de plantilla puede incluir nombres de campos entre corchetes, como "tables.{field1}". En el tiempo de ejecución, el conector sustituye el valor del campo por cada registro entrante para determinar el destino de ese registro.
Por ejemplo, supongamos que tus datos tienen un campo llamado airport. Podrías establecer el destino en "flights.{airport}". Si airport=SFO, el registro se escribe en flights.SFO. Para los campos anidados, usa la notación de puntos. Por ejemplo: {top.middle.nested}.
Para ver un código de ejemplo que muestra cómo usar destinos dinámicos, consulta Escribe con destinos dinámicos.
Filtros
Es posible que desees filtrar ciertos campos antes de que se escriban en la tabla de destino. En el caso de los receptores que admiten destinos dinámicos, puedes usar los parámetros drop, keep o only para este propósito. Estos parámetros te permiten incluir metadatos de destino en los registros de entrada sin escribir los metadatos en el destino.
Puedes establecer, como máximo, uno de estos parámetros para un receptor determinado.
| Parámetro de configuración | Tipo de datos | Descripción |
|---|---|---|
drop |
lista de cadenas | Es una lista de nombres de campos que se quitarán antes de escribir en el destino. |
keep |
lista de cadenas | Es una lista de nombres de campos que se conservarán cuando se escriba en el destino. Se descartan otros campos. |
only |
cadena | Nombre de exactamente un campo para usar como registro de nivel superior cuando se escribe en el destino. Se descartan todos los demás campos. Este campo debe ser de tipo fila. |
Fuentes y receptores compatibles
La E/S administrada admite las siguientes fuentes y receptores.
Para obtener más información, consulta Conectores de E/S administrados en la documentación de Apache Beam.