בדף הזה מוסבר איך ליצור אשכול Dataproc שמריץ Spark. אפשר להשתמש באשכול הזה כדי לעבוד עם מטא-נתונים של Dataplex Universal Catalog עבור אגמים, אזורים ונכסים.
סקירה כללית
יוצרים אשכול אחרי שמופע של שירות Dataproc Metastore משויך לאגם Dataplex Universal Catalog, כדי לוודא שהאשכול יכול להסתמך על נקודת הקצה של Hive Metastore כדי לקבל גישה למטא-נתונים של Dataplex Universal Catalog.
אפשר לגשת למטא-נתונים שמנוהלים ב-Dataplex Universal Catalog באמצעות ממשקי סטנדרט, כמו Hive Metastore, כדי להפעיל שאילתות Spark. השאילתות מורצות באשכול Dataproc.
כדי למנוע שגיאות בהרצה של נתוני Parquet, צריך להגדיר את מאפיין Spark spark.sql.hive.convertMetastoreParquet ל-false. פרטים נוספים
יצירת אשכול Dataproc
מריצים את הפקודות הבאות כדי ליצור אשכול Dataproc, ומציינים את שירות Dataproc Metastore שמשויך לאגם של Dataplex Universal Catalog:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
עיון במטא-נתונים
מריצים שאילתות DQL כדי לבדוק את המטא-נתונים ומריצים שאילתות Spark כדי לשלוף נתונים.
לפני שמתחילים
פותחים סשן SSH בצומת הראשי של אשכול Dataproc.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEבשורת הפקודה של הצומת הראשי, פותחים Python REPL חדש.
python3
הצגת רשימה של מסדי נתונים
כל אזור ב-Dataplex Universal Catalog שמוגדר באגם ממופה למסד נתונים של חנות מטא-נתונים.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
הצגת רשימת הטבלאות
הצגת רשימת הטבלאות באחד מהאזורים.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
שאילתות על נתונים
מריצים שאילתה על הנתונים באחת מהטבלאות.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
יצירת טבלאות ומחיצות במטא-נתונים
הפעלת שאילתות DDL כדי ליצור טבלאות ומחיצות במטא-נתונים של Dataplex Universal Catalog באמצעות Apache Spark.
מידע נוסף על סוגי הנתונים, פורמטים של קבצים ופורמטים של שורות שנתמכים זמין במאמר ערכים נתמכים.
לפני שמתחילים
לפני שיוצרים טבלה, צריך ליצור נכס של Dataplex Universal Catalog שממופה לקטגוריית Cloud Storage שמכילה את נתוני הבסיס. מידע נוסף זמין במאמר בנושא הוספת נכס.
צור טבלה
יש תמיכה בטבלאות בפורמטים Parquet, ORC, AVRO, CSV ו-JSON.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
שינוי טבלה
ב-Dataplex Universal Catalog אי אפשר לשנות את המיקום של טבלה או לערוך את עמודות המחיצה של טבלה. שינוי טבלה לא מגדיר אוטומטית את userManaged ל-true.
ב-Spark SQL, אפשר לשנות את השם של טבלה, להוסיף עמודות ולהגדיר את פורמט הקובץ של טבלה.
שינוי שם של טבלה
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
הוספת עמודות
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
הגדרת פורמט הקובץ
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
הסרת טבלה
הסרת טבלה מ-Dataplex Universal Catalog metadata API לא מוחקת את הנתונים הבסיסיים ב-Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
הוספת מחיצה
ב-Dataplex Universal Catalog אי אפשר לשנות מחיצה אחרי שיוצרים אותה. אבל אפשר להסיר את המחיצה.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
אפשר להוסיף כמה מחיצות עם אותו מפתח מחיצה וערכי מחיצה שונים, כמו בדוגמה הקודמת.
הוספת מחיצה
כדי להסיר מחיצה, מריצים את הפקודה הבאה:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
שאילתות בטבלאות Iceberg
אפשר לשלוח שאילתות לטבלאות Iceberg באמצעות Apache Spark.
לפני שמתחילים
הגדרת סשן Spark SQL עם Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
יצירת טבלת Iceberg
כדי ליצור טבלת Iceberg, מריצים את הפקודה הבאה:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
עיון בתמונת מצב ובהיסטוריה של הקרחון
אפשר לקבל תמונות מצב והיסטוריה של טבלאות Iceberg באמצעות Apache Spark.
לפני שמתחילים
הגדרת סשן PySpark עם תמיכה ב-Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
קבלת היסטוריה של טבלאות Iceberg
כדי לקבל את ההיסטוריה של טבלת Iceberg, מריצים את הפקודה הבאה:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
קבלת תמונות מצב של טבלאות Iceberg
כדי לקבל תמונת מצב של טבלת Iceberg, מריצים את הפקודה הבאה:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
סוגי נתונים ופורמטים של קבצים נתמכים
סוגי הנתונים הנתמכים מוגדרים כך:
| סוג נתונים | ערכים |
|---|---|
| פרימיטיבי |
|
| מערך | ARRAY < DATA_TYPE > |
| מבנה | STRUCT < COLUMN : DATA_TYPE > |
אלה פורמטים של קבצים שנתמכים:
TEXTFILEORCPARQUETAVROJSONFILE
מידע נוסף על פורמטים של קבצים זמין במאמר בנושא פורמטים של אחסון.
אלה הפורמטים הנתמכים של שורות:
- מופרד באמצעות תו [השדות מסתיימים ב-CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]