A E/S gerenciada permite que o Dataflow gerencie conectores de E/S específicos usados em pipelines do Apache Beam. A E/S gerenciada simplifica o gerenciamento de pipelines que se integram a origens e coletores compatíveis.
A E/S gerenciada consiste em dois componentes que funcionam juntos:
Uma transformação do Apache Beam que fornece uma API comum para criar conectores de E/S (origens e coletores).
Um serviço do Dataflow que gerencia esses conectores de E/S em seu nome, incluindo a capacidade de fazer upgrade deles de forma independente da versão do Apache Beam.
As vantagens da E/S gerenciada incluem o seguinte:
Upgrades automáticos. O Dataflow faz upgrade automático dos conectores de E/S gerenciados no pipeline. Isso significa que o pipeline recebe correções de segurança, melhorias de desempenho e correções de bugs para esses conectores, sem exigir mudanças no código. Para mais informações, consulte Upgrades automáticos.
API consistente. Tradicionalmente, os conectores de E/S no Apache Beam têm APIs distintas, e cada conector é configurado de uma maneira diferente. A E/S gerenciada fornece uma única API de configuração que usa propriedades de chave-valor, resultando em um código de pipeline mais simples e consistente. Para mais informações, consulte API de configuração.
Requisitos
Os SDKs a seguir oferecem suporte à E/S gerenciada:
- SDK do Apache Beam para Java versão 2.58.0 ou mais recente.
- SDK do Apache Beam para Python versão 2.61.0 ou mais recente.
O serviço de back-end requer o Dataflow Runner v2. Se o Runner v2 não estiver ativado, o pipeline ainda será executado, mas não terá os benefícios do serviço de E/S gerenciado.
Upgrades automáticos
Os pipelines do Dataflow com conectores de E/S gerenciados usam automaticamente a versão confiável mais recente do conector. Os upgrades automáticos ocorrem nos seguintes pontos do ciclo de vida do job:
Envio de jobs. Ao enviar um job em lote ou de streaming, o Dataflow usa a versão mais recente do conector de E/S gerenciado que foi testada e funciona bem.
Upgrades contínuos. Para jobs de streaming, o Dataflow faz upgrade dos conectores de E/S gerenciados em pipelines em execução à medida que novas versões ficam disponíveis. Não é necessário atualizar manualmente o conector ou a versão do Apache Beam do pipeline.
Por padrão, os upgrades contínuos acontecem em uma janela de 30 dias, ou seja, os upgrades são realizados aproximadamente a cada 30 dias. É possível ajustar a janela ou desativar os upgrades contínuos por job. Para mais informações, consulte Definir a janela de upgrade contínuo.
Uma semana antes do upgrade, o Dataflow grava uma notificação mensagem nos registros de mensagens do job.
Jobs de substituição. Para jobs de streaming, o Dataflow verifica se há atualizações sempre que você inicia um job de substituição, e usa automaticamente a versão mais recente conhecida. O Dataflow realiza essa verificação mesmo que você não mude nenhum código no job de substituição.
O diagrama a seguir mostra o processo de upgrade. O usuário cria um pipeline do Apache Beam usando a versão X do SDK. O Dataflow faz upgrade da versão de E/S gerenciada para a versão mais recente compatível. O upgrade acontece quando o usuário envia o job, após a janela de upgrade contínuo ou quando o usuário envia um job de substituição.

