Dataproc で Trino を使用する

Trino(旧称 Presto)は、1 つ以上の異種混合データソースに分散された大規模なデータセットをクエリするために設計された分散 SQL クエリエンジンです。Trino は、コネクタを介して Hive、MySQL、Kafka などのデータソースに対してクエリを実行できます。このチュートリアルでは、次の方法について説明します。

  • Dataproc クラスタに Trino サービスをインストールする
  • クラスタ上の Trino サービスと通信するローカルマシンにインストールされた Trino クライアントから一般公開データをクエリする
  • Trino Java JDBC ドライバを介してクラスタ上の Trino サービスと通信する Java アプリケーションからクエリを実行する

Dataproc クラスタを作成する

optional-components フラグ(イメージ バージョン 2.1 以降で使用可能)を使用して、Dataproc クラスタを作成し、クラスタに Trino オプション コンポーネントをインストールします。また、enable-component-gateway フラグを使用して、コンポーネント ゲートウェイを有効にし、 Google Cloud コンソールから Trino ウェブ UI にアクセスできるようにします。

  1. 環境変数を設定します。
    • PROJECT: プロジェクト ID
    • BUCKET_NAME: 始める前にで作成した Cloud Storage バケットの名前。
    • REGION: このチュートリアルで使用するクラスタが作成されるリージョン。例: 「us-west1」
    • WORKERS: このチュートリアルでは 3~5 人のワーカーをおすすめします。
    export PROJECT=project-id
    export WORKERS=number
    export REGION=region
    export BUCKET_NAME=bucket-name
    
  2. ローカルマシンで Google Cloud CLI を実行して、クラスタを作成します。
    gcloud beta dataproc clusters create trino-cluster \
        --project=${PROJECT} \
        --region=${REGION} \
        --num-workers=${WORKERS} \
        --scopes=cloud-platform \
        --optional-components=TRINO \
        --image-version=2.1  \
        --enable-component-gateway
    

データを準備する

bigquery-public-datachicago_taxi_trips データセットを CSV ファイルとして Cloud Storage にエクスポートし、データを参照する Hive 外部テーブルを作成します。

  1. ローカルマシンで次のコマンドを実行して、始める前にで作成した Cloud Storage バケットに、BigQuery の taxi データをヘッダーなしの CSV ファイルとしてインポートします。
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. Cloud Storage バケット内の CSV ファイルと Parquet ファイルに基づいて Hive 外部テーブルを作成します。
    1. Hive 外部テーブル chicago_taxi_trips_csv を作成します。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. Hive 外部テーブルの作成を確認します。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. 別の Hive 外部テーブル chicago_taxi_trips_parquet を作成します。列は同じにしますが、クエリ パフォーマンスを向上させるために、データは Parquet 形式で保存されます。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. Hive CSV テーブルから Hive Parquet テーブルにデータを読み込みます。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. データが正しく読み込まれたことを確認します。
      gcloud dataproc jobs submit hive \
          --cluster trino-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

クエリを実行する

クエリは、Trino CLI またはアプリケーションからローカルで実行できます。

Trino CLI のクエリ

このセクションでは、Trino CLI を使用して Hive Parquet taxi データセットをクエリする方法を説明します。

  1. ローカルマシンで次のコマンドを実行して、クラスタのマスターノードに SSH 接続します。コマンドの実行中、ローカル ターミナルは応答しなくなります。
    gcloud compute ssh trino-cluster-m
    
  2. クラスタのマスターノードの SSH ターミナル ウィンドウで、マスターノードで実行されている Trino サーバーに接続する Trino CLI を実行します。
    trino --catalog hive --schema default
    
  3. trino:default プロンプトで、Trino で Hive テーブルの検出が可能なことを確認します。
    show tables;
    
    Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
     chicago_taxi_trips_csv
     chicago_taxi_trips_parquet
    (2 rows)
    
  4. trino:default プロンプトからクエリを実行し、Parquet と CSV データのクエリのパフォーマンスを比較します。
    • Parquet データのクエリ
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
       _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
    • CSV データのクエリ
      select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Java アプリケーションのクエリ

Trino Java JDBC ドライバを介して Java アプリケーションからクエリを実行する方法は次のとおりです。 1. Trino Java JDBC ドライバをダウンロードします。 1. Maven pom.xml にある trino-jdbc 依存関係を追加します。

<dependency>
  <groupId>io.trino</groupId>
  <artifactId>trino-jdbc</artifactId>
  <version>376</version>
</dependency>
Java コードのサンプル
package dataproc.codelab.trino;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class TrinoQuery {
  private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

ロギングとモニタリング

ロギング

Trino のログは、クラスタのマスターノードとワーカーノードの /var/log/trino/ にあります。

ウェブ UI

クラスタのマスターノード上で実行されている Trino ウェブ UI をローカル ブラウザで開くには、コンポーネント ゲートウェイの URL を表示してアクセスするをご覧ください。

モニタリング

Trino は、ランタイム テーブルを介してクラスタのランタイム情報を公開します。(trino:default からの)Trino セッションのプロンプトで、次のクエリを実行してランタイム テーブルのデータを表示します。

select * FROM system.runtime.nodes;