Managed Service for Apache Spark で HBase と Apache Spark を併用する

目標

このチュートリアルでは、次の方法について説明します。

  1. Managed Service for Apache Spark クラスタを作成し、クラスタに Apache HBase と Apache ZooKeeper をインストールする
  2. Managed Service for Apache Spark クラスタのマスターノードで実行されている HBase シェルを使用して HBase テーブルを作成する
  3. Cloud Shell を使用して、Java または PySpark Spark ジョブを Managed Service for Apache Spark に送信して HBase テーブルへのデータの書き込みと読み取りを行う

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

Google Cloud Platform プロジェクトをまだ作成していない場合は、作成します。

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Managed Service for Apache Spark クラスタを作成する

  1. Cloud Shell セッション ターミナルで次のコマンドを実行して、以下の操作を行います。

    • HBase コンポーネントと ZooKeeper コンポーネントをインストールする
    • 3 つのワーカーノードをプロビジョニングする(このチュートリアルのコードを実行するには、3~5 つのワーカーをおすすめします)
    • コンポーネント ゲートウェイを有効にする
    • イメージ バージョン 2.0 を使用する
    • --properties フラグを使用して、Spark ドライバとエグゼキュータのクラスパスに HBase 構成と HBase ライブラリを追加する
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

コネクタのインストールを確認する

  1. Google Cloud コンソールまたは Cloud Shell セッション ターミナルから、Managed Service for Apache Spark クラスタ マスターノードに SSH で接続します。

  2. マスターノードで Apache HBase Spark コネクタのインストールを確認します。

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    出力例:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. SSH セッション ターミナルを開いたままにして、以下の操作を行います。

    1. HBase テーブルを作成する
    2. (Java ユーザー): クラスタのマスターノードでコマンドを実行して、クラスタにインストールされているコンポーネントのバージョンを確認する
    3. コードを実行した後、Hbase テーブルをスキャンする

HBase テーブルを作成する

前の手順で開いたマスターノードの SSH セッション ターミナルで、このセクションに記載されているコマンドを実行します。

  1. HBase シェルを開きます。

    hbase shell
    

  2. 「cf」列ファミリーを持つ HBase の「my-table」を作成します。

    create 'my_table','cf'
    

    1. テーブルの作成を確認するには、 Google Cloud コンソールで、Google Cloud コンソール コンポーネント ゲートウェイのリンクの [HBase] をクリックし、Apache HBase UI を開きます。my-table は、[ホーム] ページの [テーブル] セクションに表示されます。

Spark コードを表示する

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

コードを実行する

  1. Cloud Shell セッション ターミナルを開きます。

  2. Cloud Shell セッション ターミナルに GitHub の GoogleCloudDataproc/cloud-dataproc リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. cloud-managed-spark/spark-hbase ディレクトリに変更します。

    cd cloud-managed-spark/spark-hbase
    
    出力例:
    user-name@cloudshell:~/cloud-managed-spark/spark-hbase (project-id)$
    

  4. Managed Service for Apache Spark ジョブを送信します。

Java

  1. pom.xml ファイルでコンポーネントのバージョンを設定します。
    1. Managed Service for Apache Spark の 2.0.x リリース バージョンのページには、イメージ 2.0 のサブマイナー バージョンの最新の 4 つと最新の Scala、Spark、HBase コンポーネントのバージョンがリストされています。
      1. 2.0 イメージ バージョンのクラスタのサブマイナー バージョンを確認するには、Google Cloud コンソールの [クラスタ] ページでクラスタ名をクリックして、[クラスタの詳細] ページを開きます。クラスタのイメージ バージョンが一覧表示されます。
    2. または、クラスタのマスターノードから SSH セッション ターミナルで次のコマンドを実行して、コンポーネントのバージョンを確認することもできます。
      1. Scala のバージョンを確認します。
        scala -version
        
      2. Spark のバージョンを確認します(Ctrl+D キーを押して終了します)。
        spark-shell
        
      3. HBase のバージョンを確認します。
        hbase version
        
      4. Maven の pom.xml で、Spark、Scala、HBase のバージョン依存関係を特定します。
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注: hbase-spark.version は現在の Spark HBase コネクタのバージョンです。このバージョン番号は変更しないでください。
    3. Cloud Shell エディタで pom.xml ファイルを編集し、正しい Scala、Spark、HBase のバージョン番号を挿入します。編集が完了したら、[ターミナルを開く] をクリックして Cloud Shell ターミナル コマンドラインに戻ります。
      cloudshell edit .
      
    4. Cloud Shell で Java 8 に切り替えます。コードをビルドするには、この JDK バージョンが必要です(プラグインの警告メッセージは無視してかまいません)。
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Java 8 のインストールを確認します。
      java -version
      
      出力例:
      openjdk version "1.8..."
       
  2. jar ファイルをビルドします。
    mvn clean package
    
    .jar ファイルは /target サブディレクトリに配置されます(例: target/spark-hbase-1.0-SNAPSHOT.jar)。
  3. ジョブを送信します。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: .jar ファイルの名前を「target/」の後、「.jar」の前に挿入します。
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、ジョブ送信コマンドで次の ‑‑properties フラグを指定して、ジョブ送信ごとにクラスパスを設定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Cloud Shell セッション ターミナルの出力で HBase テーブルの出力を確認します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. ジョブを送信します。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、ジョブ送信コマンドで次の ‑‑properties フラグを指定して、ジョブ送信ごとにクラスパスを設定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Cloud Shell セッション ターミナルの出力で HBase テーブルの出力を確認します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

HBase テーブルをスキャンする

HBase テーブルのコンテンツをスキャンするには、コネクタのインストールを確認するで開いたマスターノードの SSH セッション ターミナルで次のコマンドを実行します。

  1. HBase シェルを開きます。
    hbase shell
    
  2. 「my-table」をスキャンします。
    scan 'my_table'
    
    出力例:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Google Cloud コンソールで [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

クラスタの削除

  • クラスタを削除するには:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}