Kafka トピックを Hive にストリーミングする

Apache Kafka は、リアルタイム データ パイプラインとデータ統合用のオープンソースの分散型ストリーミング プラットフォームです。次のようなさまざまなアプリケーションで使用する、効率的でスケーラブルなストリーミング システムを提供します。

  • リアルタイム分析
  • ストリーム処理
  • ログ集計
  • 分散メッセージング
  • イベント ストリーミング

チュートリアルのステップ

次の手順を実行して、Dataproc Kafka クラスタを作成し、Kafka トピックを Parquet または ORC 形式で Cloud Storage に読み込みます。

Kafka インストール スクリプトを Cloud Storage にコピーする

kafka.sh 初期化アクション スクリプトは、Kafka を Dataproc クラスタにインストールします。

  1. コードを参照します。

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
                         apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is a random number generated less than 10000.
        # 10000 is choosen since the max broker ID allowed being set is 10000.
        BROKER_ID=$((RANDOM % 10000))
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. kafka.sh 初期化アクション スクリプトを Cloud Storage バケットにコピーします。このスクリプトは、Kafka を Dataproc クラスタにインストールします。

    1. Cloud Shell を開き、次のコマンドを実行します。

      gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      次のように置き換えます。

      • REGION: kafka.sh は、Cloud Storage のリージョンタグ付き公開バケットに保存されます。地理的に近い Compute Engine リージョンを指定します(例: us-central1)。
      • BUCKET_NAME: Cloud Storage バケットの名前。

Dataproc Kafka クラスタを作成する

  1. Cloud Shell を開き、次の gcloud dataproc clusters create コマンドを実行して、Kafka と ZooKeeper コンポーネントをインストールする Dataproc HA クラスタを作成します。

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    注:

    • KAFKA_CLUSTER: クラスタ名。プロジェクト内で一意にする必要があります。名前は先頭を小文字にする必要があり、51 文字以下の小文字、数字、ハイフンを使用できます。末尾をハイフンにすることはできません。削除されたクラスタの名前は再利用できます。
    • PROJECT_ID: このクラスタに関連付けるプロジェクト。
    • REGION: クラスタが配置される Compute Engine のリージョンus-central1 など)。
      • オプションの --zone=ZONE フラグを追加して、指定されたリージョン内のゾーン(us-central1-a など)を指定できます。ゾーンを指定しない場合、Dataproc の自動ゾーン プレースメント機能により、指定されたリージョン内のゾーンが選択されます。
    • --image-version: このチュートリアルでは、Dataproc イメージ バージョン 2.1-debian11 をおすすめします。注: 各イメージ バージョンには、このチュートリアルで使用される Hive コンポーネントなど、一連のプリインストール コンポーネントが含まれています(サポートされている Dataproc イメージ バージョンを参照)。
    • --num-master: 3 個のマスターノードが HA クラスタを作成します。Kafka に必要な Zookeeper コンポーネントは HA クラスタにプリインストールされています。
    • --enable-component-gateway: Dataproc コンポーネント ゲートウェイを有効にします。
    • BUCKET_NAME: /scripts/kafka.sh 初期化スクリプトを含む Cloud Storage バケットの名前(Kafka インストール スクリプトを Cloud Storage にコピーするを参照)。

Kafka custdata トピックを作成する

Dataproc Kafka クラスタに Kafka トピックを作成するには:

  1. SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。

  2. Kafka custdata トピックを作成します。

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    注:

    • KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。-w-0:9092 は、worker-0 ノードのポート 9092 で実行されている Kafka ブローカーを表します。

    • custdata トピックの作成後は、次のコマンドを実行できます。

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

Kafka custdata トピックにコンテンツを公開する

