将 Trino 与 Managed Service for Apache Spark 搭配使用

Trino(原 Presto)是一个分布式 SQL 查询引擎,旨在查询分布在一个或多个异构数据源上的大型数据集。Trino 可以通过连接器查询 Hive、MySQL、Kafka 及其他数据源。本教程将介绍如何执行以下操作:

  • 在 Managed Service for Apache Spark 集群上安装 Trino 服务
  • 从安装在本地机器上的 Trino 客户端查询公共数据,该客户端与集群上的 Trino 服务通信
  • 通过 Trino Java JDBC 驱动程序,从与集群上的 Trino 服务通信的 Java 应用运行查询。

目标

  • 创建安装了 Trino 的 Managed Service for Apache Spark 集群
  • 准备数据。本教程使用 BigQuery 中提供的 Chicago Taxi Trips(芝加哥出租车行程)公共数据集。
    1. 从 BigQuery 中提取数据
    2. 以 CSV 文件的形式将数据加载到 Cloud Storage
    3. 转换数据:
      1. 将数据公开为 Hive 外部表,以便 Trino 可以查询这些数据
      2. 将 CSV 格式的数据转换为 Parquet 格式,以加快查询速度
  • 通过 SSH 隧道或 Trino JDBC 驱动程序,分别向集群上运行的 Trino 协调器发送 Trino CLI 查询或应用代码查询
  • 通过 Trino 网页界面检查日志并监控 Trino 服务
  • 费用

    在本文档中,您将使用 Google Cloud的以下收费组件:

    如需根据您的预计使用情况来估算费用,请使用价格计算器

    新 Google Cloud 用户可能有资格申请免费试用

    准备工作

    如果您尚未完成此操作,请创建 Google Cloud 项目和 Cloud Storage 存储桶,以保存此教程中使用的数据。1. 设置项目
    1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, Cloud Storage, and BigQuery APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. 安装 Google Cloud CLI。

    6. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

    7. 如需初始化 gcloud CLI,请运行以下命令:

      gcloud init
    8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    9. Verify that billing is enabled for your Google Cloud project.

    10. Enable the Dataproc, Compute Engine, Cloud Storage, and BigQuery APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    11. 安装 Google Cloud CLI。

    12. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

    13. 如需初始化 gcloud CLI,请运行以下命令:

      gcloud init
    1. 在项目中创建 Cloud Storage 存储桶,以保存此教程中使用的数据。
    1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。

      进入“存储桶”

    2. 点击 创建
    3. 创建存储桶页面上,输入您的存储桶信息。要转到下一步,请点击继续
      1. 开始使用部分中,执行以下操作:
        • 输入符合存储桶命名要求的全局唯一的名称。
        • 如需添加存储桶标签,请展开标签部分 (),点击 添加标签,并为标签指定 keyvalue
      2. 选择数据存储位置部分,执行以下操作:
        1. 选择位置类型
        2. 位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
        3. 如需设置跨存储桶复制,请选择通过 Storage Transfer Service 添加跨存储桶复制,然后按照以下步骤操作:

          设置跨存储桶复制

          1. 存储桶菜单中,选择一个存储桶。
          2. 复制设置部分中,点击配置以配置复制作业的设置。

            系统会显示配置跨存储桶复制窗格。

            • 如需按对象名称前缀过滤要复制的对象,请输入要用于包含或排除对象的前缀,然后点击 添加前缀
            • 如需为复制的对象设置存储类别,请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用目标存储桶的存储类别。
            • 点击完成
      3. 选择数据存储方式部分中,执行以下操作:
        1. 为存储桶选择默认存储类别,或者选择 Autoclass 对存储桶数据进行自动存储类别管理。
        2. 如需启用分层命名空间,请在针对数据密集型工作负载优化存储部分中,选择在此存储桶上启用分层命名空间
      4. 选择如何控制对对象的访问权限部分中,选择存储桶是否强制执行禁止公开访问,然后为存储桶对象选择访问权限控制方法
      5. 选择如何保护对象数据部分中,执行以下操作:
        • 数据保护下,选择您要为存储桶设置的任何选项。
          • 如需启用软删除,请选中软删除政策(用于数据恢复)复选框,然后指定您希望在删除对象后保留对象的天数。
          • 如需设置对象版本控制,请选中对象版本控制(用于版本控制)复选框,然后指定每个对象的最大版本数以及非当前版本过期前的天数。
          • 如需为对象和存储分区启用保留政策,请点击保留(合规性)复选框,然后执行以下操作:
            • 如需启用对象保留锁定,请点击启用对象保留复选框。
            • 如需启用存储桶锁定,请点击设置存储桶保留政策复选框,然后为保留期限选择时间单位和时长。
        • 如需选择对象数据的加密方式,请展开数据加密部分 (),然后选择数据加密方法
    4. 点击创建

    创建 Managed Service for Apache Spark 集群

    使用 optional-components 标志(在 2.1 版本及更高版本的映像中可用)创建 Managed Service for Apache Spark 集群,在该集群上安装 Trino 可选组件,并使用 enable-component-gateway 标志启用组件网关,使您能够从 Google Cloud 控制台访问 Trino 网页界面。

    1. 设置环境变量:
      • PROJECT:您的项目 ID
      • BUCKET_NAME::您在准备工作中创建的 Cloud Storage 存储桶的名称
      • REGION:将在其中创建此教程所使用集群的区域,例如“us-west1”
      • WORKERS:此教程推荐配备 3 到 5 个工作器
      export PROJECT=project-id
      export WORKERS=number
      export REGION=region
      export BUCKET_NAME=bucket-name
      
    2. 在本地机器上运行 Google Cloud CLI 以创建集群。
      gcloud beta dataproc clusters create trino-cluster \
          --project=${PROJECT} \
          --region=${REGION} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --optional-components=TRINO \
          --image-version=2.1  \
          --enable-component-gateway
      

    准备数据

    bigquery-public-data chicago_taxi_trips 数据集作为 CSV 文件导出到 Cloud Storage,然后创建 Hive 外部表以引用数据。

    1. 在本地机器上,运行以下命令,以 CSV 文件(不含标题)形式将出租车数据从 BigQuery 导入您在准备工作中创建的 Cloud Storage 存储分区。
      bq --location=us extract --destination_format=CSV \
           --field_delimiter=',' --print_header=false \
             "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
             gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
      
    2. 创建由 Cloud Storage 存储桶中的 CSV 和 Parquet 文件支持的 Hive 外部表。
      1. 创建 Hive 外部表 chicago_taxi_trips_csv
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY ','
                STORED AS TEXTFILE
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
        
      2. 验证 Hive 外部表的创建过程。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
        
      3. 创建另一个具有相同列的 Hive 外部表 chicago_taxi_trips_parquet,但以 Parquet 格式存储数据可提高查询性能。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                STORED AS PARQUET
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
        
      4. 将 Hive CSV 表中的数据加载到 Hive Parquet 表中。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
                SELECT * FROM chicago_taxi_trips_csv;"
        
      5. 验证数据已正确加载。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
        

    运行查询

    您可以在本地通过 Trino CLI 或通过应用运行查询。

    Trino CLI 查询

    此部分演示如何使用 Trino CLI 查询 Hive Parquet 格式的出租车数据集。

    1. 在本地机器上运行以下命令,以通过 SSH 连接到集群的主节点。在执行命令期间,本地终端将停止响应。
      gcloud compute ssh trino-cluster-m
      
    2. 在集群主节点的 SSH 终端窗口中运行 Trino CLI,连接到该主节点上运行的 Trino 服务器。
      trino --catalog hive --schema default
      
    3. trino:default 提示符下,验证 Trino 是否能找到 Hive 表。
      show tables;
      
      Table
      ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
       chicago_taxi_trips_csv
       chicago_taxi_trips_parquet
      (2 rows)
      
    4. trino:default 提示符运行查询,并比较查询 Parquet 与 CSV 数据的性能。
      • Parquet 数据查询
        select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
        
         _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
      • CSV 数据查询
        select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
        
        _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

    Java 应用查询

    如需通过 Trino Java JDBC 驱动程序从 Java 应用运行查询,请执行以下操作:下载 Trino Java JDBC 驱动程序。1. 在 Maven pom.xml 中添加 trino-jdbc 依赖项。

    <dependency>
      <groupId>io.trino</groupId>
      <artifactId>trino-jdbc</artifactId>
      <version>376</version>
    </dependency>
    
    Java 代码示例
    package dataproc.codelab.trino;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class TrinoQuery {
      private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
      private static final String SOCKS_PROXY = "localhost:1080";
      private static final String USER = "user";
      private static final String QUERY =
          "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
      public static void main(String[] args) {
        try {
          Properties properties = new Properties();
          properties.setProperty("user", USER);
          properties.setProperty("socksProxy", SOCKS_PROXY);
          Connection connection = DriverManager.getConnection(URL, properties);
          try (Statement stmt = connection.createStatement()) {
            ResultSet rs = stmt.executeQuery(QUERY);
            while (rs.next()) {
              int count = rs.getInt("count");
              System.out.println("The number of long trips: " + count);
            }
          }
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    }

    日志记录和监控

    日志记录

    Trino 日志位于集群主节点和工作器节点上的 /var/log/trino/

    网页界面

    请参阅查看和访问组件网关网址,在本地浏览器中打开集群主节点上运行的 Trino 网页界面。

    监控

    Trino 通过运行时表公开集群的运行时信息。在 Trino 会话(从 trino:default)提示符中,运行以下查询以查看运行时表数据:

    select * FROM system.runtime.nodes;
    

    清理

    完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

    删除项目

    为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

    要删除项目,请执行以下操作:

    1. 在 Google Cloud 控制台中,前往管理资源页面。

      转到“管理资源”

    2. 在项目列表中,选择要删除的项目,然后点击删除
    3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

    删除集群

    • 如需删除您的集群,请输入以下命令:
      gcloud dataproc clusters delete --project=${PROJECT} trino-cluster \
          --region=${REGION}
      

    删除存储桶

    • 如需删除您在准备工作中创建的 Cloud Storage 存储分区(包括存储在存储分区中的数据文件),请输入以下命令:
      gcloud storage rm gs://${BUCKET_NAME} --recursive