在 Apache Spark 中存取中繼資料

本頁說明如何建立執行 Spark 的 Managed Service for Apache Spark 叢集。您可以使用這個叢集處理湖泊、區域和資產的 Knowledge Catalog (舊稱 Dataplex Universal Catalog) 中繼資料。

總覽

將 Dataproc Metastore 服務執行個體與 Knowledge Catalog 湖泊建立關聯後,您就可以建立叢集,確保叢集能依賴 Hive Metastore 端點存取 Knowledge Catalog 中繼資料。

您可以使用標準介面 (例如 Hive Metastore) 存取 Knowledge Catalog 中管理的中繼資料,以支援 Spark 查詢。查詢會在 Managed Service for Apache Spark 叢集上執行。

如果是 Parquet 資料,請將 Spark 屬性 spark.sql.hive.convertMetastoreParquet 設為 false,以免發生執行錯誤。詳情請參閱 Hive Metastore Parquet 資料表轉換

建立 Managed Service for Apache Spark 叢集

執行下列指令,建立 Managed Service for Apache Spark 叢集,並指定與 Knowledge Catalog Lake 相關聯的 Dataproc Metastore 服務:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

探索中繼資料

執行 DQL 查詢來探索中繼資料,並執行 Spark 查詢來查詢資料。

事前準備

  1. 在 Managed Service for Apache Spark 叢集的主要節點上開啟 SSH 工作階段。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. 在主要節點的命令提示字元中,開啟新的 Python REPL。

    python3
    

可列出資料庫

湖泊中的每個 Knowledge Catalog 區域都會對應至中繼存放區資料庫。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

列出表格

列出其中一個區域中的資料表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

查詢資料

查詢其中一個資料表中的資料。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

在中繼資料中建立資料表和分區

使用 Apache Spark 執行 DDL 查詢,在 Knowledge Catalog 中建立資料表和分區。

如要進一步瞭解支援的資料類型、檔案格式和列格式,請參閱「支援的值」。

事前準備

建立資料表前,請先建立對應至 Cloud Storage bucket 的 Knowledge Catalog 資產,該 bucket 包含基礎資料。詳情請參閱「新增資產」。

建立資料表

支援 Parquet、ORC、AVRO、CSV 和 JSON 資料表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

變更資料表

知識目錄不允許變更資料表的位置,或編輯資料表的分區欄。變更表格不會自動將 userManaged 設為 true

在 Spark SQL 中,您可以重新命名資料表新增資料欄,以及設定資料表的檔案格式

重新命名資料表

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

新增欄

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

設定檔案格式

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

捨棄資料表

從 Dataplex 中繼資料 API 捨棄資料表,不會刪除 Cloud Storage 中的基礎資料。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

新增分區

知識目錄不允許在建立分割區後進行變更。不過,可以捨棄分區。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
  df.show()

如上例所示,您可以新增多個相同分割區鍵和不同分割區值的分割區。

捨棄分區

如要捨棄分割區,請執行下列指令:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

查詢 Iceberg 資料表

您可以使用 Apache Spark 查詢 Iceberg 資料表。

事前準備

使用 Iceberg 設定 Spark SQL 工作階段。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

建立 Iceberg 資料表

如要建立 Iceberg 資料表,請執行下列指令:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

探索 Iceberg 快照和記錄

您可以使用 Apache Spark 取得 Iceberg 資料表的快照和記錄。

事前準備

設定支援 Iceberg 的 PySpark 工作階段:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

取得 Iceberg 資料表的記錄

如要取得 Iceberg 資料表的記錄,請執行下列指令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

取得 Iceberg 資料表的快照

如要取得 Iceberg 資料表的快照,請執行下列指令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

支援的資料類型和檔案格式

支援的資料類型定義如下:

資料類型
原始
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
陣列 ARRAY < DATA_TYPE >
結構 STRUCT < COLUMN : DATA_TYPE >

支援的檔案格式如下:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

如要進一步瞭解檔案格式,請參閱「儲存格式」。

支援的列格式如下:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

後續步驟