次のスクリプトでは、kafka-console-producer.sh Kafka ツールを使用して架空の顧客データを CSV 形式で生成します。

  1. スクリプトをコピーして、Kafka クラスタのマスターノードの SSH ターミナルに貼り付けます。<return> を押してスクリプトを実行します。

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    注:

    • KAFKA_CLUSTER: Kafka クラスタの名前。
  2. 次の Kafka コマンドを実行して、custdata トピックに 10,000 件のメッセージが含まれていることを確認します。

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    注:

    • KAFKA_CLUSTER: Kafka クラスタの名前。

    予想される出力:

    custdata:0:10000
    

Cloud Storage に Hive テーブルを作成する

ストリーミングされた Kafka トピックデータを受信する Hive テーブルを作成します。次の手順を実行して、Cloud Storage バケットに cust_parquet(Parquet)と cust_orc(ORC)の Hive テーブルを作成します。

  1. BUCKET_NAME を次のスクリプトに挿入し、スクリプトをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して ~/hivetables.hql(Hive クエリ言語)スクリプトを作成します。

    次の手順で ~/hivetables.hql スクリプトを実行して、Cloud Storage バケットに Parquet と ORC 形式の Hive テーブルを作成します。

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. Kafka クラスタのマスターノードの SSH ターミナルで、~/hivetables.hql Hive ジョブを送信して、Cloud Storage バケットに cust_parquet(Parquet)と cust_orc(ORC)の Hive テーブルを作成します。

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    注:

    • Hive コンポーネントは Dataproc Kafka クラスタにプリインストールされています。最近リリースされた 2.1 イメージに含まれる Hive コンポーネント バージョンの一覧については、2.1.x リリース バージョンをご覧ください。
    • KAFKA_CLUSTER: Kafka クラスタの名前。
    • REGION: Kafka クラスタが配置されているリージョン。

Kafka custdata を Hive テーブルにストリーミングする

  1. Kafka クラスタのマスターノードの SSH ターミナルで次のコマンドを実行して、kafka-python ライブラリをインストールします。Kafka クライアントは、Kafka トピックデータを Cloud Storage にストリーミングするのに必要です。
    pip install kafka-python
    
  2. BUCKET_NAME を挿入し、次の PySpark コードをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して streamdata.py ファイルを作成します。

    このスクリプトは、Kafka custdata トピックをサブスクライブし、データを Cloud Storage の Hive テーブルにストリーミングします。出力形式(Parquet または ORC)は、パラメータとしてスクリプトに渡されます。

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. Kafka クラスタのマスターノードの SSH ターミナルで spark-submit を実行して、Cloud Storage の Hive テーブルにデータをストリーミングします。

    1. KAFKA_CLUSTER の名前と出力 FORMAT を挿入し、次のコードをコピーして Kafka クラスタのマスターノードの SSH ターミナルに貼り付け、Enter キーを押してコードを実行し、Kafka custdata データを Parquet 形式で Cloud Storage の Hive テーブルにストリーミングします。

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      注:

      • KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
      • FORMAT: 出力形式として parquet または orc を指定します。コマンドを連続して実行して、両方の形式を Hive テーブルにストリーミングできます。たとえば、最初の呼び出しでは parquet を指定して Kafka custdata トピックを Hive Parquet テーブルにストリーミングし、2 番目の呼び出しでは orc 形式を指定して custdata を Hive ORC テーブルにストリーミングします。
  4. 標準出力が SSH ターミナルで停止したら(これは、すべての custdata がストリーミングされたことを示します)、SSH ターミナルで Ctrl+C キーを押してプロセスを停止します。

  5. Cloud Storage 内の Hive テーブルを一覧表示します。

    gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
    

    注:

ストリーミング データをクエリする

  1. Kafka クラスタのマスターノードの SSH ターミナルで、次の hive コマンドを実行して、Cloud Storage の Hive テーブルにストリーミングされた Kafka custdata メッセージをカウントします。

    hive -e "select count(1) from TABLE_NAME"
    

    注:

    • TABLE_NAME: Hive テーブル名として cust_parquet または cust_orc を指定します。

    想定される出力スニペット:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)