Menerima dan mengurai pesan Pub/Sub tentang profil data

Dokumen ini memberikan contoh yang menunjukkan cara menerima dan mengurai notifikasi tentang perubahan pada profil data Anda. Perlindungan Data Sensitif mengirimkan pembaruan ini dalam bentuk pesan Pub/Sub.

Ringkasan

Anda dapat mengonfigurasi Perlindungan Data Sensitif untuk otomatis membuat profil tentang data di seluruh organisasi, folder, atau project. Profil data berisi metrik dan metadata tentang data Anda serta membantu menentukan lokasi data sensitif dan berisiko tinggi. Perlindungan Data Sensitif melaporkan metrik ini pada berbagai tingkat detail. Untuk mengetahui informasi tentang jenis data yang dapat Anda profilkan, lihat Referensi yang didukung.

Saat mengonfigurasi data profiler, Anda dapat mengaktifkan opsi untuk memublikasikan pesan Pub/Sub setiap kali terjadi perubahan signifikan dalam profil data Anda. Pesan ini membantu Anda mengambil tindakan segera sebagai respons terhadap perubahan tersebut. Berikut adalah peristiwa yang dapat Anda dengarkan:

  • Aset data diprofilkan untuk pertama kalinya.
  • Profil diperbarui.
  • Skor risiko atau sensitivitas profil meningkat.
  • Ada error baru terkait profil data Anda.

Pesan Pub/Sub yang dipublikasikan oleh data profiler berisi objek DataProfilePubSubMessage. Pesan ini selalu dikirim dalam format biner, sehingga Anda perlu menulis kode yang menerima dan mengurainya.

Harga

Saat menggunakan Pub/Sub, Anda akan ditagih sesuai dengan harga Pub/Sub.

Sebelum memulai

Halaman ini mengasumsikan hal berikut:

Sebelum mulai mengerjakan contoh, ikuti langkah-langkah berikut:

  1. Buat topik Pub/Sub dan tambahkan langganan untuk topik tersebut. Jangan tetapkan skema ke topik.

    Untuk mempermudah, contoh di halaman ini hanya mendengarkan satu langganan. Namun, dalam praktiknya, Anda dapat membuat topik dan langganan untuk setiap peristiwa yang didukung oleh Perlindungan Data Sensitif.

  2. Jika Anda belum melakukannya, konfigurasikan data profiler untuk memublikasikan pesan Pub/Sub:

    1. Edit konfigurasi pemindaian Anda.

    2. Di halaman Edit konfigurasi pemindaian, aktifkan opsi Publikasikan ke Pub/Sub dan pilih peristiwa yang ingin Anda dengarkan. Kemudian, konfigurasikan setelan untuk setiap peristiwa.

    3. Simpan konfigurasi pemindaian.

  3. Berikan akses publikasi agen layanan Perlindungan Data Sensitif pada topik Pub/Sub. Contoh peran yang memiliki akses publikasi adalah peran Pub/Sub Publisher (roles/pubsub.publisher). Agen layanan Perlindungan Data Sensitif adalah alamat email dalam format:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    Jika Anda menggunakan konfigurasi pemindaian tingkat organisasi atau folder, ID numerik PROJECT_NUMBER adalah penampung agen layanan. Jika Anda menggunakan konfigurasi pemindaian tingkat project, PROJECT_NUMBER adalah ID numerik project Anda.

  4. Instal dan siapkan library klien Perlindungan Data Sensitif untuk Java atau Python.

Contoh

Contoh berikut menunjukkan cara menerima dan mengurai pesan Pub/Sub yang dipublikasikan oleh data profiler. Anda dapat menggunakan kembali contoh ini dan men-deploy-nya sebagai fungsi Cloud Run yang dipicu oleh peristiwa Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat Tutorial Pub/Sub (generasi ke-2).

Dalam contoh berikut, ganti hal berikut:

  • PROJECT_ID: ID project yang berisi langganan Pub/Sub.
  • SUBSCRIPTION_ID: ID langganan Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

Langkah berikutnya