Apache Kafka は、リアルタイム データ パイプラインとデータ統合用のオープンソースの配信ストリーミング プラットフォームです。次のようなさまざまなアプリケーションで使用する、効率的でスケーラブルなストリーミング システムを提供します。
- リアルタイム分析
- ストリーム処理
- ログ集計
- 分散メッセージング
- イベント ストリーミング
目標
ZooKeeper で Managed Service for Apache Spark HA クラスタ に Kafka をインストールします(このチュートリアルでは「Managed Service for Apache Spark Kafka クラスタ」と呼びます)。
架空の顧客データを作成し、データを Kafka トピックに公開します。
Cloud Storage に Hive Parquet テーブルおよび ORC テーブルを作成して、ストリーミングされた Kafka トピックデータを受信します。
PySpark ジョブを送信して、Kafka トピックをサブスクライブして Cloud Storage に Parquet および ORC 形式でストリーミングします。
ストリーミングされた Hive テーブルデータに対してクエリを実行して、ストリーミングされた Kafka メッセージをカウントします。
費用
このドキュメントでは、課金対象である次のコンポーネントを使用します。 Google Cloud
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
まだ作成していない場合は、 Google Cloud プロジェクトを作成します。
- アカウントにログインします Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- コンソールで Cloud Storage の Google Cloud [**バケット**] ページに移動します。
- [ [Create]] をクリックします。
- [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
- [スタートガイド] セクションで、次の操作を行います。
-
[データの保存場所の選択] セクションで、次の操作を行います。
- ロケーション タイプを選択してください。
- [Location type] プルダウン メニューから、バケットのデータが永続的に保存されるロケーションを選択します。
- ロケーション タイプとして [デュアルリージョン] を選択した場合は、関連するチェックボックスを使用して [ターボ レプリケーション] を有効にすることもできます。
- クロスバケット レプリケーションを設定するには、
[Storage Transfer Service 経由でクロスバケット レプリケーションを追加する] を選択し、
次の手順を実施します:
クロスバケット レプリケーションを設定する
- [バケット] メニューで、バケットを選択します。
[レプリケーション設定] セクションで、[構成] をクリックして、レプリケーション ジョブの設定を構成します。
[**クロスバケット レプリケーションを構成する**] ペインが表示されます。
- オブジェクト名の接頭辞で複製するオブジェクトをフィルタするには、 オブジェクトを追加または除外する接頭辞を入力し、 [接頭辞を追加] をクリックします。
- 複製されたオブジェクトのストレージ クラスを設定するには、 [Storage class] メニューからストレージ クラスを選択します。 この手順をスキップすると、複製されたオブジェクトはデフォルトで宛先バケットのストレージ クラスを使用します。
- [完了] をクリックします。
-
[データの保存方法を選択する] セクションで、次の操作を行います。
- バケットのデフォルトのストレージ クラスを選択するか、バケットデータのストレージ クラスを自動的に管理するAutoclassを選択します。
- 階層名前空間を有効にするには、 [データ量が多いワークロード向けにストレージを最適化] セクションで、 [このバケットで階層的な名前空間を有効にする] を選択します。
- In the [オブジェクトへのアクセスを制御する方法を選択する] セクションで、バケットに 公開アクセスの防止 を適用するかどうかを選択し、バケットのオブジェクトに使用する アクセス制御方法 を選択します。
-
[オブジェクト データを保護する方法を選択する] セクションで、次の操作を行います。
- [**データ保護**] で、バケットに設定するオプションを選択します。
- 削除(復元可能)を有効にするには、 [削除(復元可能)ポリシー(データ復旧用)] チェックボックスをオンにして、 削除後にオブジェクトを保持する日数を指定します。
- オブジェクトのバージョニングを設定するには、 [オブジェクトのバージョニング(バージョン管理用)] チェックボックスをオンにして、 オブジェクトごとの最大バージョン数と、非現行バージョンが期限切れになるまでの日数を指定します。
- オブジェクトとバケットの保持ポリシーを有効にするには、[保持(コンプライアンス用)] チェックボックスをオンにして、次の操作を行います。
- [オブジェクト保持ロック]を有効にするには、 [オブジェクト保持を有効にする]チェックボックスをオンにします。
- [Bucket Lock] を有効にするには、[バケット保持ポリシーを設定する] チェックボックスをオンにして、保持期間の単位と保持期間を選択します。
- オブジェクト データの暗号化方法を選択するには、 [データ暗号化] セクション()を開き、 [データの暗号化] 方法を選択します。
- [**データ保護**] で、バケットに設定するオプションを選択します。
- [作成] をクリックします。
チュートリアルのステップ
次の手順を実行して、Managed Service for Apache Spark Kafka クラスタを作成し、Kafka トピックをパーケットまたは ORC 形式で Cloud Storage に読み込みます。
Kafka インストール スクリプトを Cloud Storage にコピーする
kafka.sh 初期化アクション
スクリプトは、Kafka を Managed Service for Apache Spark クラスタにインストールします。
コードを参照します。
kafka.sh初期化アクション スクリプトを Cloud Storage バケットにコピーします。 このスクリプトは、Kafka を Managed Service for Apache Spark クラスタにインストールします。Cloud Shell を開き、次のコマンドを実行します。
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
次のように置き換えます。
- REGION:
kafka.shは、Cloud Storage のリージョンタグ付き公開バケットに保存されます。地理的に近い Compute Engine リージョンを指定します(例:us-central1)。 - BUCKET_NAME: Cloud Storage バケットの名前。
- REGION:
Managed Service for Apache Spark Kafka クラスタを作成する
Cloud Shell を開き、次の
gcloud dataproc clusters createコマンドを実行して、Kafka と ZooKeeper コンポーネントをインストールする Managed Service for Apache SparkHA クラスタを作成します。gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注:
- KAFKA_CLUSTER: クラスタ名。プロジェクト内で一意にする必要があります。名前は先頭を小文字にする必要があり、51 文字以下の小文字、数字、ハイフンを使用できます。末尾をハイフンにすることはできません。削除されたクラスタの名前は再利用できます。
- PROJECT_ID: このクラスタに関連付けるプロジェクト。
- REGION: クラスタが配置される Compute Engine のリージョン(
us-central1など)。- オプションの
--zone=ZONEフラグ を追加して、指定されたリージョン内のゾーン(us-central1-aなど)を指定できます。ゾーンを指定しない場合、 Managed Service for Apache Spark 自動ゾーン プレースメント 機能は、指定されたリージョンのあるゾーンを選択します。
- オプションの
--image-version: このチュートリアルでは、Managed Service for Apache Spark イメージ バージョン2.1-debian11を使用することをおすすめします。注: 各画像モードには、このチュートリアルで使用される Hive コンポーネントなど、一連のプリインストール コンポーネントが含まれています( サポートされている Managed Service for Apache Spark 画像モードを参照)。--num-master:3個のマスターノードが HA クラスタを作成します。 Kafka に必要な Zookeeper コンポーネントは HA クラスタにプリインストールされています。--enable-component-gateway: Managed Service for Apache Spark コンポーネント ゲートウェイを有効にします。- BUCKET_NAME:
/scripts/kafka.sh初期化スクリプトを含む Cloud Storage バケットの名前(Kafka インストール スクリプトを Cloud Storage にコピーするを参照)。
Kafka custdata トピックを作成する
Managed Service for Apache Spark Kafka クラスタに Kafka トピックを作成するには:
SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。
Kafka
custdataトピックを作成します。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注:
KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
-w-0:9092は、worker-0ノードのポート9092で実行されている Kafka ブローカーを表します。custdataトピックの作成後は、次のコマンドを実行できます。# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Kafka custdata トピックにコンテンツを公開する
次のスクリプトでは、kafka-console-producer.sh Kafka ツールを使用して架空の顧客データを CSV 形式で生成します。
スクリプトをコピーして、Kafka クラスタのマスターノードの SSH ターミナルに貼り付けます。<return> を押してスクリプトを実行します。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
次の Kafka コマンドを実行して、
custdataトピックに 10,000 件のメッセージが含まれていることを確認します。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
予想される出力:
custdata:0:10000
Cloud Storage に Hive テーブルを作成する
ストリーミングされた Kafka トピックデータを受信する Hive テーブルを作成します。次の手順を実行して、Cloud Storage バケットに cust_parquet(Parquet)と cust_orc(ORC)の Hive テーブルを作成します。
BUCKET_NAME を次のスクリプトに挿入し、スクリプトをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して
~/hivetables.hql(Hive クエリ言語)スクリプトを作成します。次の手順で
~/hivetables.hqlスクリプトを実行して、Cloud Storage バケットに Parquet と ORC 形式の Hive テーブルを作成します。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Kafka クラスタのマスターノードの SSH ターミナルで、
~/hivetables.hqlHive ジョブを送信して、Cloud Storage バケットにcust_parquet(Parquet)とcust_orc(ORC)の Hive テーブルを作成します。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注:
- Hive コンポーネントは Managed Service for Apache Spark Kafka クラスタにプリインストールされています。最近リリースされた 2.1 イメージに含まれる Hive コンポーネント バージョンの一覧については、2.1.x リリース バージョン をご覧ください。
- KAFKA_CLUSTER: Kafka クラスタの名前。
- REGION: Kafka クラスタが配置されているリージョン。
Kafka custdata を Hive テーブルにストリーミングする
- Kafka クラスタのマスターノードの SSH ターミナルで次のコマンドを実行して、
kafka-pythonライブラリをインストールします。Kafka クライアントは、Kafka トピックデータを Cloud Storage にストリーミングするのに必要です。pip install kafka-python
BUCKET_NAME を挿入し、次の PySpark コードをコピーして Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、Enter キーを押して
streamdata.pyファイルを作成します。このスクリプトは、Kafka
custdataトピックをサブスクライブし、データを Cloud Storage の Hive テーブルにストリーミングします。出力形式(Parquet または ORC)は、パラメータとしてスクリプトに渡されます。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOFKafka クラスタのマスターノードの SSH ターミナルで
spark-submitを実行して、Cloud Storage の Hive テーブルにデータをストリーミングします。KAFKA_CLUSTER の名前と出力 FORMAT を挿入し、次のコードをコピーして Kafka クラスタのマスターノードの SSH ターミナルに貼り付け、Enter キーを押してコードを実行し、Kafka
custdataデータを Parquet 形式で Cloud Storage の Hive テーブルにストリーミングします。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT注:
- KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
- FORMAT: 出力形式として
parquetまたはorcを指定します。コマンドを連続して実行して、両方の形式を Hive テーブルにストリーミングできます。たとえば、最初の呼び出しではparquetを指定して Kafkacustdataトピックを Hive Parquet テーブルにストリーミングし、2 番目の呼び出しではorc形式を指定してcustdataを Hive ORC テーブルにストリーミングします。
標準出力が SSH ターミナルで停止したら(これは、すべての
custdataがストリーミングされたことを示します)、SSH ターミナルで Ctrl+C キーを押してプロセスを停止します。Cloud Storage 内の Hive テーブルを一覧表示します。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注:
- BUCKET_NAME: Hive テーブルを含む Cloud Storage バケットの名前を挿入します(Hive テーブルを作成するを参照)。
ストリーミング データをクエリする
Kafka クラスタのマスターノードの SSH ターミナルで、次の
hiveコマンドを実行して、Cloud Storage の Hive テーブルにストリーミングされた Kafkacustdataメッセージをカウントします。hive -e "select count(1) from TABLE_NAME"
注:
- TABLE_NAME: Hive テーブル名として
cust_parquetまたはcust_orcを指定します。
想定される出力スニペット:
- TABLE_NAME: Hive テーブル名として
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
クリーンアップ
プロジェクトを削除する
プロジェクトを削除します。 Google Cloud
gcloud projects delete PROJECT_ID
リソースの削除
-
バケットを削除します。
gcloud storage buckets delete BUCKET_NAME
- Kafka クラスタを削除します。
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}