Ricevi e analizza i messaggi Pub/Sub sui profili di dati

Questo documento fornisce esempi che mostrano come ricevere e analizzare le notifiche relative alle modifiche ai profili dati. Sensitive Data Protection invia questi aggiornamenti sotto forma di messaggi Pub/Sub.

Panoramica

Puoi configurare Sensitive Data Protection in modo che generi automaticamente profili sui dati di un'organizzazione, una cartella o un progetto. I profili dati contengono metriche e metadati sui dati e ti aiutano a determinare dove si trovano i dati sensibili e ad alto rischio. Sensitive Data Protection riporta queste metriche a vari livelli di dettaglio. Per informazioni sui tipi di dati che puoi profilare, consulta Risorse supportate.

Quando configuri il profiler di dati, puoi attivare l'opzione per pubblicare messaggi Pub/Sub ogni volta che si verificano modifiche significative nei profili dati. I messaggi ti aiutano ad agire immediatamente in risposta a queste modifiche. Di seguito sono riportati gli eventi che puoi ascoltare:

  • Un asset di dati viene profilato per la prima volta.
  • Un profilo viene aggiornato.
  • Il punteggio di rischio o sensibilità di un profilo aumenta.
  • Si è verificato un nuovo errore relativo ai profili dati.

I messaggi Pub/Sub pubblicati dal profiler di dati contengono un DataProfilePubSubMessage oggetto. Questi messaggi vengono sempre inviati in formato binario, quindi devi scrivere codice che li riceva e li analizzi.

Prezzi

Quando utilizzi Pub/Sub, ti viene addebitato l'importo in base a i prezzi di Pub/Sub.

Prima di iniziare

Questa pagina presuppone quanto segue:

Prima di iniziare a lavorare sugli esempi, segui questi passaggi:

  1. Crea un argomento Pub/Sub e aggiungi una sottoscrizione. Non assegnare uno schema all'argomento.

    Per semplicità, gli esempi in questa pagina ascoltano una sola sottoscrizione. Tuttavia, in pratica, puoi creare un argomento e una sottoscrizione per ogni evento supportato da Sensitive Data Protection.

  2. Se non l'hai ancora fatto, configura il profiler di dati in modo che pubblichi messaggi Pub/Sub:

    1. Modifica la configurazione di scansione.

    2. Nella pagina Modifica configurazione di scansione, attiva l'opzione Pubblica in Pub/Sub e seleziona gli eventi che vuoi ascoltare. Quindi, configura le impostazioni per ogni evento.

    3. Salva la configurazione di scansione.

  3. Concedi all'agente di servizio Sensitive Data Protection l'accesso di pubblicazione all'argomento Pub/Sub. Un esempio di ruolo con accesso di pubblicazione è il ruolo Publisher Pub/Sub (roles/pubsub.publisher). L'agente di servizio Sensitive Data Protection è un indirizzo email nel formato:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Se utilizzi una configurazione di scansione a livello di organizzazione o cartella, il PROJECT_NUMBER è l'identificatore numerico del container dell'agente di servizio. Se utilizzi una configurazione di scansione a livello di progetto, il PROJECT_NUMBER è l'identificatore numerico del progetto.

  4. Installa e configura la libreria client Sensitive Data Protection per Java o Python.

Esempi

Gli esempi seguenti mostrano come ricevere e analizzare i messaggi Pub/Sub pubblicati dal profiler di dati. Puoi riutilizzare questi esempi ed eseguirne il deployment come funzioni Cloud Run attivate da eventi Pub/Sub. Per ulteriori informazioni, consulta il tutorial Pub/Sub (2ª generazione.).

Negli esempi seguenti, sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto che contiene la sottoscrizione Pub/Sub.
  • SUBSCRIPTION_ID: l'ID della sottoscrizione Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Passaggi successivi