קבלת הודעות Pub/Sub וניתוח שלהן לגבי פרופילי נתונים

במסמך הזה מופיעות דוגמאות שממחישות איך לקבל ולנתח הודעות על שינויים בפרופילי הנתונים. העדכונים האלה נשלחים על ידי Sensitive Data Protection בתור הודעות Pub/Sub.

סקירה כללית

אתם יכולים להגדיר את Sensitive Data Protection כך שיפיק באופן אוטומטי פרופילים של נתונים בארגון, בתיקייה או בפרויקט. פרופילים של נתונים מכילים מדדים ומטא-נתונים על הנתונים שלכם, ועוזרים לכם לקבוע איפה נמצאים נתונים רגישים ונתונים בסיכון גבוה. הדוחות של Sensitive Data Protection כוללים את המדדים האלה ברמות שונות של פירוט. מידע על סוגי הנתונים שאפשר ליצור להם פרופיל זמין במאמר בנושא משאבים נתמכים.

כשמגדירים את הכלי ליצירת פרופילים של נתונים, אפשר להפעיל את האפשרות לפרסם הודעות Pub/Sub בכל פעם שמתרחשים שינויים משמעותיים בפרופילים של הנתונים. ההודעות עוזרות לכם לפעול באופן מיידי בתגובה לשינויים האלה. אלה האירועים שאפשר להאזין להם:

  • נכס נתונים עובר פרופיל בפעם הראשונה.
  • פרופיל עודכן.
  • ציון הסיכון או הרגישות של פרופיל עולה.
  • יש שגיאה חדשה שקשורה לפרופילי הנתונים שלכם.

ההודעות ב-Pub/Sub שמנתח הפרופילים של הנתונים מפרסם מכילות אובייקט DataProfilePubSubMessage. ההודעות האלה תמיד נשלחות בפורמט בינארי, ולכן צריך לכתוב קוד שמקבל ומנתח אותן.

תמחור

כשמשתמשים ב-Pub/Sub, החיוב מתבצע בהתאם למחירון של Pub/Sub.

לפני שמתחילים

בדף הזה אנחנו יוצאים מנקודת הנחה ש:

לפני שמתחילים לעבוד על הדוגמאות, צריך לבצע את הפעולות הבאות:

  1. יוצרים נושא Pub/Sub ומוסיפים לו מינוי. לא מקצים סכימה לנושא.

    כדי לפשט את הדברים, הדוגמאות בדף הזה מתייחסות רק למינוי אחד. עם זאת, בפועל, אפשר ליצור נושא ומינוי לכל אירוע שנתמך על ידי Sensitive Data Protection.

  2. אם עדיין לא עשיתם זאת, צריך להגדיר את הכלי ליצירת פרופילים של נתונים כך שיפרסם הודעות ב-Pub/Sub:

    1. עורכים את הגדרות הסריקה.

    2. בדף עריכת הגדרות הסריקה, מפעילים את האפשרות פרסום ב-Pub/Sub ובוחרים את האירועים שרוצים להאזין להם. לאחר מכן, מגדירים את ההגדרות לכל אירוע.

    3. שומרים את הגדרות הסריקה.

  3. נותנים לסוכן השירות של Sensitive Data Protection גישת פרסום בנושא Pub/Sub. דוגמה לתפקיד עם גישת פרסום היא התפקיד 'פרסום הודעות ב-Pub/Sub'‏ (roles/pubsub.publisher). סוכן השירות של Sensitive Data Protection הוא כתובת אימייל בפורמט:

    service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com
    

    אם אתם עובדים עם הגדרת סריקה ברמת הארגון או התיקייה, PROJECT_NUMBER הוא המזהה המספרי של מאגר סוכן השירות. אם אתם עובדים עם הגדרת סריקה ברמת הפרויקט, PROJECT_NUMBER הוא המזהה המספרי של הפרויקט.

  4. מתקינים ומגדירים את ספריית הלקוח של Sensitive Data Protection ל-Java או ל-Python.

דוגמאות

בדוגמאות הבאות מוסבר איך לקבל ולנתח הודעות Pub/Sub שפרופיל הנתונים מפרסם. אפשר להשתמש בדוגמאות האלה למטרות אחרות ולפרוס אותן כפונקציות Cloud Run שמופעלות על ידי אירועי Pub/Sub. מידע נוסף זמין במאמר הדרכה בנושא Pub/Sub (דור שני).

בדוגמאות הבאות, מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שמכיל את מינוי ה-Pub/Sub.
  • SUBSCRIPTION_ID: המזהה של המינוי ל-Pub/Sub.

Java

import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DataProfilePubSubMessageParser {

  public static void main(String... args) throws Exception {
    String projectId = "PROJECT_ID";
    String subscriptionId = "SUBSCRIPTION_ID";
    int timeoutSeconds = 5;

    // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
    // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
          try {
            DataProfilePubSubMessage message = DataProfilePubSubMessage.parseFrom(
                pubsubMessage.getData());
            System.out.println(
                "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                    + "; message size: " + pubsubMessage.getData().size()
                    + "; event: " + message.getEvent()
                    + "; profile name: " + message.getProfile().getName()
                    + "; full resource: " + message.getProfile().getFullResource());
            consumer.ack();
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      ApiService apiService = subscriber.startAsync();
      apiService.awaitRunning();
      System.out.printf("Listening for messages on %s for %d seconds.%n", subscriptionName,
          timeoutSeconds);
      subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
    } finally {
      subscriber.stopAsync();
    }
  }
}

Python

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
from google.cloud import dlp_v2


project_id = "PROJECT_ID"
subscription_id = "SUBSCRIPTION_ID"
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data}.")
    dlp_msg = dlp_v2.DataProfilePubSubMessage()
    dlp_msg._pb.ParseFromString(message.data)
    print("Parsed message: ", dlp_msg)
    print("--------")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
        print("Done waiting.")

המאמרים הבאים