Halaman ini menjelaskan cara membuat cluster Managed Service for Apache Spark yang menjalankan Spark. Anda dapat menggunakan cluster ini untuk bekerja dengan metadata Dataplex Universal Catalog untuk data lake, zona, dan aset.
Ringkasan
Anda membuat cluster setelah instance layanan Dataproc Metastore dikaitkan dengan data lake Dataplex Universal Catalog untuk memastikan bahwa cluster dapat mengandalkan endpoint Hive Metastore untuk mendapatkan akses ke metadata Dataplex Universal Catalog.
Metadata yang dikelola dalam Dataplex Universal Catalog dapat diakses menggunakan antarmuka standar, seperti Hive Metastore, untuk mendukung kueri Spark. Kueri dijalankan di cluster Managed Service for Apache Spark.
Untuk data Parquet, tetapkan properti Spark spark.sql.hive.convertMetastoreParquet ke
false untuk menghindari error eksekusi. Untuk mengetahui informasi selengkapnya, lihat
Konversi tabel Parquet metastore Hive.
Membuat cluster Managed Service untuk Apache Spark
Jalankan perintah berikut untuk membuat cluster Managed Service for Apache Spark, dengan menentukan layanan Dataproc Metastore yang terkait dengan lake Dataplex Universal Catalog:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Menjelajahi metadata
Jalankan kueri DQL untuk menjelajahi metadata dan jalankan kueri Spark untuk membuat kueri data.
Sebelum memulai
Buka sesi SSH di node utama cluster Managed Service for Apache Spark.
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONEPada command prompt node utama, buka REPL Python baru.
python3
Mencantumkan database
Setiap zona Dataplex Universal Catalog dalam data lake dipetakan ke database metastore.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
Membuat daftar tabel
Mencantumkan tabel di salah satu zona.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Data kueri
Buat kueri data di salah satu tabel.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
Membuat tabel dan partisi dalam metadata
Jalankan kueri DDL untuk membuat tabel dan partisi dalam metadata Dataplex Universal Catalog menggunakan Apache Spark.
Untuk mengetahui informasi selengkapnya tentang jenis data, format file, dan format baris yang didukung, lihat Nilai yang didukung.
Sebelum memulai
Sebelum membuat tabel, buat aset Dataplex Universal Catalog yang dipetakan ke bucket Cloud Storage yang berisi data pokok. Untuk mengetahui informasi selengkapnya, lihat Menambahkan aset.
Membuat tabel
Tabel Parquet, ORC, AVRO, CSV, dan JSON didukung.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
Mengubah tabel
Dataplex Universal Catalog tidak memungkinkan Anda mengubah lokasi tabel atau mengedit kolom partisi untuk tabel. Mengubah tabel tidak otomatis
menetapkan
userManaged
ke true.
Di Spark SQL, Anda dapat mengganti nama tabel, menambahkan kolom, dan menetapkan format file tabel.
Mengganti nama tabel
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Tambah kolom
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
Menetapkan format file
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Menghapus tabel
Menghapus tabel dari metadata API Katalog Universal Dataplex tidak akan menghapus data pokok di Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Menambahkan partisi
Dataplex Universal Catalog tidak mengizinkan pengubahan partisi setelah dibuat. Namun, partisi dapat dihentikan.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
df.show()
Anda dapat menambahkan beberapa partisi dari kunci partisi yang sama dan nilai partisi yang berbeda seperti yang ditunjukkan pada contoh sebelumnya.
Menjatuhkan partisi
Untuk menghapus partisi, jalankan perintah berikut:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Membuat kueri tabel Iceberg
Anda dapat membuat kueri tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Siapkan sesi Spark SQL dengan Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Membuat tabel Iceberg
Untuk membuat tabel Iceberg, jalankan perintah berikut:
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Menjelajahi snapshot dan histori Iceberg
Anda bisa mendapatkan snapshot dan histori tabel Iceberg menggunakan Apache Spark.
Sebelum memulai
Siapkan sesi PySpark dengan dukungan Iceberg:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Mendapatkan histori tabel Iceberg
Untuk mendapatkan histori tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Mendapatkan snapshot tabel Iceberg
Untuk mendapatkan snapshot tabel Iceberg, jalankan perintah berikut:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Jenis data dan format file yang didukung
Jenis data yang didukung didefinisikan sebagai berikut:
| Jenis data | Nilai |
|---|---|
| Primitif |
|
| Array | ARRAY < DATA_TYPE > |
| Struktur | STRUCT < COLUMN : DATA_TYPE > |
Berikut adalah format file yang didukung:
TEXTFILEORCPARQUETAVROJSONFILE
Untuk mengetahui informasi selengkapnya tentang format file, lihat Format penyimpanan.
Berikut adalah format baris yang didukung:
- DIBATASI [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
Langkah berikutnya
- Pelajari lebih lanjut cara mengelola metadata untuk lake, zona, dan aset.