Présentation des fonctions définies par l'utilisateur (UDF)

Une fonction définie par l'utilisateur (UDF) JavaScript est un type de transformation de message unique (SMT). Les UDF offrent un moyen flexible d'implémenter une logique de transformation personnalisée dans Pub/Sub, semblable aux UDF JavaScript BigQuery.

Les UDF acceptent un seul message en entrée, effectuent les actions définies sur l'entrée et renvoient le résultat du processus.

Les UDF présentent les propriétés clés suivantes :

  • Code : code JavaScript qui définit la logique de transformation.

  • Nom de la fonction : nom de la fonction JavaScript dans le code fourni que Pub/Sub applique aux messages.

Créer la fonction JavaScript

Le code UDF doit contenir une fonction avec la signature suivante :

  /**
  * Transforms a Pub/Sub message.
  * @return {(Object<string, (string | Object<string, string>)>|* null)} - To
  * filter a message, return `null`. To transform a message, return a map with
  * the following keys:
  *   - (required) 'data' : {string}
  *   - (optional) 'attributes' : {Object<string, string>}
  * Returning empty `attributes` will remove all attributes from the message.
  *
  * @param  {(Object<string, (string | Object<string, string>)>} - Pub/Sub
  * message. Keys:
  *   - (required) 'data' : {string}
  *   - (required) 'attributes' : {Object<string, string>}
  *
  * @param  {Object<string, any>} metadata - Pub/Sub message metadata.
  * Keys:
  *   - (optional) 'message_id'  : {string}
  *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
  *   - (optional) 'ordering_key': {string}
  */
  function <function_name>(message, metadata) {
    // Perform custom transformation logic
    return message; // to filter a message instead, return `null`
  }

Entrées

La fonction prend les entrées suivantes :

  • Argument message : objet JavaScript représentant le message Pub/Sub. Il contient les propriétés suivantes :

    • data : (String, obligatoire) Charge utile du message.

    • attributes : (Object<String, String>, facultatif) Carte de paires clé/valeur représentant les attributs du message.

  • Argument metadata : objet JavaScript contenant des métadonnées immuables sur le message Pub/Sub :

    • message_id : (String, facultatif) ID unique du message.

    • publish_time : (String, facultatif) Heure de publication du message au format RFC 3339 (AAAA-MM-JJTHH:mm:ssZ).

    • ordering_key : (String, facultatif) Clé de tri du message, le cas échéant.

Sorties

La fonction doit renvoyer l'une des valeurs suivantes :

  • Pour transformer un message, modifiez le contenu de message.data et message.attributes, puis renvoyez l'objet message modifié.

  • Pour filtrer un message, renvoyez null.

Exigences concernant les entrées / sorties

  • Si l'UDF transforme la charge utile du message, les entrées et sorties de la charge utile doivent être des chaînes encodées au format UTF-8.
  • Si la UDF ne transforme pas la charge utile du message, celle-ci peut utiliser n'importe quel encodage.
  • Les paires clé-valeur d'attribut doivent être des chaînes encodées au format UTF-8.

Comment les UDF transforment un message

Le résultat de l'exécution d'une UDF sur un message peut être l'un des suivants :

  • La fonction définie par l'utilisateur transforme un message.

  • La fonction définie par l'utilisateur renvoie null.

    • SMT de sujet : Pub/Sub renvoie une réponse positive à l'éditeur et inclut un ID de message dans la réponse pour les messages filtrés. Pub/Sub ne stocke pas le message et ne l'envoie à aucun abonné.

    • SMT d'abonnement : Pub/Sub accuse réception de la distribution du message sans l'envoyer à un abonné.

  • La fonction définie par l'utilisateur génère une erreur.

    • SMT de sujet : Pub/Sub renvoie l'erreur à l'éditeur et ne publie aucun message.

    • SMT d'abonnement : Pub/Sub envoie un accusé de réception négatif pour le message.

Créer un SMT UDF

Les SMT peuvent être configurés sur des sujets ou des abonnements Pub/Sub.

  • Les SMT de sujet sont exécutées avant que Pub/Sub ne stocke le message, et les résultats sont disponibles pour tous les abonnés.
  • Les SMT d'abonnement sont exécutés avant la distribution du message, et les résultats ne sont disponibles que pour cet abonnement.

Console

  1. Dans la console Google Cloud , accédez à la page Sujets de Pub/Sub.

    Accéder aux sujets

  2. Créez un sujet ou un abonnement.

    • Pour créer un sujet, cliquez sur Créer un sujet. La page Créer un sujet s'ouvre.

    • Pour créer un abonnement :

      1. Cliquez sur le nom du sujet auquel vous souhaitez vous abonner.

      2. Cliquez sur Créer un abonnement. La page Ajouter un abonnement au sujet s'ouvre.

  3. Sous Transformations, cliquez sur Ajouter une transformation.

  4. Pour Type de transformation, sélectionnez Fonction JavaScript définie par l'utilisateur.

  5. Dans le champ Nom de la fonction, saisissez le nom de la fonction JavaScript appelée par le SMT. Exemple : redactSSN.

  6. Dans la zone de texte, saisissez le code de la UDF;utilisateur. Exemple :

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    

    Le code doit contenir une fonction dont le nom correspond au champ Nom de la fonction.

  7. Si vous ne souhaitez pas que la SMT soit active immédiatement, sélectionnez Désactiver la transformation. Lorsque cette option est sélectionnée, le SMT est créé avec le sujet, mais n'est pas exécuté sur les messages entrants. Une fois le sujet créé, vous pouvez le modifier pour activer le SMT.

  8. Pour créer le thème ou l'abonnement, cliquez sur Créer.

