Trino (früher Presto) ist eine verteilte SQL-Abfrage-Engine für die Abfrage großer Datasets, die über eine oder mehrere heterogene Datenquellen verteilt sind. Trino kann Hive, MySQL, Kafka und andere Datenquellen über Connectors abfragen. In dieser Anleitung wird Folgendes erläutert:
- Trino-Dienst in einem Dataproc-Cluster installieren
- Öffentliche Daten über einen Trino-Client abfragen, der auf Ihrem lokalen Computer installiert ist und mit einem Trino-Dienst im Cluster kommuniziert
- Abfragen mit einer Java-Anwendung ausführen, die über den Java-JDBC-Treiber für Trino mit dem Trino-Dienst im Cluster kommuniziert
Dataproc-Cluster erstellen
Erstellen Sie einen Dataproc-Cluster mit dem Flag optional-components (verfügbar ab Image-Version 2.1), um die optionale Komponente "Trino" auf dem Cluster zu installieren, und mit dem Flag enable-component-gateway, um Component Gateway zu aktivieren, damit Sie über die Google Cloud -Konsole auf die Trino-Web-UI zugreifen können.
- Umgebungsvariablen festlegen:
- PROJECT: Ihre Projekt-ID
- BUCKET_NAME: Der Name des Cloud Storage-Buckets, den Sie unter Hinweise erstellt haben
- REGION: Die Region, in der der Cluster für diese Anleitung erstellt wird, z. B. "us-west1"
- WORKERS: Für diese Anleitung werden drei bis fünf Worker empfohlen.
export PROJECT=project-id export WORKERS=number export REGION=region export BUCKET_NAME=bucket-name
- Führen Sie die Google Cloud CLI auf Ihrem lokalen Computer aus, um den Cluster zu erstellen.
gcloud beta dataproc clusters create trino-cluster \ --project=${PROJECT} \ --region=${REGION} \ --num-workers=${WORKERS} \ --scopes=cloud-platform \ --optional-components=TRINO \ --image-version=2.1 \ --enable-component-gateway
Daten vorbereiten
Exportieren Sie das bigquery-public-data-Dataset chicago_taxi_trips als CSV-Dateien in Cloud Storage und erstellen Sie dann eine externe Hive-Tabelle, um auf die Daten zu verweisen.
- Führen Sie den folgenden Befehl auf Ihrem lokalen Computer aus, um die Taxidaten aus BigQuery als CSV-Dateien ohne Header in den Cloud Storage-Bucket zu importieren, den Sie unter Vorbereitung erstellt haben.
bq --location=us extract --destination_format=CSV \ --field_delimiter=',' --print_header=false \ "bigquery-public-data:chicago_taxi_trips.taxi_trips" \ gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv - Erstellen Sie externe Hive-Tabellen, die von den CSV- und Parquet-Dateien im Cloud Storage-Bucket unterstützt werden.
- Erstellen Sie die externe Hive-Tabelle
chicago_taxi_trips_csv.gcloud dataproc jobs submit hive \ --cluster trino-cluster \ --region=${REGION} \ --execute " CREATE EXTERNAL TABLE chicago_taxi_trips_csv( unique_key STRING, taxi_id STRING, trip_start_timestamp TIMESTAMP, trip_end_timestamp TIMESTAMP, trip_seconds INT, trip_miles FLOAT, pickup_census_tract INT, dropoff_census_tract INT, pickup_community_area INT, dropoff_community_area INT, fare FLOAT, tips FLOAT, tolls FLOAT, extras FLOAT, trip_total FLOAT, payment_type STRING, company STRING, pickup_latitude FLOAT, pickup_longitude FLOAT, pickup_location STRING, dropoff_latitude FLOAT, dropoff_longitude FLOAT, dropoff_location STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';" - Prüfen Sie die Erstellung der externen Hive-Tabelle.
gcloud dataproc jobs submit hive \ --cluster trino-cluster \ --region=${REGION} \ --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;" - Erstellen Sie eine weitere externe Hive-Tabelle
chicago_taxi_trips_parquetmit denselben Spalten, aber mit Daten im Parquet-Format, um die Abfrageleistung zu verbessern.gcloud dataproc jobs submit hive \ --cluster trino-cluster \ --region=${REGION} \ --execute " CREATE EXTERNAL TABLE chicago_taxi_trips_parquet( unique_key STRING, taxi_id STRING, trip_start_timestamp TIMESTAMP, trip_end_timestamp TIMESTAMP, trip_seconds INT, trip_miles FLOAT, pickup_census_tract INT, dropoff_census_tract INT, pickup_community_area INT, dropoff_community_area INT, fare FLOAT, tips FLOAT, tolls FLOAT, extras FLOAT, trip_total FLOAT, payment_type STRING, company STRING, pickup_latitude FLOAT, pickup_longitude FLOAT, pickup_location STRING, dropoff_latitude FLOAT, dropoff_longitude FLOAT, dropoff_location STRING) STORED AS PARQUET location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';" - Laden Sie die Daten aus der Hive-CSV-Tabelle in die Hive Parquet-Tabelle.
gcloud dataproc jobs submit hive \ --cluster trino-cluster \ --region=${REGION} \ --execute " INSERT OVERWRITE TABLE chicago_taxi_trips_parquet SELECT * FROM chicago_taxi_trips_csv;" - Prüfen Sie, ob die Daten korrekt geladen wurden.
gcloud dataproc jobs submit hive \ --cluster trino-cluster \ --region=${REGION} \ --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
- Erstellen Sie die externe Hive-Tabelle
Abfragen ausführen
Sie können Abfragen lokal über die Trino-Befehlszeile oder über eine Anwendung ausführen.
Trino-Befehlszeilenabfragen
In diesem Abschnitt wird gezeigt, wie Sie das Hive Parquet-Taxi-Dataset mithilfe der Trino-Befehlszeile abfragen.
- Führen Sie den folgenden Befehl auf Ihrem lokalen Computer aus, um eine SSH-Verbindung zum Masterknoten des Clusters herzustellen. Das lokale Terminal reagiert während der Ausführung des Befehls nicht mehr.
gcloud compute ssh trino-cluster-m
- Führen Sie im SSH-Terminalfenster auf dem Masterknoten des Clusters die Trino-Befehlszeile aus, die eine Verbindung zum Trino-Server herstellt, der auf dem Masterknoten ausgeführt wird.
trino --catalog hive --schema default
- Prüfen Sie an der Eingabeaufforderung
trino:default, ob Trino die Hive-Tabellen finden kann.show tables;
Table ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐ chicago_taxi_trips_csv chicago_taxi_trips_parquet (2 rows)
- Führen Sie Abfragen an der Eingabeaufforderung
trino:defaultaus und vergleichen Sie die Leistung von Parquet-Datenabfragen mit CSV-Datenabfragen.- Parquet-Datenabfrage
select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
_col0 ‐‐‐‐‐‐‐‐ 117957 (1 row)
Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s] - CSV-Datenabfrage
select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
_col0 ‐‐‐‐‐‐‐‐ 117957 (1 row)
Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]
- Parquet-Datenabfrage
Java-Anwendungsabfragen
So führen Sie Abfragen mit einer Java-Anwendung über den Java-JDBC-Treiber für Trino aus:
1. Laden Sie den Java-JDBC-Treiber für Trino herunter.
1. Fügen Sie eine trino-jdbc-Abhängigkeit in die Maven-pom.xml ein.
<dependency> <groupId>io.trino</groupId> <artifactId>trino-jdbc</artifactId> <version>376</version> </dependency>
package dataproc.codelab.trino;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class TrinoQuery {
private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
private static final String SOCKS_PROXY = "localhost:1080";
private static final String USER = "user";
private static final String QUERY =
"select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
public static void main(String[] args) {
try {
Properties properties = new Properties();
properties.setProperty("user", USER);
properties.setProperty("socksProxy", SOCKS_PROXY);
Connection connection = DriverManager.getConnection(URL, properties);
try (Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery(QUERY);
while (rs.next()) {
int count = rs.getInt("count");
System.out.println("The number of long trips: " + count);
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}Logging und Monitoring
Logging
Die Trino-Logs befinden sich auf den Master- und Worker-Knoten des Clusters unter /var/log/trino/.
Web-UI
Unter Component Gateway-URLs ansehen und auf diese zugreifen erfahren Sie, wie Sie die auf dem Masterknoten des Clusters ausgeführte Trino-Web-UI in Ihrem lokalen Browser öffnen.
Monitoring
Trino stellt Informationen zur Clusterlaufzeit über Laufzeittabellen bereit.
Führen Sie in einer Trino-Sitzung an der Eingabeaufforderung trino:default die folgende Abfrage aus, um die Daten der Laufzeittabelle anzuzeigen:
select * FROM system.runtime.nodes;