將 Kafka 主題串流至 Hive

Apache Kafka 是開放原始碼的分散式串流平台,適用於即時資料管道和資料整合。這個系統提供有效率且可擴充的串流系統,適用於各種應用程式,包括:

  • 即時分析
  • 串流處理
  • 記錄檔匯總
  • 分散式訊息
  • 活動串流

目標

  1. Managed Service for Apache Spark HA 叢集上安裝 Kafka,並搭配 ZooKeeper (在本教學課程中稱為「Managed Service for Apache Spark Kafka 叢集」)。

  2. 建立虛構的顧客資料,然後將資料發布至 Kafka 主題。

  3. 在 Cloud Storage 中建立 Hive Parquet 和 ORC 資料表,接收串流的 Kafka 主題資料。

  4. 提交 PySpark 工作,訂閱 Kafka 主題並以 Parquet 和 ORC 格式將資料串流至 Cloud Storage。

  5. 對串流 Hive 資料表資料執行查詢,計算串流 Kafka 訊息的數量。

費用

在本文件中,您會使用下列 Google Cloud的計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用期資格。

完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。

事前準備

建立 Google Cloud 專案 (如果尚未建立的話)。

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。

    前往「Buckets」(值區) 頁面

  9. 點選 「Create」(建立)
  10. 在「建立 bucket」頁面中,輸入 bucket 資訊。如要前往下一個步驟,請按「繼續」
    1. 在「開始使用」部分,執行下列操作:
      • 輸入符合值區命名規定的全域不重複名稱。
      • 如要新增值區標籤,請展開「標籤」部分 (),按一下 「新增標籤」,然後為標籤指定 keyvalue
    2. 在「Choose where to store your data」(選擇資料的儲存位置) 專區中,執行下列操作:
      1. 選取「位置類型」
      2. 從「位置類型」下拉式選單中,選擇要永久儲存 bucket 資料的位置。
        • 如果您選取「雙區域」位置類型,也可以使用相關核取方塊啟用強化型複製
      3. 如要設定跨值區複製,請選取「透過 Storage 移轉服務新增跨值區複製作業」,然後按照下列步驟操作:

        設定跨 bucket 複製作業

        1. 在「Bucket」選單中選取 bucket。
        2. 在「複製設定」部分,按一下「設定」,設定複製作業的設定。

          系統隨即會顯示「設定跨 bucket 複製作業」窗格。

          • 如要依物件名稱前置字串篩選要複製的物件,請輸入要納入或排除物件的前置字串,然後按一下「新增前置字串」
          • 如要為複製的物件設定儲存空間級別,請從「儲存空間級別」選單中選取儲存空間級別。如果略過這個步驟,複製的物件預設會使用目標值區的儲存空間級別。
          • 按一下 [完成]
    3. 在「選擇資料儲存方式」部分,執行下列操作:
      1. 選取 bucket 的預設儲存空間級別,或選取「Autoclass」,讓系統自動管理 bucket 資料的儲存空間級別。
      2. 如要啟用階層命名空間,請在「為資料密集型工作負載提供最理想的儲存空間」部分,選取「為這個值區啟用階層命名空間」
    4. 在「選取如何控制物件的存取權」部分,選取 bucket 是否要強制執行禁止公開存取,並為 bucket 的物件選取存取控管方法
    5. 在「選擇保護物件資料的方式」部分,執行下列操作:
      • 選取「資料保護」下方的任何選項,為 bucket 設定所需項目。
        • 如要啟用虛刪除,請按一下「虛刪除政策 (用於資料復原)」核取方塊,並指定要保留物件的天數 (刪除後)。
        • 如要設定「物件版本管理」,請按一下「物件版本管理 (用於版本管控)」核取方塊,並指定每個物件的版本數量上限,以及非現行版本失效的天數。
        • 如要為物件和 bucket 啟用資料保留政策,請勾選「保留 (符合法規)」核取方塊,然後執行下列操作:
          • 如要啟用 Object Retention Lock,請按一下「啟用物件保留功能」核取方塊。
          • 如要啟用 Bucket Lock,請勾選「Set bucket retention policy」(設定值區資料保留政策) 核取方塊,然後選擇保留期限的時間單位和長度。
      • 如要選擇物件資料的加密方式,請展開「資料加密」部分 (),然後選取「資料加密」方法
  11. 點選「建立」

教學課程步驟

