Apache Kafka は、リアルタイム データ パイプラインとデータ統合用のオープンソースの分散型ストリーミング プラットフォームです。次のようなさまざまなアプリケーションで使用する、効率的でスケーラブルなストリーミング システムを提供します。
- リアルタイム分析
- ストリーム処理
- ログ集計
- 分散メッセージング
- イベント ストリーミング
チュートリアルのステップ
次の手順を実行して、Dataproc Kafka クラスタを作成し、Kafka トピックを Parquet または ORC 形式で Cloud Storage に読み込みます。
Kafka インストール スクリプトを Cloud Storage にコピーする
kafka.sh 初期化アクション スクリプトは、Kafka を Dataproc クラスタにインストールします。
コードを参照します。
kafka.sh初期化アクション スクリプトを Cloud Storage バケットにコピーします。このスクリプトは、Kafka を Dataproc クラスタにインストールします。Cloud Shell を開き、次のコマンドを実行します。
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
次のように置き換えます。
- REGION:
kafka.shは、Cloud Storage のリージョンタグ付き公開バケットに保存されます。地理的に近い Compute Engine リージョンを指定します(例:us-central1)。 - BUCKET_NAME: Cloud Storage バケットの名前。
- REGION:
Dataproc Kafka クラスタを作成する
Cloud Shell を開き、次の
gcloud dataproc clusters createコマンドを実行して、Kafka と ZooKeeper コンポーネントをインストールする Dataproc HA クラスタを作成します。gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注:
- KAFKA_CLUSTER: クラスタ名。プロジェクト内で一意にする必要があります。名前は先頭を小文字にする必要があり、51 文字以下の小文字、数字、ハイフンを使用できます。末尾をハイフンにすることはできません。削除されたクラスタの名前は再利用できます。
- PROJECT_ID: このクラスタに関連付けるプロジェクト。
- REGION: クラスタが配置される Compute Engine のリージョン(
us-central1など)。- オプションの
--zone=ZONEフラグを追加して、指定されたリージョン内のゾーン(us-central1-aなど)を指定できます。ゾーンを指定しない場合、Dataproc の自動ゾーン プレースメント機能により、指定されたリージョン内のゾーンが選択されます。
- オプションの
--image-version: このチュートリアルでは、Dataproc イメージ バージョン2.1-debian11をおすすめします。注: 各イメージ バージョンには、このチュートリアルで使用される Hive コンポーネントなど、一連のプリインストール コンポーネントが含まれています(サポートされている Dataproc イメージ バージョンを参照)。--num-master:3個のマスターノードが HA クラスタを作成します。Kafka に必要な Zookeeper コンポーネントは HA クラスタにプリインストールされています。--enable-component-gateway: Dataproc コンポーネント ゲートウェイを有効にします。- BUCKET_NAME:
/scripts/kafka.sh初期化スクリプトを含む Cloud Storage バケットの名前(Kafka インストール スクリプトを Cloud Storage にコピーするを参照)。
Kafka custdata トピックを作成する
Dataproc Kafka クラスタに Kafka トピックを作成するには:
SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。
Kafka
custdataトピックを作成します。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注:
KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
-w-0:9092は、worker-0ノードのポート9092で実行されている Kafka ブローカーを表します。custdataトピックの作成後は、次のコマンドを実行できます。# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Kafka custdata トピックにコンテンツを公開する
次のスクリプトでは、kafka-console-producer.sh Kafka ツールを使用して架空の顧客データを CSV 形式で生成します。
スクリプトをコピーして、Kafka クラスタのマスターノードの SSH ターミナルに貼り付けます。<return> を押してスクリプトを実行します。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
次の Kafka コマンドを実行して、
custdataトピックに 10,000 件のメッセージが含まれていることを確認します。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
予想される出力:
custdata:0:10000
Cloud Storage に Hive テーブルを作成する
ストリーミングされた Kafka トピックデータを受信する Hive テーブルを作成します。次の手順を実行して、Cloud Storage バケットに cust_parquet(Parquet)と cust_orc(ORC)の Hive テーブルを作成します。
BUCKET_NAME を次のスクリプトに挿入し、スクリプトをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して
~/hivetables.hql(Hive クエリ言語)スクリプトを作成します。次の手順で
~/hivetables.hqlスクリプトを実行して、Cloud Storage バケットに Parquet と ORC 形式の Hive テーブルを作成します。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Kafka クラスタのマスターノードの SSH ターミナルで、
~/hivetables.hqlHive ジョブを送信して、Cloud Storage バケットにcust_parquet(Parquet)とcust_orc(ORC)の Hive テーブルを作成します。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注:
- Hive コンポーネントは Dataproc Kafka クラスタにプリインストールされています。最近リリースされた 2.1 イメージに含まれる Hive コンポーネント バージョンの一覧については、2.1.x リリース バージョンをご覧ください。
- KAFKA_CLUSTER: Kafka クラスタの名前。
- REGION: Kafka クラスタが配置されているリージョン。
Kafka custdata を Hive テーブルにストリーミングする
- Kafka クラスタのマスターノードの SSH ターミナルで次のコマンドを実行して、
kafka-pythonライブラリをインストールします。Kafka クライアントは、Kafka トピックデータを Cloud Storage にストリーミングするのに必要です。pip install kafka-python
BUCKET_NAME を挿入し、次の PySpark コードをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して
streamdata.pyファイルを作成します。このスクリプトは、Kafka
custdataトピックをサブスクライブし、データを Cloud Storage の Hive テーブルにストリーミングします。出力形式(Parquet または ORC)は、パラメータとしてスクリプトに渡されます。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOFKafka クラスタのマスターノードの SSH ターミナルで
spark-submitを実行して、Cloud Storage の Hive テーブルにデータをストリーミングします。KAFKA_CLUSTER の名前と出力 FORMAT を挿入し、次のコードをコピーして Kafka クラスタのマスターノードの SSH ターミナルに貼り付け、Enter キーを押してコードを実行し、Kafka
custdataデータを Parquet 形式で Cloud Storage の Hive テーブルにストリーミングします。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT注:
- KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
- FORMAT: 出力形式として
parquetまたはorcを指定します。コマンドを連続して実行して、両方の形式を Hive テーブルにストリーミングできます。たとえば、最初の呼び出しではparquetを指定して Kafkacustdataトピックを Hive Parquet テーブルにストリーミングし、2 番目の呼び出しではorc形式を指定してcustdataを Hive ORC テーブルにストリーミングします。
標準出力が SSH ターミナルで停止したら(これは、すべての
custdataがストリーミングされたことを示します)、SSH ターミナルで Ctrl+C キーを押してプロセスを停止します。Cloud Storage 内の Hive テーブルを一覧表示します。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注:
- BUCKET_NAME: Hive テーブルを含む Cloud Storage バケットの名前を挿入します(Hive テーブルを作成するを参照)。
ストリーミング データをクエリする
Kafka クラスタのマスターノードの SSH ターミナルで、次の
hiveコマンドを実行して、Cloud Storage の Hive テーブルにストリーミングされた Kafkacustdataメッセージをカウントします。hive -e "select count(1) from TABLE_NAME"
注:
- TABLE_NAME: Hive テーブル名として
cust_parquetまたはcust_orcを指定します。
想定される出力スニペット:
- TABLE_NAME: Hive テーブル名として
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)