Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S PubSubIO d'Apache Beam.
Présentation
Pour écrire des données dans Pub/Sub, utilisez le connecteur PubSubIO. Les éléments d'entrée peuvent être des messages Pub/Sub ou simplement les données de message.
Si les éléments d'entrée sont des messages Pub/Sub, vous pouvez éventuellement définir des attributs ou une clé de commande sur chaque message.
Vous pouvez utiliser la version Java, Python ou Go du connecteur PubSubIO, comme suit :
Java
Pour écrire dans un seul sujet, appelez la
PubsubIO.writeMessages méthode. Cette méthode utilise une collection d'entrée d'objets PubsubMessage. Le connecteur définit également des méthodes pratiques pour écrire des chaînes, des messages Avro encodés au format binaire ou des messages Protobuf encodés au format binaire. Ces méthodes convertissent la collection d'entrée en messages Pub/Sub.
Pour écrire dans un ensemble dynamique de sujets en fonction des données d'entrée, appelez
writeMessagesDynamic. Spécifiez le sujet de destination pour chaque message en appelant PubsubMessage.withTopic sur le message. Par exemple, vous pouvez acheminer les messages vers différents sujets en fonction de la valeur d'un champ particulier dans vos données d'entrée.
Pour plus d'informations, consultez la documentation de référence de PubsubIO.
Python
Appelez la méthode pubsub.WriteToPubSub.
Par défaut, cette méthode utilise une collection d'entrée de type bytes, qui représente la charge utile du message. Si le paramètre with_attributes est True, la méthode utilise une collection d'objets PubsubMessage.
Pour en savoir plus, consultez la documentation de référence du module pubsub.
Go
Pour écrire des données dans Pub/Sub, appelez la
pubsubio.Write méthode. Cette méthode utilise une collection d'entrée d'objets PubSubMessage ou de tranches d'octets contenant les charges utiles des messages.
Pour plus d'informations, consultez la documentation de référence du package pubsubio.
Pour en savoir plus sur les messages Pub/Sub, consultez Format des messages dans la documentation Pub/Sub.
Horodatages
Pub/Sub définit un horodatage sur chaque message. Cet horodatage représente l'heure à laquelle le message est publié dans Pub/Sub. Dans un scénario de streaming, vous pouvez également vous intéresser à l'horodatage de l'événement, qui correspond à l'heure à laquelle les données du message ont été générées. Vous pouvez utiliser l'horodatage de l'élément Apache Beam
pour représenter l'heure de l'événement. Les sources qui créent une PCollection illimitée attribuent souvent à chaque nouvel élément un horodatage qui correspond à l'heure de l'événement.
Pour Java et Python, le connecteur d'E/S Pub/Sub peut écrire l'horodatage de chaque élément en tant qu'attribut de message Pub/Sub. Les consommateurs de messages peuvent utiliser cet attribut pour obtenir l'horodatage de l'événement.
Java
Appelez PubsubIO.Write<T>.withTimestampAttribute et spécifiez le nom de l'
attribut.
Python
Spécifiez le paramètre timestamp_attribute lorsque vous appelez WriteToPubSub.
Distribution des messages
Dataflow prend en charge le traitement de type "exactement une fois" des messages dans un pipeline. Toutefois, le connecteur d'E/S Pub/Sub ne peut pas garantir la distribution de type "exactement une fois" des messages via Pub/Sub.
Pour Java et Python, vous pouvez configurer le connecteur d'E/S Pub/Sub pour qu'il écrive l'ID unique de chaque élément en tant qu'attribut de message. Les consommateurs de messages peuvent ensuite utiliser cet attribut pour dédupliquer les messages.
Java
Appelez PubsubIO.Write<T>.withIdAttribute et spécifiez le nom de l'
attribut.
Python
Spécifiez le paramètre id_label lorsque vous appelez WriteToPubSub.
Sortie directe
Si vous activez le mode de traitement en flux continu de type "au moins une fois" dans votre pipeline, le connecteur d'E/S utilise une sortie directe. Dans ce mode, le connecteur ne vérifie pas les messages, ce qui permet des écritures plus rapides. Toutefois, les nouvelles tentatives dans ce mode peuvent entraîner la duplication de messages avec des ID différents, ce qui peut rendre plus difficile la déduplication des messages par les consommateurs.
Pour les pipelines qui utilisent le mode "exactement une fois", vous pouvez activer la sortie directe en
définissant l'streaming_enable_pubsub_direct_output
option de service. La sortie directe réduit la latence d'écriture et permet un traitement plus efficace. Envisagez cette option si vos consommateurs de messages peuvent gérer les messages en double avec des ID de message non uniques.
Exemples
L'exemple suivant crée une PCollection de messages Pub/Sub et les écrit dans un sujet Pub/Sub. Le sujet est spécifié en tant qu'option de pipeline. Chaque message contient des données de charge utile et un ensemble d'attributs.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Python
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.