גישה למטא-נתונים ב-Apache Spark

בדף הזה מוסבר איך ליצור אשכול Dataproc שמריץ Spark. אפשר להשתמש באשכול הזה כדי לעבוד עם מטא-נתונים של Dataplex Universal Catalog עבור אגמים, אזורים ונכסים.

סקירה כללית

יוצרים אשכול אחרי שמופע של שירות Dataproc Metastore משויך לאגם Dataplex Universal Catalog, כדי לוודא שהאשכול יכול להסתמך על נקודת הקצה של Hive Metastore כדי לקבל גישה למטא-נתונים של Dataplex Universal Catalog.

אפשר לגשת למטא-נתונים שמנוהלים ב-Dataplex Universal Catalog באמצעות ממשקי סטנדרט, כמו Hive Metastore, כדי להפעיל שאילתות Spark. השאילתות מורצות באשכול Dataproc.

כדי למנוע שגיאות בהרצה של נתוני Parquet, צריך להגדיר את מאפיין Spark‏ spark.sql.hive.convertMetastoreParquet ל-false. פרטים נוספים

יצירת אשכול Dataproc

מריצים את הפקודות הבאות כדי ליצור אשכול Dataproc, ומציינים את שירות Dataproc Metastore שמשויך לאגם של Dataplex Universal Catalog:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

עיון במטא-נתונים

מריצים שאילתות DQL כדי לבדוק את המטא-נתונים ומריצים שאילתות Spark כדי לשלוף נתונים.

לפני שמתחילים

  1. פותחים סשן SSH בצומת הראשי של אשכול Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. בשורת הפקודה של הצומת הראשי, פותחים Python REPL חדש.

    python3
    

הצגת רשימה של מסדי נתונים

כל אזור ב-Dataplex Universal Catalog שמוגדר באגם ממופה למסד נתונים של חנות מטא-נתונים.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

הצגת רשימת הטבלאות

הצגת רשימת הטבלאות באחד מהאזורים.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

שאילתות על נתונים

מריצים שאילתה על הנתונים באחת מהטבלאות.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

יצירת טבלאות ומחיצות במטא-נתונים

הפעלת שאילתות DDL כדי ליצור טבלאות ומחיצות במטא-נתונים של Dataplex Universal Catalog באמצעות Apache Spark.

מידע נוסף על סוגי הנתונים, פורמטים של קבצים ופורמטים של שורות שנתמכים זמין במאמר ערכים נתמכים.

לפני שמתחילים

לפני שיוצרים טבלה, צריך ליצור נכס של Dataplex Universal Catalog שממופה לקטגוריית Cloud Storage שמכילה את נתוני הבסיס. מידע נוסף זמין במאמר בנושא הוספת נכס.

צור טבלה

יש תמיכה בטבלאות בפורמטים Parquet,‏ ORC,‏ AVRO,‏ CSV ו-JSON.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

שינוי טבלה

ב-Dataplex Universal Catalog אי אפשר לשנות את המיקום של טבלה או לערוך את עמודות המחיצה של טבלה. שינוי טבלה לא מגדיר אוטומטית את userManaged ל-true.

ב-Spark SQL, אפשר לשנות את השם של טבלה, להוסיף עמודות ולהגדיר את פורמט הקובץ של טבלה.

שינוי שם של טבלה

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

הוספת עמודות

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

הגדרת פורמט הקובץ

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

הסרת טבלה

הסרת טבלה מ-Dataplex Universal Catalog metadata API לא מוחקת את הנתונים הבסיסיים ב-Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

הוספת מחיצה

ב-Dataplex Universal Catalog אי אפשר לשנות מחיצה אחרי שיוצרים אותה. אבל אפשר להסיר את המחיצה.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

אפשר להוסיף כמה מחיצות עם אותו מפתח מחיצה וערכי מחיצה שונים, כמו בדוגמה הקודמת.

הוספת מחיצה

כדי להסיר מחיצה, מריצים את הפקודה הבאה:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

שאילתות בטבלאות Iceberg

אפשר לשלוח שאילתות לטבלאות Iceberg באמצעות Apache Spark.

לפני שמתחילים

הגדרת סשן Spark SQL עם Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

יצירת טבלת Iceberg

כדי ליצור טבלת Iceberg, מריצים את הפקודה הבאה:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

עיון בתמונת מצב ובהיסטוריה של הקרחון

אפשר לקבל תמונות מצב והיסטוריה של טבלאות Iceberg באמצעות Apache Spark.

לפני שמתחילים

הגדרת סשן PySpark עם תמיכה ב-Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

קבלת היסטוריה של טבלאות Iceberg

כדי לקבל את ההיסטוריה של טבלת Iceberg, מריצים את הפקודה הבאה:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

קבלת תמונות מצב של טבלאות Iceberg

כדי לקבל תמונת מצב של טבלת Iceberg, מריצים את הפקודה הבאה:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

סוגי נתונים ופורמטים של קבצים נתמכים

סוגי הנתונים הנתמכים מוגדרים כך:

סוג נתונים ערכים
פרימיטיבי
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
מערך ARRAY < DATA_TYPE >
מבנה STRUCT < COLUMN : DATA_TYPE >

אלה פורמטים של קבצים שנתמכים:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

מידע נוסף על פורמטים של קבצים זמין במאמר בנושא פורמטים של אחסון.

אלה הפורמטים הנתמכים של שורות:

  • מופרד באמצעות תו [השדות מסתיימים ב-CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

המאמרים הבאים