Apache Kafka 是開放原始碼的分散式串流平台,適用於即時資料管道和資料整合。這個系統提供有效率且可擴充的串流系統,適用於各種應用程式,包括:
- 即時分析
- 串流處理
- 記錄檔匯總
- 分散式訊息
- 活動串流
目標
在Managed Service for Apache Spark HA 叢集上安裝 Kafka,並搭配 ZooKeeper (在本教學課程中稱為「Managed Service for Apache Spark Kafka 叢集」)。
建立虛構的顧客資料,然後將資料發布至 Kafka 主題。
在 Cloud Storage 中建立 Hive Parquet 和 ORC 資料表,接收串流的 Kafka 主題資料。
提交 PySpark 工作,訂閱 Kafka 主題並以 Parquet 和 ORC 格式將資料串流至 Cloud Storage。
對串流 Hive 資料表資料執行查詢,計算串流 Kafka 訊息的數量。
費用
在本文件中,您會使用下列 Google Cloud的計費元件:
如要根據預測用量估算費用,請使用 Pricing Calculator。
完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。
事前準備
建立 Google Cloud 專案 (如果尚未建立的話)。
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 前往 Google Cloud 控制台的 Cloud Storage「Buckets」(值區) 頁面。
- 點選 「Create」(建立)。
- 在「建立 bucket」頁面中,輸入 bucket 資訊。如要前往下一個步驟,請按「繼續」。
- 在「開始使用」部分,執行下列操作:
-
在「Choose where to store your data」(選擇資料的儲存位置) 專區中,執行下列操作:
- 選取「位置類型」。
- 從「位置類型」下拉式選單中,選擇要永久儲存 bucket 資料的位置。
- 如果您選取「雙區域」位置類型,也可以使用相關核取方塊啟用強化型複製。
- 如要設定跨值區複製,請選取「透過 Storage 移轉服務新增跨值區複製作業」,然後按照下列步驟操作:
設定跨 bucket 複製作業
- 在「Bucket」選單中選取 bucket。
在「複製設定」部分,按一下「設定」,設定複製作業的設定。
系統隨即會顯示「設定跨 bucket 複製作業」窗格。
- 如要依物件名稱前置字串篩選要複製的物件,請輸入要納入或排除物件的前置字串,然後按一下「新增前置字串」。
- 如要為複製的物件設定儲存空間級別,請從「儲存空間級別」選單中選取儲存空間級別。如果略過這個步驟,複製的物件預設會使用目標值區的儲存空間級別。
- 按一下 [完成]。
- 在「選擇資料儲存方式」部分,執行下列操作:
- 在「選取如何控制物件的存取權」部分,選取 bucket 是否要強制執行禁止公開存取,並為 bucket 的物件選取存取控管方法。
-
在「選擇保護物件資料的方式」部分,執行下列操作:
- 選取「資料保護」下方的任何選項,為 bucket 設定所需項目。
- 如要啟用虛刪除,請按一下「虛刪除政策 (用於資料復原)」核取方塊,並指定要保留物件的天數 (刪除後)。
- 如要設定「物件版本管理」,請按一下「物件版本管理 (用於版本管控)」核取方塊,並指定每個物件的版本數量上限,以及非現行版本失效的天數。
- 如要為物件和 bucket 啟用資料保留政策,請勾選「保留 (符合法規)」核取方塊,然後執行下列操作:
- 如要啟用 Object Retention Lock,請按一下「啟用物件保留功能」核取方塊。
- 如要啟用 Bucket Lock,請勾選「Set bucket retention policy」(設定值區資料保留政策) 核取方塊,然後選擇保留期限的時間單位和長度。
- 如要選擇物件資料的加密方式,請展開「資料加密」部分 (),然後選取「資料加密」方法。
- 選取「資料保護」下方的任何選項,為 bucket 設定所需項目。
- 點選「建立」。
教學課程步驟
請按照下列步驟建立 Managed Service for Apache Spark Kafka 叢集,將 Kafka 主題讀取至 Cloud Storage,並採用 Parquet 或 ORC 格式。
將 Kafka 安裝指令碼複製到 Cloud Storage
kafka.sh 初始化動作指令碼會在 Managed Service for Apache Spark 叢集上安裝 Kafka。
瀏覽程式碼。
將
kafka.sh初始化動作 指令碼複製到 Cloud Storage bucket。 這項指令碼會在 Managed Service for Apache Spark 叢集上安裝 Kafka。開啟 Cloud Shell,然後執行下列指令:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
請替換下列項目:
- REGION:
kafka.sh儲存在 Cloud Storage 中公開的區域標記值區。指定地理位置接近的 Compute Engine 區域 (例如:us-central1)。 - BUCKET_NAME:Cloud Storage bucket 的名稱。
- REGION:
建立 Managed Service for Apache Spark Kafka 叢集
開啟 Cloud Shell,然後執行下列
gcloud dataproc clusters create指令,建立 Managed Service for Apache Spark HA 叢集,並安裝 Kafka 和 ZooKeeper 元件:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注意:
- KAFKA_CLUSTER:叢集名稱,在專案內不得重複。名稱開頭須為小寫英文字母,最多包含 51 個小寫英文字母、數字和連字號,而且結尾不得為連字號。可以重複使用已刪除叢集的名稱。
- PROJECT_ID:要與這個叢集建立關聯的專案。
- REGION:叢集所在的 Compute Engine 區域,例如
us-central1。- 您可以新增選用的
--zone=ZONE旗標,在指定區域內指定區域,例如us-central1-a。如未指定可用區,Apache Spark 的受管理服務會透過自動可用區放置功能,在指定區域中選取可用區。
- 您可以新增選用的
--image-version:建議在本教學課程中使用 Managed Service for Apache Spark 映像檔版本2.1-debian11。注意:每個映像檔版本都包含一組預先安裝的元件,包括本教學課程中使用的 Hive 元件 (請參閱「支援的 Managed Service for Apache Spark 映像檔版本」)。--num-master:3個主要節點會建立高可用性叢集。Kafka 必須使用 Zookeeper 元件,而高可用性叢集已預先安裝該元件。--enable-component-gateway:啟用 Managed Service for Apache Spark 元件閘道。- BUCKET_NAME:Cloud Storage bucket 的名稱,內含
/scripts/kafka.sh初始化指令碼 (請參閱「將 Kafka 安裝指令碼複製到 Cloud Storage」)。
建立 Kafka custdata 主題
如要在 Managed Service for Apache Spark Kafka 叢集上建立 Kafka 主題,請按照下列步驟操作:
使用 SSH 公用程式,在叢集主要執行個體 VM 上開啟終端機視窗。
建立 Kafka
custdata主題。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注意:
KAFKA_CLUSTER:插入 Kafka 叢集的名稱。
-w-0:9092代表在worker-0節點的9092連接埠上執行的 Kafka 代理程式。建立
custdata主題後,您可以執行下列指令:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
將內容發布至 Kafka custdata 主題
下列指令碼會使用 kafka-console-producer.sh Kafka 工具,以 CSV 格式產生虛構的顧客資料。
複製指令碼,然後貼到 Kafka 叢集主節點的 SSH 終端機。按下 <return> 執行指令碼。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"注意:
- KAFKA_CLUSTER:Kafka 叢集的名稱。
執行下列 Kafka 指令,確認
custdata主題包含 10,000 則訊息。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注意:
- KAFKA_CLUSTER:Kafka 叢集的名稱。
預期輸出內容:
custdata:0:10000
在 Cloud Storage 中建立 Hive 資料表
建立 Hive 資料表,接收串流 Kafka 主題資料。
請按照下列步驟,在 Cloud Storage bucket 中建立 cust_parquet (parquet) 和 cust_orc (ORC) Hive 資料表。
在下列指令碼中插入 BUCKET_NAME,然後複製指令碼並貼到 Kafka 叢集主要執行個體的 SSH 終端機,接著按下 <return> 鍵,建立
~/hivetables.hql(Hive 查詢語言) 指令碼。您會在下一個步驟中執行
~/hivetables.hql指令碼,在 Cloud Storage bucket 中建立 Parquet 和 ORC Hive 資料表。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
在 Kafka 叢集主要節點的 SSH 終端機中,提交
~/hivetables.hqlHive 工作,在 Cloud Storage bucket 中建立cust_parquet(parquet) 和cust_orc(ORC) Hive 資料表。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注意:
- Managed Service for Apache Spark Kafka 叢集已預先安裝 Hive 元件。如要查看最近發布的 2.1 映像檔內含的 Hive 元件版本清單,請參閱「2.1.x 發布版本」。
- KAFKA_CLUSTER:Kafka 叢集的名稱。
- REGION:Kafka 叢集所在的區域。
將 Kafka custdata 串流至 Hive 表格
- 在 Kafka 叢集主節點的 SSH 終端機中執行下列指令,安裝
kafka-python程式庫。如要將 Kafka 主題資料串流至 Cloud Storage,需要 Kafka 用戶端。pip install kafka-python
插入 BUCKET_NAME,然後複製下列 PySpark 程式碼並貼到 Kafka 叢集主要執行個體的 SSH 終端機,然後按下 <return> 鍵,建立
streamdata.py檔案。這個指令碼會訂閱 Kafka
custdata主題,然後將資料串流至 Cloud Storage 中的 Hive 資料表。輸出格式 (可以是 Parquet 或 ORC) 會以參數形式傳遞至指令碼。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF在 Kafka 叢集主要節點的 SSH 終端機中,執行
spark-submit,將資料串流至 Cloud Storage 中的 Hive 資料表。插入 KAFKA_CLUSTER 名稱和輸出 FORMAT,然後將下列程式碼複製並貼到 Kafka 叢集主要節點的 SSH 終端機,然後按下 <return> 鍵執行程式碼,並以 Parquet 格式將 Kafka
custdata資料串流至 Cloud Storage 中的 Hive 資料表。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT注意:
- KAFKA_CLUSTER:插入 Kafka 叢集的名稱。
- FORMAT:指定
parquet或orc做為輸出格式。您可以連續執行指令,將兩種格式的資料串流至 Hive 資料表。舉例來說,第一次叫用時,請指定parquet,將 Kafkacustdata主題串流至 Hive parquet 資料表;第二次叫用時,請指定orc格式,將custdata串流至 Hive ORC 資料表。
SSH 終端機停止標準輸出後,表示所有
custdata都已串流,請在 SSH 終端機中按下 <control-c> 停止程序。列出 Cloud Storage 中的 Hive 資料表。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注意:
- BUCKET_NAME:插入包含 Hive 資料表的 Cloud Storage bucket 名稱 (請參閱「建立 Hive 資料表」)。
查詢串流資料
在 Kafka 叢集主節點的 SSH 終端機中,執行下列
hive指令,計算 Cloud Storage 中 Hive 表格的串流 Kafkacustdata訊息。hive -e "select count(1) from TABLE_NAME"
注意:
- TABLE_NAME:指定
cust_parquet或cust_orc做為 Hive 資料表名稱。
預期的輸出片段:
- TABLE_NAME:指定
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
清除所用資源
刪除專案
刪除 Google Cloud 專案:
gcloud projects delete PROJECT_ID
刪除資源
-
刪除 bucket:
gcloud storage buckets delete BUCKET_NAME
- 刪除 Kafka 叢集:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}