Apache Kafka 是開放原始碼的分散式串流平台,適用於即時資料管道和資料整合。這項技術提供有效率且可擴充的串流系統,適用於各種應用程式,包括:
- 即時分析
- 串流處理
- 記錄檔匯總
- 分散式訊息
- 活動串流
教學課程步驟
請按照下列步驟建立 Dataproc Kafka 叢集,將 Kafka 主題讀取至 Cloud Storage,並採用 Parquet 或 ORC 格式。
將 Kafka 安裝指令碼複製到 Cloud Storage
kafka.sh
初始化動作指令碼會在 Dataproc 叢集上安裝 Kafka。
瀏覽程式碼。
將
kafka.sh
初始化動作 指令碼複製到 Cloud Storage bucket。 這項指令碼會為 Dataproc 叢集安裝 Kafka。開啟 Cloud Shell,然後執行下列指令:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
請將下列項目改為對應的值:
- REGION:
kafka.sh
儲存在 Cloud Storage 中標記區域的公開值區。指定地理位置接近的 Compute Engine 區域 (例如:us-central1
)。 - BUCKET_NAME:Cloud Storage bucket 的名稱。
- REGION:
建立 Dataproc Kafka 叢集
開啟 Cloud Shell,然後執行下列
gcloud dataproc clusters create
指令,建立安裝 Kafka 和 ZooKeeper 元件的 Dataproc 高可用性叢集:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注意:
- KAFKA_CLUSTER:叢集名稱,在專案中不得重複。 名稱開頭須為小寫英文字母,最多可包含 51 個小寫英文字母、數字和連字號。結尾不得為連字號。您可以重複使用已刪除叢集的名稱。
- PROJECT_ID:要與這個叢集建立關聯的專案。
- REGION:叢集所在的 Compute Engine 區域,例如
us-central1
。- 您可以新增選用的
--zone=ZONE
旗標,在指定區域內指定區域,例如us-central1-a
。如未指定區域,Dataproc 自動選擇放置區域功能會選取指定地區中的區域。
- 您可以新增選用的
--image-version
:建議在本教學課程中使用 Dataproc 映像檔版本2.1-debian11
。注意:每個映像檔版本都包含一組預先安裝的元件,包括本教學課程中使用的 Hive 元件 (請參閱「支援的 Dataproc 映像檔版本」)。--num-master
:3
個主節點會建立高可用性叢集。Kafka 需要 Zookeeper 元件,因此系統會在 HA 叢集上預先安裝該元件。--enable-component-gateway
:啟用 Dataproc 元件閘道。- BUCKET_NAME:Cloud Storage bucket 的名稱,其中包含
/scripts/kafka.sh
初始化指令碼 (請參閱「將 Kafka 安裝指令碼複製到 Cloud Storage」)。
建立 Kafka custdata
主題
如要在 Dataproc Kafka 叢集上建立 Kafka 主題,請按照下列步驟操作:
使用 SSH 公用程式,在叢集主 VM 上開啟終端機視窗。
建立 Kafka
custdata
主題。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注意:
KAFKA_CLUSTER:插入 Kafka 叢集的名稱。
-w-0:9092
代表在worker-0
節點的9092
連接埠上執行的 Kafka 代理程式。建立
custdata
主題後,您可以執行下列指令:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
將內容發布至 Kafka custdata
主題
下列指令碼使用 kafka-console-producer.sh
Kafka 工具,以 CSV 格式產生虛構的顧客資料。
複製指令碼,然後貼到 Kafka 叢集主要節點的 SSH 終端機。按下 <return> 執行指令碼。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
注意:
- KAFKA_CLUSTER:Kafka 叢集的名稱。
執行下列 Kafka 指令,確認
custdata
主題包含 10,000 則訊息。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注意:
- KAFKA_CLUSTER:Kafka 叢集的名稱。
預期輸出內容:
custdata:0:10000
在 Cloud Storage 中建立 Hive 資料表
建立 Hive 資料表,接收串流 Kafka 主題資料。
請按照下列步驟,在 Cloud Storage bucket 中建立 cust_parquet
(parquet) 和 cust_orc
(ORC) Hive 資料表。
在下列指令碼中插入 BUCKET_NAME,然後複製指令碼並貼到 Kafka 叢集主要節點的 SSH 終端機,接著按下 <return> 鍵,建立
~/hivetables.hql
(Hive 查詢語言) 指令碼。您會在下一個步驟中執行
~/hivetables.hql
指令碼,在 Cloud Storage bucket 中建立 Parquet 和 ORC Hive 資料表。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
在 Kafka 叢集主要節點的 SSH 終端機中,提交
~/hivetables.hql
Hive 工作,在 Cloud Storage 值區中建立cust_parquet
(parquet) 和cust_orc
(ORC) Hive 資料表。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注意:
- Dataproc Kafka 叢集已預先安裝 Hive 元件。如要查看最近發布的 2.1 映像檔中包含的 Hive 元件版本清單,請參閱「2.1.x 版本」。
- KAFKA_CLUSTER:Kafka 叢集的名稱。
- REGION:Kafka 叢集所在的區域。
將 Kafka custdata
串流至 Hive 資料表
- 在 Kafka 叢集主要節點的 SSH 終端機中執行下列指令,安裝
kafka-python
程式庫。如要將 Kafka 主題資料串流至 Cloud Storage,必須使用 Kafka 用戶端。pip install kafka-python
插入 BUCKET_NAME,然後複製下列 PySpark 程式碼並貼到 Kafka 叢集主要節點的 SSH 終端機,然後按下 <return> 鍵建立
streamdata.py
檔案。這個指令碼會訂閱 Kafka
custdata
主題,然後將資料串流至 Cloud Storage 中的 Hive 資料表。輸出格式 (可以是 Parquet 或 ORC) 會以參數形式傳遞至指令碼。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
在 Kafka 叢集主要節點的 SSH 終端機中,執行
spark-submit
,將資料串流至 Cloud Storage 中的 Hive 資料表。插入 KAFKA_CLUSTER 的名稱和輸出 FORMAT,然後將下列程式碼複製並貼到 Kafka 叢集主要節點的 SSH 終端機,然後按下 <return> 執行程式碼,並以 Parquet 格式將 Kafka
custdata
資料串流至 Cloud Storage 中的 Hive 資料表。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
注意:
- KAFKA_CLUSTER:插入 Kafka 叢集的名稱。
- FORMAT:指定
parquet
或orc
做為輸出格式。您可以連續執行指令,將兩種格式的資料串流至 Hive 資料表。舉例來說,第一次叫用時,請指定parquet
,將 Kafkacustdata
主題串流至 Hive parquet 資料表;第二次叫用時,請指定orc
格式,將custdata
串流至 Hive ORC 資料表。
SSH 終端機停止標準輸出後,表示所有
custdata
都已串流,請在 SSH 終端機中按下 <control-c> 停止程序。列出 Cloud Storage 中的 Hive 資料表。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注意:
- BUCKET_NAME:插入包含 Hive 資料表的 Cloud Storage bucket 名稱 (請參閱「建立 Hive 資料表」)。
查詢串流資料
在 Kafka 叢集主要節點的 SSH 終端機中,執行下列
hive
指令,計算 Cloud Storage 中 Hive 表格的串流 Kafkacustdata
訊息。hive -e "select count(1) from TABLE_NAME"
注意:
- TABLE_NAME:指定
cust_parquet
或cust_orc
做為 Hive 資料表名稱。
預期的輸出內容片段:
- TABLE_NAME:指定
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)