הרחבת הפרטיות הדיפרנציאלית
במאמר הזה מפורטות דוגמאות להרחבת הפרטיות הדיפרנציאלית ב-BigQuery.
BigQuery מאפשר להרחיב את הפרטיות הדיפרנציאלית למקורות נתונים מרובי עננים ולספריות חיצוניות של פרטיות דיפרנציאלית. במסמך הזה מובאות דוגמאות לאופן השימוש בפרטיות דיפרנציאלית במקורות נתונים מרובי עננים כמו AWS S3 עם BigQuery Omni, לאופן הקריאה לספריית פרטיות דיפרנציאלית חיצונית באמצעות פונקציה מרוחקת ולאופן ביצוע צבירות של פרטיות דיפרנציאלית באמצעות PipelineDP, ספריית Python שאפשר להריץ באמצעות Apache Spark ו-Apache Beam.
מידע נוסף על פרטיות דיפרנציאלית זמין במאמר שימוש בפרטיות דיפרנציאלית.
פרטיות דיפרנציאלית עם BigQuery Omni
הפרטיות הדיפרנציאלית ב-BigQuery תומכת בקריאות למקורות נתונים מרובי עננים כמו AWS S3. בדוגמה הבאה מוצגת שאילתה למקור נתונים חיצוני, foo.wikidata, שמוחל עליו פרטיות דיפרנציאלית. מידע נוסף על התחביר של סעיף הפרטיות הדיפרנציאלית זמין במאמר סעיף הפרטיות הדיפרנציאלית.
SELECT WITH DIFFERENTIAL_PRIVACY OPTIONS ( epsilon = 1, delta = 1e-5, privacy_unit_column = foo.wikidata.es_description) COUNT(*) AS results FROM foo.wikidata;
בדוגמה הזו, התוצאות שמוחזרות דומות לתוצאות הבאות:
-- These results will change each time you run the query. +----------+ | results | +----------+ | 3465 | +----------+
מידע נוסף על המגבלות של BigQuery Omni זמין במאמר מגבלות.
הפעלת ספריות חיצוניות של פרטיות דיפרנציאלית באמצעות פונקציות מרחוק
אפשר לקרוא לספריות חיצוניות של פרטיות דיפרנציאלית באמצעות פונקציות מרוחקות. הקישור הבא משתמש בפונקציה מרוחקת כדי לקרוא לספרייה חיצונית שמתארחת ב-Tumult Analytics, כדי להשתמש בפרטיות דיפרנציאלית עם ריכוז אפס על מערך נתונים של מכירות קמעונאיות.
מידע על עבודה עם Tumult Analytics זמין בפוסט על השקת Tumult Analytics {: .external}.
צבירות של פרטיות דיפרנציאלית באמצעות PipelineDP
PipelineDP היא ספריית Python שמבצעת צבירות של פרטיות דיפרנציאלית ויכולה לפעול עם Apache Spark ו-Apache Beam. ב-BigQuery אפשר להריץ תהליכים מאוחסנים של Apache Spark שנכתבו ב-Python. מידע נוסף על הפעלת פרוצדורות מאוחסנות של Apache Spark זמין במאמר עבודה עם פרוצדורות מאוחסנות של Apache Spark.
בדוגמה הבאה מבוצעת צבירה של פרטיות דיפרנציאלית באמצעות ספריית PipelineDP. הוא משתמש במערך הנתונים הציבורי של נסיעות במוניות בשיקגו ומחשב לכל מונית את מספר הנסיעות, ואת סכום הטיפים והממוצע שלהם לנסיעות האלה.
לפני שמתחילים
תמונה רגילה של Apache Spark לא כוללת את PipelineDP. לפני שמריצים פרוצדורה מאוחסנת של PipelineDP, צריך ליצור קובץ אימג' של Docker שמכיל את כל התלויות הנדרשות. בקטע הזה מוסבר איך ליצור קובץ אימג' של Docker ולהעביר אותו בדחיפה אל Google Cloud.
לפני שמתחילים, מוודאים ש-Docker מותקן במחשב המקומי ושהגדרתם אימות להעלאת קובצי אימג' של Docker אל gcr.io. מידע נוסף על שליחת קובצי אימג' של Docker זמין במאמר שליחה ומשיכה של קובצי אימג'.
יצירה והעברה בדחיפה של קובץ אימג' של Docker
כדי ליצור קובץ אימג' של Docker עם יחסי התלות הנדרשים ולדחוף אותו:
- יוצרים תיקייה מקומית
DIR. - מורידים את הקובץ להתקנת Miniconda עם גרסת Python 3.9 אל
DIR. שומרים את הטקסט הבא ב-Dockerfile.
# Debian 11 is recommended. FROM debian:11-slim # Suppress interactive prompts ENV DEBIAN_FRONTEND=noninteractive # (Required) Install utilities required by Spark scripts. RUN apt update && apt install -y procps tini libjemalloc2 # Enable jemalloc2 as default memory allocator ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Install and configure Miniconda3. ENV CONDA_HOME=/opt/miniconda3 ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python ENV PATH=${CONDA_HOME}/bin:${PATH} COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh . RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \ && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict # The following packages are installed in the default image, it is # strongly recommended to include all of them. RUN apt install -y python3 RUN apt install -y python3-pip RUN apt install -y libopenblas-dev RUN pip install \ cython \ fastavro \ fastparquet \ gcsfs \ google-cloud-bigquery-storage \ google-cloud-bigquery[pandas] \ google-cloud-bigtable \ google-cloud-container \ google-cloud-datacatalog \ google-cloud-dataproc \ google-cloud-datastore \ google-cloud-language \ google-cloud-logging \ google-cloud-monitoring \ google-cloud-pubsub \ google-cloud-redis \ google-cloud-spanner \ google-cloud-speech \ google-cloud-storage \ google-cloud-texttospeech \ google-cloud-translate \ google-cloud-vision \ koalas \ matplotlib \ nltk \ numba \ numpy \ orc \ pandas \ pyarrow \ pysal \ regex \ requests \ rtree \ scikit-image \ scikit-learn \ scipy \ seaborn \ sqlalchemy \ sympy \ tables \ virtualenv RUN pip install --no-input pipeline-dp==0.2.0 # (Required) Create the 'spark' group/user. # The GID and UID must be 1099. Home directory is required. RUN groupadd -g 1099 spark RUN useradd -u 1099 -g 1099 -d /home/spark -m spark USER sparkמריצים את הפקודה הבאה.
IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1 # Build and push the image. docker build -t "${IMAGE}" docker push "${IMAGE}"מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: הפרויקט שבו רוצים ליצור את קובץ אימג' של Docker. -
DOCKER_IMAGE: שם קובץ האימג' של Docker.
התמונה מועלית.
-
הפעלת תהליך מאוחסן של PipelineDP
כדי ליצור פרוצדורה מאוחסנת, משתמשים בהצהרה CREATE PROCEDURE.
CREATE OR REPLACE PROCEDURE `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`() WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS ( engine = "SPARK", container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import pipeline_dp def compute_dp_metrics(data, spark_context): budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10, total_delta=1e-6) backend = pipeline_dp.SparkRDDBackend(spark_context) # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN], max_partitions_contributed=1, max_contributions_per_partition=1, min_value=0, # Tips that are larger than 100 will be clipped to 100. max_value=100) # Specify how to extract privacy_id, partition_key and value from an # element of the taxi dataset. data_extractors = pipeline_dp.DataExtractors( partition_extractor=lambda x: x.taxi_id, privacy_id_extractor=lambda x: x.unique_key, value_extractor=lambda x: 0 if x.tips is None else x.tips) # Run aggregation. dp_result = dp_engine.aggregate(data, params, data_extractors) budget_accountant.compute_budgets() dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean)) return dp_result spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate() spark_context = spark.sparkContext # Load data from BigQuery. taxi_trips = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \ .load().rdd dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"]) # Saving the data to BigQuery dp_result.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("DATASET_ID.TABLE_NAME") """;
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: הפרויקט שבו רוצים ליצור את הפרוצדורה המאוחסנת. -
DATASET_ID: מערך הנתונים שבו רוצים ליצור את הפרוצדורה המאוחסנת. -
REGION: האזור שבו נמצא הפרויקט. -
DOCKER_IMAGE: שם קובץ האימג' של Docker. -
CONNECTION_ID: השם של החיבור. -
TABLE_NAME: שם הטבלה.
-
משתמשים בהצהרת CALL כדי לקרוא לפרוצדורה.
CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: הפרויקט שבו רוצים ליצור את הפרוצדורה המאוחסנת. -
DATASET_ID: מערך הנתונים שבו רוצים ליצור את הפרוצדורה המאוחסנת.
-
המאמרים הבאים
- איך משתמשים בפרטיות דיפרנציאלית
- מידע נוסף על סעיף הפרטיות הדיפרנציאלית
- איך משתמשים בפונקציות מצטברות עם פרטיות דיפרנציאלית