搭配 Dataproc 使用 Trino

Trino (舊稱 Presto) 是分散式 SQL 查詢引擎,可查詢分散在一個或多個異質資料來源的大型資料集。Trino 可以透過連接器查詢 Hive、MySQL、Kafka 和其他資料來源。本教學課程將示範如何:

  • 在 Dataproc 叢集上安裝 Trino 服務
  • 從本機電腦上安裝的 Trino 用戶端查詢公開資料,該用戶端會與叢集上的 Trino 服務通訊
  • 透過 Trino Java JDBC 驅動程式,從與叢集上 Trino 服務通訊的 Java 應用程式執行查詢。

建立 Dataproc 叢集

使用 optional-components 標記 (適用於 2.1 以上版本的映像檔) 建立 Dataproc 叢集,在叢集上安裝 Trino 選用元件,並使用 enable-component-gateway 標記啟用元件閘道,以便從 Google Cloud 控制台存取 Trino 網頁版 UI。

  1. 設定環境變數:
    • PROJECT:您的專案 ID
    • BUCKET_NAME:您在「事前準備」中建立的 Cloud Storage 值區名稱
    • REGION: region 本教學課程中使用的叢集將建立於此,例如「us-west1」
    • 工作人員:建議為本教學課程安排 3 到 5 名工作人員
    export PROJECT=project-id
    export WORKERS=number
    export REGION=region
    export BUCKET_NAME=bucket-name
    
  2. 在本機電腦上執行 Google Cloud CLI,建立叢集。
    gcloud beta dataproc clusters create trino-cluster \
        --project=${PROJECT} \
        --region=${REGION} \
        --num-workers=${WORKERS} \
        --scopes=cloud-platform \
        --optional-components=TRINO \
        --image-version=2.1  \
        --enable-component-gateway
    
    不建議建立

準備資料

bigquery-public-data chicago_taxi_trips 資料集匯出至 Cloud Storage 做為 CSV 檔案,然後建立 Hive 外部資料表來參照資料。

  1. 在本機上執行下列指令,將 BigQuery 中的計程車資料匯出為不含標題的 CSV 檔案,並匯入您在「開始前」建立的 Cloud Storage bucket。
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. 建立 Hive 外部資料表,並以 Cloud Storage bucket 中的 CSV 和 Parquet 檔案做為備份。
    1. 建立 Hive 外部資料表 chicago_taxi_trips_csv
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. 確認 Hive 外部資料表已建立。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. 建立另一個 Hive 外部資料表 chicago_taxi_trips_parquet,使用相同的資料欄,但以 Parquet 格式儲存資料,以提升查詢效能。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. 將 Hive CSV 資料表中的資料載入 Hive Parquet 資料表。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. 確認資料已正確載入。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

執行查詢

您可以透過 Trino CLI 或應用程式在本機執行查詢。

Trino CLI 查詢

本節將示範如何使用 Trino CLI 查詢 Hive Parquet 計程車資料集。

  1. 在本機電腦上執行下列指令,以使用 SSH 連結至叢集的主要節點。執行指令時,本機終端機會停止回應。
    gcloud compute ssh trino-cluster-m
    
  2. 在叢集主要節點的 SSH 終端機視窗中,執行 Trino CLI,連線至主要節點上執行的 Trino 伺服器。
    trino --catalog hive --schema default
    
  3. trino:default 提示中,確認 Trino 可以找到 Hive 表格。
    show tables;
    
    Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
     chicago_taxi_trips_csv
     chicago_taxi_trips_parquet
    (2 rows)
    
  4. trino:default 提示執行查詢,並比較查詢 Parquet 與 CSV 資料的效能。
    • 查詢 Parquet 資料
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
       _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
    • CSV 資料查詢
      select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Java 應用程式查詢

如要透過 Trino Java JDBC 驅動程式從 Java 應用程式執行查詢,請按照下列步驟操作: 1. 下載 Trino Java JDBC 驅動程式。1. 在 Maven pom.xml 中新增 trino-jdbc 依附元件。

<dependency>
  <groupId>io.trino</groupId>
  <artifactId>trino-jdbc</artifactId>
  <version>376</version>
</dependency>
Java 程式碼範例
package dataproc.codelab.trino;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class TrinoQuery {
  private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

記錄和監控

記錄

Trino 記錄位於叢集主要節點和工作站節點的 /var/log/trino/

網路使用者介面

如要在本機瀏覽器中開啟在叢集主要節點上執行的 Trino 網頁 UI,請參閱「查看及存取元件閘道網址」。

監控

Trino 會透過執行階段資料表公開叢集執行階段資訊。在 Trino 工作階段 (從 trino:default 提示字元) 中,執行下列查詢來查看執行階段資料表資料:

select * FROM system.runtime.nodes;