Apache Kafka 是一个开源分布式流式传输平台,用于实时数据流水线和数据集成。它提供了一个高效且可扩缩的流式传输系统,可用于各种应用,包括:
- 实时分析
- 流处理
- 日志汇总
- 分布式消息传递
- 事件流处理
教程步骤
请执行以下步骤创建 Dataproc Kafka 集群,以 Parquet 或 ORC 格式将 Kafka 主题读入 Cloud Storage。
将 Kafka 安装脚本复制到 Cloud Storage
kafka.sh
初始化操作脚本会在 Dataproc 集群上安装 Kafka。
浏览代码。
将
kafka.sh
初始化操作脚本复制到 Cloud Storage 存储桶中。此脚本会在 Dataproc 集群上安装 Kafka。打开 Cloud Shell,然后运行以下命令:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
进行以下替换:
- REGION:
kafka.sh
存储在 Cloud Storage 中带有区域标记的公共存储桶。指定在地理位置上靠近的 Compute Engine 区域(例如:us-central1
)。 - BUCKET_NAME - Cloud Storage 存储桶的名称。
- REGION:
创建 Dataproc Kafka 集群
打开 Cloud Shell,然后运行以下
gcloud dataproc clusters create
命令,以创建用于安装 Kafka 和 ZooKeeper 组件的 Dataproc 高可用性集群:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注意:
- KAFKA_CLUSTER:集群名称(在项目中必须是唯一的)。该名称必须以小写字母开头,最多可包含 51 个小写字母、数字和连字符,不能以连字符结尾。已删除集群的名称可以再次使用。
- PROJECT_ID:要与此集群关联的项目。
- REGION:集群所在的 Compute Engine 区域,例如
us-central1
。- 您可以添加可选的
--zone=ZONE
标志,以在指定区域内指定可用区,例如us-central1-a
。如果您未指定可用区,Dataproc 自动选择可用区功能会在指定区域内选择可用区。
- 您可以添加可选的
--image-version
:本教程建议使用 Dataproc 映像版本2.1-debian11
。注意:每个映像版本都包含一组预安装的组件,包括本教程中使用的 Hive 组件(请参阅支持的 Dataproc 映像版本)。--num-master
:3
主节点用于创建高可用性集群。Kafka 所需的 Zookeeper 组件已预安装在高可用性集群上。--enable-component-gateway
:启用 Dataproc 组件网关。- BUCKET_NAME:包含
/scripts/kafka.sh
初始化脚本的 Cloud Storage 存储桶的名称(请参阅将 Kafka 安装脚本复制到 Cloud Storage)。
创建 Kafka custdata
主题
如需在 Dataproc Kafka 集群上创建 Kafka 主题,请执行以下操作:
使用 SSH 实用程序在集群主虚拟机上打开终端窗口。
创建 Kafka
custdata
主题。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注意:
KAFKA_CLUSTER:插入 Kafka 集群的名称。
-w-0:9092
表示在worker-0
节点的端口9092
上运行的 Kafka 代理。创建
custdata
主题后,您可以运行以下命令:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
将内容发布到 Kafka custdata
主题
以下脚本使用 kafka-console-producer.sh
Kafka 工具生成 CSV 格式的虚构客户数据。
将该脚本复制并其粘贴到 Kafka 集群主节点上的 SSH 终端。按 <return> 运行脚本。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
注意:
- KAFKA_CLUSTER:Kafka 集群的名称。
运行以下 Kafka 命令,以确认
custdata
主题包含 10,000 条消息。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注意:
- KAFKA_CLUSTER:Kafka 集群的名称。
预期输出:
custdata:0:10000
在 Cloud Storage 中创建 Hive 表
创建 Hive 表以接收流式传输的 Kafka 主题数据。
执行以下步骤,以在 Cloud Storage 存储桶中创建 cust_parquet
(Parquet) 和 cust_orc
(ORC) Hive 表。
在以下脚本中插入 BUCKET_NAME,将该脚本复制并粘贴到 Kafka 集群主节点上的 SSH 终端,然后按 <return> 以创建
~/hivetables.hql
(Hive 查询语言)脚本。您将在下一步中运行
~/hivetables.hql
脚本,以在 Cloud Storage 存储桶中创建 Parquet 和 ORC Hive 表。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
在 Kafka 集群主节点上的 SSH 终端中,提交
~/hivetables.hql
Hive 作业,以在 Cloud Storage 存储桶中创建cust_parquet
(Parquet) 和cust_orc
(ORC) Hive 表。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注意:
- Hive 组件已预安装在 Dataproc Kafka 集群上。如需查看最近发布的 2.1 映像中包含的 Hive 组件版本列表,请参阅 2.1.x 发布版本。
- KAFKA_CLUSTER:Kafka 集群的名称。
- REGION:Kafka 集群所在的区域。
将 Kafka custdata
流式传输到 Hive 表
- 在 Kafka 集群的主节点上的 SSH 终端中运行以下命令,以安装
kafka-python
库。需要使用 Kafka 客户端才能将 Kafka 主题数据流式传输到 Cloud Storage。pip install kafka-python
插入 BUCKET_NAME,将以下 PySpark 代码复制并粘贴到 Kafka 集群主节点上的 SSH 终端,然后按 <return> 以创建
streamdata.py
文件。该脚本会订阅 Kafka
custdata
主题,然后将数据流式传输到 Cloud Storage 中的 Hive 表。输出格式(可以是 Parquet 或 ORC)作为参数传递到脚本中。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
在 Kafka 集群主节点上的 SSH 终端中,运行
spark-submit
以将数据流式传输到 Cloud Storage 中的 Hive 表。插入 KAFKA_CLUSTER 的名称和输出 FORMAT,将以下代码复制并粘贴到 Kafka 集群主节点上的 SSH 终端,然后按 <return> 运行代码,并将 Kafka
custdata
数据以 Parquet 格式流式传输到 Cloud Storage 中的 Hive 表。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
注意:
- KAFKA_CLUSTER:插入 Kafka 集群的名称。
- FORMAT:指定
parquet
或orc
作为输出格式。您可以连续运行该命令,以将这两种格式的数据流式传输到 Hive 表中:例如,在第一次调用中,指定parquet
以将 Kafkacustdata
主题流式传输到 Hive Parquet 表;然后在第二次调用中,指定orc
格式以将custdata
流式传输到 Hive ORC 表。
标准输出在 SSH 终端中停止后(这表示所有
custdata
都已流式传输),请在 SSH 终端中按 <control-c> 以停止流程。列出 Cloud Storage 中的 Hive 表。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注意:
- BUCKET_NAME:插入包含 Hive 表的 Cloud Storage 存储桶的名称(请参阅创建 Hive 表)。
查询流式传输的数据
在 Kafka 集群主节点上的 SSH 终端中,运行以下
hive
命令,以统计 Cloud Storage 的 Hive 表中流式传输的 Kafkacustdata
消息的数量。hive -e "select count(1) from TABLE_NAME"
注意:
- TABLE_NAME:将
cust_parquet
或cust_orc
指定为 Hive 表名称。
预期输出代码段:
- TABLE_NAME:将
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)