Use Trino with Dataproc

Trino (formerly Presto) is a distributed SQL query engine designed to query large data sets distributed over one or more heterogeneous data sources. Trino can query Hive, MySQL, Kafka and other data sources through connectors. This tutorial shows you how to:

  • Install the Trino service on a Dataproc cluster
  • Query public data from a Trino client installed on your local machine that communicates with a Trino service on your cluster
  • Run queries from a Java application that communicates with the Trino service on your cluster through the Trino Java JDBC driver.

Create a Dataproc cluster

Create a Dataproc cluster using the optional-components flag (available on image version 2.1 and later) to install the Trino optional component on the cluster and the enable-component-gateway flag to enable the Component Gateway to allow you to access the Trino Web UI from the Google Cloud console.

  1. Set environment variables:
    • PROJECT: your project ID
    • BUCKET_NAME: the name of the Cloud Storage bucket you created in Before you begin
    • REGION: region where the cluster used in this tutorial will be created, for example, "us-west1"
    • WORKERS: 3 - 5 workers are recommended for this tutorial
    export PROJECT=project-id
    export WORKERS=number
    export REGION=region
    export BUCKET_NAME=bucket-name
    
  2. Run the Google Cloud CLI on your local machine to create the cluster.
    gcloud beta dataproc clusters create trino-cluster \
        --project=${PROJECT} \
        --region=${REGION} \
        --num-workers=${WORKERS} \
        --scopes=cloud-platform \
        --optional-components=TRINO \
        --image-version=2.1  \
        --enable-component-gateway
    

Prepare data

Export the bigquery-public-data chicago_taxi_trips dataset to Cloud Storage as CSV files, then create a Hive external table to reference the data.

  1. On your local machine, run the following command to import the taxi data from BigQuery as CSV files without headers into the Cloud Storage bucket you created in Before you begin.
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. Create Hive external tables that are backed by the CSV and Parquet files in your Cloud Storage bucket.
    1. Create the Hive external table chicago_taxi_trips_csv.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. Verify the creation of the Hive external table.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. Create another Hive external table chicago_taxi_trips_parquet with the same columns, but with data stored in Parquetformat for better query performance.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. Load the data from the Hive CSV table into the Hive Parquet table.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. Verify that the data loaded correctly.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

Run queries

You can run queries locally from the Trino CLI or from an application.

Trino CLI queries

This section demonstrates how to query the Hive Parquet taxi dataset using the Trino CLI.

  1. Run the following command on your local machine to SSH into your cluster's master node. The local terminal will stop responding during the execution of the command.
    gcloud compute ssh trino-cluster-m
    
  2. In SSH terminal window on your cluster's master node, run the Trino CLI, which connects to the Trino server running on the master node.
    trino --catalog hive --schema default
    
  3. At the trino:default prompt, verify that Trino can find the Hive tables.
    show tables;
    
    Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
     chicago_taxi_trips_csv
     chicago_taxi_trips_parquet
    (2 rows)
    
  4. Run queries from the trino:default prompt, and compare the performance of querying Parquet versus CSV data.
    • Parquet data query
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
       _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
    • CSV data query
      select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Java application queries

To run queries from a Java application through the Trino Java JDBC driver: 1. Download the Trino Java JDBC driver. 1. Add a trino-jdbc dependency in Maven pom.xml.

<dependency>
  <groupId>io.trino</groupId>
  <artifactId>trino-jdbc</artifactId>
  <version>376</version>
</dependency>
Sample Java code
package dataproc.codelab.trino;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class TrinoQuery {
  private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

Logging and monitoring

Logging

The Trino logs are located at /var/log/trino/ on the cluster's master and worker nodes.

Web UI

See Viewing and Accessing Component Gateway URLs to open the Trino Web UI running on the cluster's master node in your local browser.

Monitoring

Trino exposes cluster runtime information through runtime tables. In a Trino session (from the trino:default) prompt, run the following query to view runtime table data:

select * FROM system.runtime.nodes;