請按照下列步驟建立 Managed Service for Apache Spark Kafka 叢集,將 Kafka 主題讀取至 Cloud Storage,並採用 Parquet 或 ORC 格式。

將 Kafka 安裝指令碼複製到 Cloud Storage

kafka.sh 初始化動作指令碼會在 Managed Service for Apache Spark 叢集上安裝 Kafka。

  1. 瀏覽程式碼。

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} >= 3.0" | bc -l) == 1 ]]; then
        retry_apt_command "apt-get update && apt-get install -y gnupg"
        export GNUPGHOME="$(mktemp -d)"
        trap 'rm -rf "${GNUPGHOME}"' EXIT
        gpg --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C
        mkdir -p /etc/apt/trusted.gpg.d
        gpg --export B7B3B788A8D3785C > /etc/apt/trusted.gpg.d/mysql-repo.gpg
      else
        retry_apt_command "apt-get install -y gnupg2 && \
          apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
      fi
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is a random number generated less than 10000.
        # 10000 is choosen since the max broker ID allowed being set is 10000.
        BROKER_ID=$((RANDOM % 10000))
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. kafka.sh 初始化動作 指令碼複製到 Cloud Storage bucket。 這項指令碼會在 Managed Service for Apache Spark 叢集上安裝 Kafka。

    1. 開啟 Cloud Shell,然後執行下列指令:

      gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      請替換下列項目:

      • REGIONkafka.sh 儲存在 Cloud Storage 中公開的區域標記值區。指定地理位置接近的 Compute Engine 區域 (例如:us-central1)。
      • BUCKET_NAME:Cloud Storage bucket 的名稱。

建立 Managed Service for Apache Spark Kafka 叢集

  1. 開啟 Cloud Shell,然後執行下列 gcloud dataproc clusters create 指令,建立 Managed Service for Apache Spark HA 叢集,並安裝 Kafka 和 ZooKeeper 元件:

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    注意:

    • KAFKA_CLUSTER:叢集名稱,在專案內不得重複。名稱開頭須為小寫英文字母,最多包含 51 個小寫英文字母、數字和連字號,而且結尾不得為連字號。可以重複使用已刪除叢集的名稱。
    • PROJECT_ID:要與這個叢集建立關聯的專案。
    • REGION:叢集所在的 Compute Engine 區域,例如 us-central1
      • 您可以新增選用的 --zone=ZONE 旗標,在指定區域內指定區域,例如 us-central1-a。如未指定可用區,Apache Spark 的受管理服務會透過自動可用區放置功能,在指定區域中選取可用區。
    • --image-version:建議在本教學課程中使用 Managed Service for Apache Spark 映像檔版本 2.1-debian11 。注意:每個映像檔版本都包含一組預先安裝的元件,包括本教學課程中使用的 Hive 元件 (請參閱「支援的 Managed Service for Apache Spark 映像檔版本」)。
    • --num-master3 個主要節點會建立高可用性叢集。Kafka 必須使用 Zookeeper 元件,而高可用性叢集已預先安裝該元件。
    • --enable-component-gateway:啟用 Managed Service for Apache Spark 元件閘道
    • BUCKET_NAME:Cloud Storage bucket 的名稱,內含 /scripts/kafka.sh 初始化指令碼 (請參閱「將 Kafka 安裝指令碼複製到 Cloud Storage」)。

建立 Kafka custdata 主題

如要在 Managed Service for Apache Spark Kafka 叢集上建立 Kafka 主題,請按照下列步驟操作:

  1. 使用 SSH 公用程式,在叢集主要執行個體 VM 上開啟終端機視窗。

  2. 建立 Kafka custdata 主題。

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    注意:

    • KAFKA_CLUSTER:插入 Kafka 叢集的名稱。 -w-0:9092 代表在 worker-0 節點的 9092 連接埠上執行的 Kafka 代理程式。

    • 建立 custdata 主題後,您可以執行下列指令:

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

將內容發布至 Kafka custdata 主題

下列指令碼會使用 kafka-console-producer.sh Kafka 工具,以 CSV 格式產生虛構的顧客資料。

  1. 複製指令碼,然後貼到 Kafka 叢集主節點的 SSH 終端機。按下 <return> 執行指令碼。

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    注意:

    • KAFKA_CLUSTER:Kafka 叢集的名稱。
  2. 執行下列 Kafka 指令,確認 custdata 主題包含 10,000 則訊息。

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    注意:

    • KAFKA_CLUSTER:Kafka 叢集的名稱。

    預期輸出內容:

    custdata:0:10000
    

