Escreva do Dataflow para o Pub/Sub

Este documento descreve como escrever dados de texto do Dataflow para o Pub/Sub através do PubSubIO conetor I/O do Apache Beam.

Vista geral

Para escrever dados no Pub/Sub, use o conetor PubSubIO. Os elementos de entrada podem ser mensagens do Pub/Sub ou apenas os dados da mensagem. Se os elementos de entrada forem mensagens do Pub/Sub, pode, opcionalmente, definir atributos ou uma chave de ordenação em cada mensagem.

Pode usar a versão Java, Python ou Go do conector PubSubIO, da seguinte forma:

Java

Para escrever num único tópico, chame o método PubsubIO.writeMessages. Este método recebe uma coleção de entrada de objetos PubsubMessage. O conector também define métodos práticos para escrever strings, mensagens Avro codificadas em binário ou mensagens protobuf codificadas em binário. Estes métodos convertem a coleção de entrada em mensagens do Pub/Sub.

Para escrever num conjunto dinâmico de tópicos com base nos dados de entrada, chame writeMessagesDynamic. Especifique o tópico de destino para cada mensagem chamando PubsubMessage.withTopic na mensagem. Por exemplo, pode encaminhar mensagens para diferentes tópicos com base no valor de um campo específico nos seus dados de entrada.

Para mais informações, consulte a PubsubIO documentação de referência.

Python

Chame o método pubsub.WriteToPubSub. Por predefinição, este método recebe uma coleção de entrada do tipo bytes, que representa a carga útil da mensagem. Se o parâmetro with_attributes for True, o método recebe uma coleção de objetos PubsubMessage.

Para mais informações, consulte a documentação de referência do módulo pubsub.

Ir

Para escrever dados no Pub/Sub, chame o método pubsubio.Write. Este método recebe uma coleção de entrada de objetos PubSubMessage ou fatias de bytes que contêm os payloads das mensagens.

Para mais informações, consulte a documentação de referência do pacote pubsubio.

Para mais informações sobre as mensagens do Pub/Sub, consulte o Formato das mensagens na documentação do Pub/Sub.

Indicações de tempo

O Pub/Sub define uma data/hora em cada mensagem. Este carimbo de data/hora representa o momento em que a mensagem é publicada no Pub/Sub. Num cenário de streaming, também pode ter interesse na data/hora do evento, que é a hora em que os dados da mensagem foram gerados. Pode usar a data/hora do elemento do Apache Beam para representar a hora do evento. As origens que criam um PCollection ilimitado atribuem frequentemente a cada novo elemento uma data/hora que corresponde à hora do evento.

Para Java e Python, o conetor de E/S do Pub/Sub pode escrever a data/hora de cada elemento como um atributo de mensagem do Pub/Sub. Os consumidores de mensagens podem usar este atributo para obter a data/hora do evento.

Java

Chame PubsubIO.Write<T>.withTimestampAttribute e especifique o nome do atributo.

Python

Especifique o parâmetro timestamp_attribute quando chamar WriteToPubSub.

Entrega de mensagens

O Dataflow suporta o processamento exatamente uma vez de mensagens num pipeline. No entanto, o conetor de E/S do Pub/Sub não pode garantir a entrega de mensagens exatamente uma vez através do Pub/Sub.

Para Java e Python, pode configurar o conector de E/S do Pub/Sub para escrever o ID exclusivo de cada elemento como um atributo de mensagem. Os consumidores de mensagens podem, em seguida, usar este atributo para remover mensagens duplicadas.

Java

Chame PubsubIO.Write<T>.withIdAttribute e especifique o nome do atributo.

Python

Especifique o parâmetro id_label quando chamar WriteToPubSub.

Saída direta

Se ativar o modo de streaming pelo menos uma vez no seu pipeline, o conetor de E/S usa a saída direta. Neste modo, o conetor não regista mensagens, o que permite escritas mais rápidas. No entanto, as repetições neste modo podem causar mensagens duplicadas com IDs de mensagens diferentes, o que pode dificultar a eliminação da duplicação das mensagens por parte dos consumidores de mensagens.

Para pipelines que usam o modo exatamente uma vez, pode ativar a saída direta definindo a streaming_enable_pubsub_direct_output opção de serviço. A saída direta reduz a latência de escrita e resulta num processamento mais eficiente. Considere esta opção se os consumidores de mensagens puderem processar mensagens duplicadas com IDs de mensagens não exclusivos.

Exemplos

O exemplo seguinte cria um PCollection de mensagens Pub/Sub e escreve-as num tópico Pub/Sub. O tópico é especificado como uma opção de pipeline. Cada mensagem contém dados de carga útil e um conjunto de atributos.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")