gcloud

Créer un fichier de définition

Créez un fichier YAML ou JSON qui définit le SMT UDF.

YAML

- javascriptUdf:
    code: { FUNCTION_CODE }
    functionName: FUNCTION_NAME

JSON

{
  "javascriptUdf": {
    "code": {
      FUNCTION_CODE
    }
    "functionName": FUNCTION_NAME
  }
}

Remplacez les éléments suivants :

  • FUNCTION_CODE : code JavaScript de l'UDF. Le code doit contenir une fonction dont le nom correspond au champ functionName. Exemple :

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
    
  • FUNCTION_NAME : nom de la fonction JavaScript appelée par le SMT. Exemple : redactSSN.

Créer un sujet ou un abonnement

Pour créer un sujet, exécutez la commande gcloud pubsub topics create.

gcloud pubsub topics create TOPIC_ID \
  --message-transforms-file=TRANSFORMS_FILE

Remplacez les éléments suivants :

  • TOPIC_ID : ID ou nom de la rubrique que vous souhaitez créer.
  • TRANSFORMS_FILE : chemin d'accès au fichier de définition.

Pour créer un abonnement, exécutez la commande gcloud pubsub subscriptions create.

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=projects/PROJECT_ID/topics/TOPIC_ID \
  --message-transforms-file=TRANSFORMS_FILE

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID ou nom de l'abonnement à créer.

  • PROJECT_ID : ID du projet contenant le thème.

  • TOPIC_ID : ID du sujet auquel s'abonner.

  • TRANSFORMS_FILE : chemin d'accès au fichier de définition.

Si vous le souhaitez, vous pouvez valider et tester le SMT avant de le créer. Pour en savoir plus, consultez les pages suivantes :

Limites

Pub/Sub applique des limites de ressources aux UDF pour garantir des opérations de transformation efficaces. Voici quelques-unes de ces limites :

  • 20 Ko de code maximum par fonction définie par l'utilisateur
  • Un temps d'exécution maximal de 500 ms par message
  • Compatibilité uniquement avec les intégrations standards ECMAScript
  • Aucun appel à des API externes
  • Aucune importation de bibliothèques externes

Exemples de fonctions définies par l'utilisateur

Voici quelques exemples de fonctions définies par l'utilisateur pour la publication et l'abonnement. Vous trouverez d'autres exemples dans la bibliothèque UDF.

Fonction : convertir un entier correspondant à un jour de la semaine en chaîne correspondante

Lorsque vous ajoutez la UDF suivante à un sujet ou à un abonnement, les modifications suivantes sont apportées lors de la publication ou de la distribution des messages :

  1. Pub/Sub applique la fonction au message. Si le message ne comporte pas de charge utile JSON, la UDF génère une erreur.

  2. La fonction UDF recherche un champ appelé dayOfWeek et, si la valeur de ce champ est un nombre compris entre 0 et 6, elle le convertit en jour de la semaine correspondant, par exemple Monday. Si le champ n'existe pas ou si le nombre n'est pas compris entre 0 et 6, le code définit le champ dayOfWeek sur Unknown.

  3. La fonction définie par l'utilisateur sérialise la charge utile modifiée dans le message.

  4. Pub/Sub transmet le message mis à jour à l'étape suivante de votre pipeline.

function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}

Fonction : masquer un numéro de sécurité sociale

Lorsque vous ajoutez la UDF suivante à un sujet ou à un abonnement, les modifications suivantes sont apportées lors de la publication ou de la distribution des messages :

  1. Pub/Sub applique la fonction au message. Si le message ne comporte pas de charge utile JSON, la UDF génère une erreur.

  2. La UDF supprime le champ ssn de la charge utile du message (s'il existe).

  3. La fonction définie par l'utilisateur sérialise la charge utile modifiée dans le message.

  4. Pub/Sub transmet le message mis à jour à l'étape suivante de votre pipeline.

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

Fonction : filtrer et confirmer automatiquement des messages spécifiques

Lorsque vous ajoutez la UDF suivante à un sujet ou à un abonnement, les modifications suivantes sont apportées lors de la publication ou de la distribution des messages :

  1. Pub/Sub applique la fonction au message. Si le message ne comporte pas de charge utile JSON, la UDF génère une erreur.

  2. La fonction UDF vérifie si la charge utile contient un champ appelé region.

  3. Si la valeur du champ region n'est pas US, la fonction renvoie la valeur nulle, ce qui entraîne le filtrage du message par Pub/Sub.

  4. Si la valeur du champ region est US, Pub/Sub transmet le message d'origine à l'étape suivante de votre pipeline.

function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}

Fonction : valider le contenu du message pour s'assurer que le montant n'est pas supérieur à 100

Lorsque vous ajoutez la UDF suivante à un sujet ou à un abonnement, les modifications suivantes sont apportées lors de la publication ou de la distribution des messages :

  1. Pub/Sub applique la fonction au message. Si le message ne comporte pas de charge utile JSON, la UDF génère une erreur.

  2. La fonction définie par l'utilisateur vérifie si le message contient un champ appelé amount.

  3. Si la valeur du champ amount est supérieure à 100, la fonction génère une erreur.

  4. Si la valeur du champ amount n'est pas supérieure à 100, la fonction renvoie le message d'origine.

  5. Pub/Sub marque ensuite le message comme ayant échoué ou transmet le message d'origine à l'étape suivante de votre pipeline.

function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}

Étapes suivantes