在 Cloud Storage 中建立 Hive 資料表

建立 Hive 資料表,接收串流 Kafka 主題資料。 請按照下列步驟,在 Cloud Storage bucket 中建立 cust_parquet (parquet) 和 cust_orc (ORC) Hive 資料表。

  1. 在下列指令碼中插入 BUCKET_NAME,然後複製指令碼並貼到 Kafka 叢集主要執行個體的 SSH 終端機,接著按下 <return> 鍵,建立 ~/hivetables.hql (Hive 查詢語言) 指令碼。

    您會在下一個步驟中執行 ~/hivetables.hql 指令碼,在 Cloud Storage bucket 中建立 Parquet 和 ORC Hive 資料表。

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. 在 Kafka 叢集主要節點的 SSH 終端機中,提交 ~/hivetables.hql Hive 工作,在 Cloud Storage bucket 中建立 cust_parquet (parquet) 和 cust_orc (ORC) Hive 資料表。

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    注意:

    • Managed Service for Apache Spark Kafka 叢集已預先安裝 Hive 元件。如要查看最近發布的 2.1 映像檔內含的 Hive 元件版本清單,請參閱「2.1.x 發布版本」。
    • KAFKA_CLUSTER:Kafka 叢集的名稱。
    • REGION:Kafka 叢集所在的區域。

將 Kafka custdata 串流至 Hive 表格

  1. 在 Kafka 叢集主節點的 SSH 終端機中執行下列指令,安裝 kafka-python 程式庫。如要將 Kafka 主題資料串流至 Cloud Storage,需要 Kafka 用戶端。
    pip install kafka-python
    
  2. 插入 BUCKET_NAME,然後複製下列 PySpark 程式碼並貼到 Kafka 叢集主要執行個體的 SSH 終端機,然後按下 <return> 鍵,建立 streamdata.py 檔案。

    這個指令碼會訂閱 Kafka custdata 主題,然後將資料串流至 Cloud Storage 中的 Hive 資料表。輸出格式 (可以是 Parquet 或 ORC) 會以參數形式傳遞至指令碼。

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. 在 Kafka 叢集主要節點的 SSH 終端機中,執行 spark-submit,將資料串流至 Cloud Storage 中的 Hive 資料表。

    1. 插入 KAFKA_CLUSTER 名稱和輸出 FORMAT,然後將下列程式碼複製並貼到 Kafka 叢集主要節點的 SSH 終端機,然後按下 <return> 鍵執行程式碼,並以 Parquet 格式將 Kafka custdata 資料串流至 Cloud Storage 中的 Hive 資料表。

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      注意:

      • KAFKA_CLUSTER:插入 Kafka 叢集的名稱。
      • FORMAT:指定 parquetorc 做為輸出格式。您可以連續執行指令,將兩種格式的資料串流至 Hive 資料表。舉例來說,第一次叫用時,請指定 parquet,將 Kafka custdata 主題串流至 Hive parquet 資料表;第二次叫用時,請指定 orc 格式,將 custdata 串流至 Hive ORC 資料表。
  4. SSH 終端機停止標準輸出後,表示所有 custdata 都已串流,請在 SSH 終端機中按下 <control-c> 停止程序。

  5. 列出 Cloud Storage 中的 Hive 資料表。

    gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
    

    注意:

    • BUCKET_NAME:插入包含 Hive 資料表的 Cloud Storage bucket 名稱 (請參閱「建立 Hive 資料表」)。

查詢串流資料

  1. 在 Kafka 叢集主節點的 SSH 終端機中,執行下列 hive 指令,計算 Cloud Storage 中 Hive 表格的串流 Kafka custdata 訊息。

    hive -e "select count(1) from TABLE_NAME"
    

    注意:

    • TABLE_NAME:指定 cust_parquetcust_orc 做為 Hive 資料表名稱。

    預期的輸出片段:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)

清除所用資源

刪除專案

    刪除 Google Cloud 專案:

    gcloud projects delete PROJECT_ID

刪除資源

  • 刪除 bucket:
    gcloud storage buckets delete BUCKET_NAME
  • 刪除 Kafka 叢集:
    gcloud dataproc clusters delete KAFKA_CLUSTER \
        --region=${REGION}