Stream Kafka topics to Hive

Apache Kafka is an open-source distributed streaming platform for real-time data pipelines and data integration. It provides an efficient and scalable streaming system for use in a wide variety of applications, including:

  • Real-time analytics
  • Stream processing
  • Log aggregation
  • Distributed messaging
  • Event streaming

Tutorial steps

Follow these steps to create a Dataproc Kafka cluster that reads a Kafka topic into Cloud Storage in Parquet or ORC format.

Copy the Kafka installation script to Cloud Storage

The kafka.sh initialization action script installs Kafka on a Dataproc cluster.

  1. Review the code.

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
                         apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, the broker ID is a random number less than 10000.
        # 10000 is chosen since the maximum broker ID allowed to be set is 10000.
        BROKER_ID=$((RANDOM % 10000))
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in ${matched_files}; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. Copy the kafka.sh initialization action script to your Cloud Storage bucket. This script installs Kafka on a Dataproc cluster.

    1. Open Cloud Shell, then run the following command:

      gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      Replace the following:

      • REGION: kafka.sh is stored in regionally tagged public buckets in Cloud Storage. Specify a geographically close Compute Engine region (example: us-central1).
      • BUCKET_NAME: the name of your Cloud Storage bucket. If the bucket does not exist yet, see the example command after this list.
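
      If the destination bucket does not exist yet, a command such as the following creates one in the same region. BUCKET_NAME and REGION are the same placeholders used above.

      # Create a regional Cloud Storage bucket to hold the initialization script
      # (skip this step if the bucket already exists).
      gcloud storage buckets create gs://BUCKET_NAME --location=REGION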

Create a Dataproc Kafka cluster

  1. Open Cloud Shell, then run the following gcloud dataproc clusters create command to create a Dataproc HA cluster that installs the Kafka and ZooKeeper components. A command to verify the new cluster follows the notes below.

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    Notes:

    • KAFKA_CLUSTER: a cluster name, which must be unique within your project. The name must start with a lowercase letter, and can contain up to 51 lowercase letters, numbers, and hyphens. It cannot end with a hyphen. The name of a deleted cluster can be reused.
    • PROJECT_ID: the project to associate with this cluster.
    • REGION: the Compute Engine region where the cluster will be located, such as us-central1.
      • You can add the optional --zone=ZONE flag to specify a zone within the specified region, such as us-central1-a. If you do not specify a zone, the Dataproc autozone placement feature selects a zone within the specified region.
    • --image-version: Dataproc image version 2.1-debian11 is recommended for this tutorial. Note: each image version contains a set of pre-installed components, including the Hive component used in this tutorial (see Supported Dataproc image versions).
    • --num-masters: 3 master nodes create a high-availability (HA) cluster. The ZooKeeper component, which is required by Kafka, is pre-installed on an HA cluster.
    • --enable-component-gateway: enables the Dataproc Component Gateway.
    • BUCKET_NAME: the name of the Cloud Storage bucket that contains the /scripts/kafka.sh initialization script (see Copy the Kafka installation script to Cloud Storage).
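
    As a quick check after the cluster is created, you can describe it to confirm that it is running. KAFKA_CLUSTER and REGION are the same placeholders used above.

    # Optional: confirm the cluster state (expected value: RUNNING).
    gcloud dataproc clusters describe KAFKA_CLUSTER \
        --region=REGION \
        --format="value(status.state)"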

Create a Kafka custdata topic

Follow these steps to create a Kafka topic on the Dataproc Kafka cluster:

  1. Use the SSH utility to open a terminal window on the Kafka cluster master VM.

  2. Create a Kafka custdata topic.

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    Notes:

    • KAFKA_CLUSTER: insert the name of your Kafka cluster. -w-0:9092 signifies the Kafka broker running on port 9092 on the worker-0 node.

    • After creating the custdata topic, you can run the following commands; a topic --describe example follows them:

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data.
      /usr/lib/kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --topic custdata

      # Count the number of messages in the topic.
      /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
          --broker-list KAFKA_CLUSTER-w-0:9092 \
          --topic custdata

      # Delete topic.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --delete --topic custdata
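
      You can also inspect the topic's partition count and replica assignment with the --describe flag, for example:

      # Describe the topic's partitions and replicas.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --describe --topic custdata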

Publish content to the Kafka custdata topic

The following script uses the kafka-console-producer.sh Kafka tool to generate fictitious customer data in CSV format.

  1. Copy, then paste the script in the SSH terminal on your Kafka cluster master node. Press <return> to run the script. A command to spot-check the published records follows the notes below.

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    Notes:

    • KAFKA_CLUSTER: the name of your Kafka cluster.
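
    To spot-check a few of the generated records before counting them, you can read a handful of messages from the beginning of the topic, for example:

    # Print the first five messages (keys and values), then exit.
    /usr/lib/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --topic custdata \
        --from-beginning \
        --max-messages 5 \
        --property print.key=true
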
  2. Run the following Kafka command to confirm that the custdata topic contains 10,000 messages.

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    Notes:

    • KAFKA_CLUSTER: the name of your Kafka cluster.

    Expected output (topic:partition:latest offset):

    custdata:0:10000
    

Create Hive tables in Cloud Storage

Create Hive tables to receive streamed Kafka topic data. Perform the following steps to create cust_parquet (Parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.

  1. Insert your BUCKET_NAME in the following script, then copy and paste the script in the SSH terminal on your Kafka cluster master node, and press <return> to create a ~/hivetables.hql (Hive query language) script.

    You will run the ~/hivetables.hql script in the next step to create Parquet and ORC Hive tables in your Cloud Storage bucket.

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc;
    create external table if not exists cust_orc
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as orc
    location "gs://BUCKET_NAME/tables/cust_orc";
    EOF
  2. In the SSH terminal on your Kafka cluster master node, submit the ~/hivetables.hql Hive job to create the cust_parquet (Parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket. An optional command to verify the tables follows the notes below.

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    Notes:

    • The Hive component is pre-installed on the Dataproc Kafka cluster. See 2.1.x release versions for a list of the Hive component versions included in recently released 2.1 images.
    • KAFKA_CLUSTER: the name of your Kafka cluster.
    • REGION: the region where your Kafka cluster is located.
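
    To confirm that both tables were created, you can submit a short inline Hive query with the same placeholders, for example:

    # Optional: list the Hive tables and show the schema of one of them.
    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -e "show tables; describe cust_parquet;"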

Stream Kafka custdata to Hive tables

  1. Run the following command in the SSH terminal on your Kafka cluster master node to install the kafka-python library. A Kafka client is required to stream Kafka topic data to Cloud Storage.
    pip install kafka-python
    
  2. Insert your BUCKET_NAME, then copy and paste the following PySpark code in the SSH terminal on your Kafka cluster master node, and press <return> to create a streamdata.py file.

    The script subscribes to the Kafka custdata topic, then streams the data to your Hive tables in Cloud Storage. The output format, which can be Parquet or ORC, is passed to the script as a parameter.

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 3:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
            sys.exit(1)
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. In the SSH terminal on your Kafka cluster master node, run spark-submit to stream data to your Hive tables in Cloud Storage.

    1. Insert the name of your KAFKA_CLUSTER and the output FORMAT, then copy and paste the following code in the SSH terminal on your Kafka cluster master node, and press <return> to run the code and stream the Kafka custdata in Parquet format to your Hive tables in Cloud Storage.

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      Notes:

      • KAFKA_CLUSTER: insert the name of your Kafka cluster.
      • FORMAT: specify either parquet or orc as the output format. You can run the command successively to stream both formats to the Hive tables: for example, in the first invocation, specify parquet to stream the Kafka custdata topic to the Hive Parquet table; then, in a second invocation, specify orc to stream custdata to the Hive ORC table (see the example after this list).
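
      For example, after the first run with parquet has finished streaming and you have stopped it (step 4 below), a second invocation for the ORC sink passes orc as the format argument:

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER orc
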
  4. After standard output stops in the SSH terminal, which indicates that all of the custdata has been streamed, press <control-c> in the SSH terminal to stop the process.

  5. List the Hive tables in Cloud Storage.

    gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
    

    Notes:

    • BUCKET_NAME: insert the name of the Cloud Storage bucket that contains your Hive tables (see Create Hive tables in Cloud Storage).

Query the streamed data

  1. In the SSH terminal on your Kafka cluster master node, run the following hive command to count the streamed Kafka custdata messages in the Hive tables in Cloud Storage. A follow-up query example appears after the expected output.

    hive -e "select count(1) from TABLE_NAME"
    

    Notes:

    • TABLE_NAME: specify either cust_parquet or cust_orc as the Hive table name.

    Expected output snippet:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
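
    After confirming the count, you can also sample a few rows to check that the message key and the parsed CSV fields landed in the expected columns, for example:

    hive -e "select uuid, custname, age, amount from TABLE_NAME limit 10"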