Auf Metadaten in Apache Spark zugreifen

Auf dieser Seite wird beschrieben, wie Sie einen Dataproc-Cluster erstellen, auf dem Spark ausgeführt wird. Mit diesem Cluster können Sie mit Dataplex Universal Catalog-Metadaten für Lakes, Zonen und Assets arbeiten.

Übersicht

Sie erstellen einen Cluster, nachdem die Dataproc Metastore-Dienstinstanz dem Dataplex Universal Catalog-Lake zugeordnet wurde. So kann der Cluster den Hive-Metastore-Endpunkt verwenden, um auf Dataplex Universal Catalog-Metadaten zuzugreifen.

Auf Metadaten, die in Dataplex Universal Catalog verwaltet werden, kann über Standardschnittstellen wie Hive Metastore zugegriffen werden, um Spark-Abfragen auszuführen. Die Abfragen werden im Dataproc-Cluster ausgeführt.

Legen Sie für Parquet-Daten die Spark-Eigenschaft spark.sql.hive.convertMetastoreParquet auf false fest, um Ausführungsfehler zu vermeiden. Weitere Informationen finden Sie unter Konvertierung von Hive-Metastore-Parquet-Tabellen.

Dataproc-Cluster erstellen

Führen Sie die folgenden Befehle aus, um einen Dataproc-Cluster zu erstellen und den Dataproc Metastore-Dienst anzugeben, der dem Dataplex Universal Catalog-Lake zugeordnet ist:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Metadaten untersuchen

Führen Sie DQL-Abfragen aus, um die Metadaten zu untersuchen, und Spark-Abfragen, um Daten abzufragen.

Hinweis

  1. Öffnen Sie eine SSH-Sitzung auf dem primären Knoten des Dataproc-Clusters.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Öffnen Sie an der Eingabeaufforderung des primären Knotens eine neue Python-REPL.

    python3
    

Datenbanken auflisten

Jede Dataplex Universal Catalog-Zone im Lake wird einer Metastore-Datenbank zugeordnet.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Tabellen auflisten

Tabellen in einer der Zonen auflisten

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Daten abfragen

Daten in einer der Tabellen abfragen

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Tabellen und Partitionen in Metadaten erstellen

DDL-Abfragen ausführen, um Tabellen und Partitionen in Dataplex Universal Catalog-Metadaten mit Apache Spark zu erstellen.

Weitere Informationen zu den unterstützten Datentypen, Dateiformaten und Zeilenformaten finden Sie unter Unterstützte Werte.

Hinweis

Bevor Sie eine Tabelle erstellen, müssen Sie ein Dataplex Universal Catalog-Asset erstellen, das dem Cloud Storage-Bucket mit den zugrunde liegenden Daten zugeordnet ist. Weitere Informationen finden Sie unter Asset hinzufügen.

Tabelle erstellen

Parquet-, ORC-, AVRO-, CSV- und JSON-Tabellen werden unterstützt.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Tabelle ändern

In Dataplex Universal Catalog können Sie den Speicherort einer Tabelle nicht ändern und die Partitionsspalten einer Tabelle nicht bearbeiten. Wenn Sie eine Tabelle ändern, wird userManaged nicht automatisch auf true gesetzt.

In Spark SQL können Sie eine Tabelle umbenennen, Spalten hinzufügen und das Dateiformat einer Tabelle festlegen.

Tabelle umbenennen

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Spalten hinzufügen

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Dateiformat festlegen

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Tabelle löschen

Wenn Sie eine Tabelle über die Dataplex Universal Catalog-Metadaten-API löschen, werden die zugrunde liegenden Daten in Cloud Storage nicht gelöscht.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Partition hinzufügen

In Dataplex Universal Catalog können Partitionen nach dem Erstellen nicht mehr geändert werden. Die Partition kann jedoch gelöscht werden.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN=VALUE1) PARTITION (COLUMN=VALUE2)")
  df.show()

Sie können mehrere Partitionen mit demselben Partitionsschlüssel und unterschiedlichen Partitionswerten hinzufügen, wie im vorherigen Beispiel gezeigt.

Partition löschen

Führen Sie den folgenden Befehl aus, um eine Partition zu löschen:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Iceberg-Tabellen abfragen

Sie können Iceberg-Tabellen mit Apache Spark abfragen.

Hinweis

Spark SQL-Sitzung mit Iceberg einrichten

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg-Tabelle erstellen

Führen Sie den folgenden Befehl aus, um eine Iceberg-Tabelle zu erstellen:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Iceberg-Snapshot und -Verlauf ansehen

Sie können Snapshots und den Verlauf von Iceberg-Tabellen mit Apache Spark abrufen.

Hinweis

Richten Sie eine PySpark-Sitzung mit Iceberg-Unterstützung ein:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Verlauf von Iceberg-Tabellen abrufen

Führen Sie den folgenden Befehl aus, um den Verlauf einer Iceberg-Tabelle abzurufen:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Snapshots von Iceberg-Tabellen erstellen

Führen Sie den folgenden Befehl aus, um einen Snapshot einer Iceberg-Tabelle zu erstellen:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Unterstützte Datentypen und Dateiformate

Die unterstützten Datentypen sind so definiert:

Datentyp Werte
Primitiv
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Struktur STRUCT < COLUMN : DATA_TYPE >

Folgende Dateiformate werden unterstützt:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Weitere Informationen zu den Dateiformaten finden Sie unter Speicherformate.

Die folgenden Zeilenformate werden unterstützt:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Nächste Schritte