Este documento descreve como escrever dados de texto do Dataflow para o Pub/Sub através do PubSubIO
conetor I/O do Apache Beam.
Vista geral
Para escrever dados no Pub/Sub, use o conetor PubSubIO. Os elementos de entrada podem ser mensagens do Pub/Sub ou apenas os dados da mensagem.
Se os elementos de entrada forem mensagens do Pub/Sub, pode, opcionalmente,
definir atributos ou uma chave de ordenação em cada mensagem.
Pode usar a versão Java, Python ou Go do conector PubSubIO,
da seguinte forma:
Java
Para escrever num único tópico, chame o método
PubsubIO.writeMessages. Este método recebe uma coleção de entrada de objetos PubsubMessage. O conector também define métodos práticos para escrever strings, mensagens Avro codificadas em binário ou mensagens protobuf codificadas em binário. Estes métodos convertem a coleção de entrada em mensagens do Pub/Sub.
Para escrever num conjunto dinâmico de tópicos com base nos dados de entrada, chame writeMessagesDynamic. Especifique o tópico de destino para cada mensagem chamando PubsubMessage.withTopic na mensagem. Por exemplo, pode encaminhar mensagens para diferentes tópicos com base no valor de um campo específico nos seus dados de entrada.
Para mais informações, consulte a
PubsubIO
documentação de referência.
Python
Chame o método pubsub.WriteToPubSub.
Por predefinição, este método recebe uma coleção de entrada do tipo bytes, que representa a carga útil da mensagem. Se o parâmetro with_attributes for True, o método recebe uma coleção de objetos PubsubMessage.
Para mais informações, consulte a documentação de referência do
módulo pubsub.
Ir
Para escrever dados no Pub/Sub, chame o método pubsubio.Write. Este método recebe uma coleção de entrada de objetos PubSubMessage ou fatias de bytes que contêm os payloads das mensagens.
Para mais informações, consulte a documentação de referência do pacote pubsubio.
Para mais informações sobre as mensagens do Pub/Sub, consulte o Formato das mensagens na documentação do Pub/Sub.
Indicações de tempo
O Pub/Sub define uma data/hora em cada mensagem. Este carimbo de data/hora representa o momento em que a mensagem é publicada no Pub/Sub. Num cenário de streaming, também pode ter interesse na data/hora do evento, que é a hora em que os dados da mensagem foram gerados. Pode usar a
data/hora do elemento do Apache Beam
para representar a hora do evento. As origens que criam um PCollection ilimitado atribuem frequentemente a cada novo elemento uma data/hora que corresponde à hora do evento.
Para Java e Python, o conetor de E/S do Pub/Sub pode escrever a data/hora de cada elemento como um atributo de mensagem do Pub/Sub. Os consumidores de mensagens podem usar este atributo para obter a data/hora do evento.
Java
Chame PubsubIO.Write<T>.withTimestampAttribute e especifique o nome do atributo.
Python
Especifique o parâmetro timestamp_attribute quando chamar WriteToPubSub.
Entrega de mensagens
O Dataflow suporta o processamento exatamente uma vez de mensagens num pipeline. No entanto, o conetor de E/S do Pub/Sub não pode garantir a entrega de mensagens exatamente uma vez através do Pub/Sub.
Para Java e Python, pode configurar o conector de E/S do Pub/Sub para escrever o ID exclusivo de cada elemento como um atributo de mensagem. Os consumidores de mensagens podem, em seguida, usar este atributo para remover mensagens duplicadas.
Java
Chame PubsubIO.Write<T>.withIdAttribute e especifique o nome do atributo.
Python
Especifique o parâmetro id_label quando chamar WriteToPubSub.
Saída direta
Se ativar o modo de streaming pelo menos uma vez no seu pipeline, o conetor de E/S usa a saída direta. Neste modo, o conetor não regista mensagens, o que permite escritas mais rápidas. No entanto, as repetições neste modo podem causar mensagens duplicadas com IDs de mensagens diferentes, o que pode dificultar a eliminação da duplicação das mensagens por parte dos consumidores de mensagens.
Para pipelines que usam o modo exatamente uma vez, pode ativar a saída direta
definindo a streaming_enable_pubsub_direct_output
opção de serviço. A saída direta reduz a latência de escrita e resulta num processamento mais eficiente. Considere esta opção se os consumidores de mensagens puderem processar mensagens duplicadas com IDs de mensagens não exclusivos.
Exemplos
O exemplo seguinte cria um PCollection de mensagens Pub/Sub
e escreve-as num tópico Pub/Sub. O tópico é especificado como uma opção de pipeline. Cada mensagem contém dados de carga útil e um conjunto de atributos.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Python
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.