O processo de upgrade adiciona cerca de dois minutos ao tempo de inicialização do primeiro job (por projeto) que usa E/S gerenciada e pode ser de cerca de meio minuto para jobs subsequentes. Para upgrades contínuos, o serviço do Dataflow
inicia um
job de substituição. Isso pode resultar em inatividade temporária do pipeline, já que o pool de workers atual é encerrado e um novo pool de workers é iniciado. Para verificar o status das operações de E/S gerenciadas, procure
entradas de registro que incluam a string
"Managed Transform(s)".
Definir a janela de upgrade contínuo
Para especificar a janela de upgrade de um job de streaming do Dataflow, defina
a managed_transforms_rolling_upgrade_window
opção de serviço como o número
de dias. O valor precisa estar entre 10 e 90 dias, inclusive.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS
gcloud
Use o
gcloud dataflow jobs run comando
com a additional-experiments opção. Se você estiver usando um modelo Flex que
usa E/S gerenciada, use o
gcloud dataflow flex-template run
comando.
--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS
Para desativar os upgrades contínuos, defina a opção de serviço managed_transforms_rolling_upgrade_window como never. Ainda é possível acionar uma atualização iniciando um job de substituição.
Java
--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never
Python
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
Go
--dataflow_service_options=managed_transforms_rolling_upgrade_window=never
gcloud
Use o
gcloud dataflow jobs run comando
com a additional-experiments opção. Se você estiver usando modelos Flex, use
o
gcloud dataflow flex-template run
comando.
--additional-experiments=managed_transforms_rolling_upgrade_window=never
API de configuração
A E/S gerenciada é uma transformação do Apache Beam pronta para uso que fornece uma API consistente para configurar origens e coletores.
Java
Para criar qualquer origem ou coletor com suporte à E/S gerenciada, use a
Managed classe. Especifique qual origem ou coletor instanciar e transmita um conjunto de parâmetros de configuração, semelhante ao seguinte:
Map config = ImmutableMap.<String, Object>builder()
.put("config1", "abc")
.put("config2", 1);
pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
.getSinglePCollection();
Também é possível transmitir parâmetros de configuração como um arquivo YAML. Para um exemplo de código completo, consulte Ler do Apache Iceberg.
Python
Importe o módulo apache_beam.transforms.managed
e chame o método managed.Read ou managed.Write. Especifique qual origem ou coletor instanciar e transmita um conjunto de parâmetros de configuração, semelhante ao seguinte:
pipeline
| beam.managed.Read(
beam.managed.SOURCE, # Example: beam.managed.KAFKA
config={
"config1": "abc",
"config2": 1
}
)
Também é possível transmitir parâmetros de configuração como um arquivo YAML. Para um exemplo de código completo, consulte Ler do Apache Kafka.
Destinos dinâmicos
Para alguns coletores, o conector de E/S gerenciado pode selecionar dinamicamente um destino com base nos valores de campo nos registros recebidos.
Para usar destinos dinâmicos, forneça uma string de modelo para o destino. A
string de modelo pode incluir nomes de campos entre chaves, como
"tables.{field1}". No ambiente de execução, o conector substitui o valor do campo por cada registro recebido para determinar o destino desse registro.
Por exemplo, suponha que seus dados tenham um campo chamado airport. Você pode definir o
destino como "flights.{airport}". Se airport=SFO, o registro será gravado
em flights.SFO. Para campos aninhados, use a notação de ponto. Por exemplo: {top.middle.nested}.
Para um exemplo de código que mostra como usar destinos dinâmicos, consulte Gravar com destinos dinâmicos.
Filtragem
Talvez seja necessário filtrar determinados campos antes que eles sejam gravados na tabela de destino. Para coletores que oferecem suporte a destinos dinâmicos, é possível usar o parâmetro drop, keep ou only para essa finalidade. Esses parâmetros permitem incluir metadados de destino nos registros de entrada, sem gravar os metadados no destino.
É possível definir no máximo um desses parâmetros para um determinado coletor.
| Parâmetro de configuração | Tipo de dado | Descrição |
|---|---|---|
drop |
lista de strings | Uma lista de nomes de campos a serem descartados antes da gravação no destino. |
keep |
lista de strings | Uma lista de nomes de campos a serem mantidos ao gravar no destino. Outros campos são descartados. |
only |
string | O nome de exatamente um campo a ser usado como o registro de nível superior a ser gravado ao gravar no destino. Todos os outros campos são descartados. Esse campo precisa ser do tipo de linha. |
Fontes e coletores compatíveis
A E/S gerenciada oferece suporte às seguintes origens e coletores.
Para mais informações, consulte Conectores de E/S gerenciados na documentação do Apache Beam.