Use o Trino com o Dataproc

O Trino (anteriormente Presto) é um motor de consultas SQL distribuído concebido para consultar grandes conjuntos de dados distribuídos por uma ou mais origens de dados heterogéneas. O Trino pode consultar o Hive, o MySQL, o Kafka e outras origens de dados através de conetores. Este tutorial mostra como:

  • Instale o serviço Trino num cluster do Dataproc
  • Consultar dados públicos a partir de um cliente Trino instalado na sua máquina local que comunica com um serviço Trino no seu cluster
  • Execute consultas a partir de uma aplicação Java que comunica com o serviço Trino no seu cluster através do controlador JDBC Java do Trino.

Crie um cluster do Dataproc

Crie um cluster do Dataproc com a flag optional-components (disponível na versão 2.1 e posterior da imagem) para instalar o componente opcional do Trino no cluster e a flag enable-component-gateway para ativar o Component Gateway, o que lhe permite aceder à IU Web do Trino a partir da Google Cloud consola.

  1. Defina variáveis de ambiente:
    • PROJETO: o seu ID do projeto
    • BUCKET_NAME: o nome do contentor do Cloud Storage que criou em Antes de começar
    • REGION: region onde o cluster usado neste tutorial vai ser criado, por exemplo, "us-west1"
    • TRABALHADORES: são recomendados 3 a 5 trabalhadores para este tutorial
    export PROJECT=project-id
    export WORKERS=number
    export REGION=region
    export BUCKET_NAME=bucket-name
    
  2. Execute a CLI Google Cloud na sua máquina local para criar o cluster.
    gcloud beta dataproc clusters create trino-cluster \
        --project=${PROJECT} \
        --region=${REGION} \
        --num-workers=${WORKERS} \
        --scopes=cloud-platform \
        --optional-components=TRINO \
        --image-version=2.1  \
        --enable-component-gateway
    

Prepare os dados

Exporte o conjunto de dados bigquery-public-data chicago_taxi_trips para o Cloud Storage como ficheiros CSV e, em seguida, crie uma tabela externa do Hive para referenciar os dados.

  1. Na sua máquina local, execute o seguinte comando para importar os dados de táxi do BigQuery como ficheiros CSV sem cabeçalhos para o contentor do Cloud Storage que criou em Antes de começar.
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. Crie tabelas externas do Hive com base nos ficheiros CSV e Parquet no seu contentor do Cloud Storage.
    1. Crie a tabela externa do Hive chicago_taxi_trips_csv.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. Verifique a criação da tabela externa do Hive.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. Crie outra tabela externa do Hive chicago_taxi_trips_parquet com as mesmas colunas, mas com dados armazenados no formato Parquet para um melhor desempenho das consultas.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. Carregue os dados da tabela CSV do Hive para a tabela Parquet do Hive.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. Verifique se os dados foram carregados corretamente.
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

Executar consultas

Pode executar consultas localmente a partir da CLI do Trino ou de uma aplicação.

Consultas da CLI do Trino

Esta secção demonstra como consultar o conjunto de dados de táxis do Hive Parquet através da CLI do Trino.

  1. Execute o seguinte comando na sua máquina local para estabelecer uma ligação SSH ao nó principal do cluster. O terminal local deixa de responder durante a execução do comando.
    gcloud compute ssh trino-cluster-m
    
  2. Na janela do terminal SSH no nó principal do cluster, execute a CLI do Trino, que se liga ao servidor Trino em execução no nó principal.
    trino --catalog hive --schema default
    
  3. No comando trino:default, verifique se o Trino consegue encontrar as tabelas do Hive.
    show tables;
    
    Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
     chicago_taxi_trips_csv
     chicago_taxi_trips_parquet
    (2 rows)
    
  4. Execute consultas a partir do comando trino:default e compare o desempenho da consulta de dados Parquet com dados CSV.
    • Consulta de dados Parquet
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
       _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
    • Consulta de dados CSV
      select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Consultas de aplicações Java

Para executar consultas a partir de uma aplicação Java através do controlador JDBC Java do Trino: 1. Transfira o controlador JDBC Java do Trino. 1. Adicione uma dependência trino-jdbc no ficheiro Maven pom.xml.

<dependency>
  <groupId>io.trino</groupId>
  <artifactId>trino-jdbc</artifactId>
  <version>376</version>
</dependency>
Exemplo de código Java
package dataproc.codelab.trino;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class TrinoQuery {
  private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

Registo e monitorização

Registo

Os registos do Trino encontram-se em /var/log/trino/ nos nós principais e de trabalho do cluster.

IU da Web

Consulte o artigo Ver e aceder aos URLs do gateway de componentes para abrir a IU Web do Trino em execução no nó principal do cluster no seu navegador local.

Monitorização

O Trino expõe informações de tempo de execução do cluster através de tabelas de tempo de execução. Numa sessão do Trino (a partir do comando trino:default), execute a seguinte consulta para ver os dados da tabela de tempo de execução:

select * FROM system.runtime.nodes;