Apache Kafka è una piattaforma di streaming distribuita open source per pipeline di dati e integrazione dei dati in tempo reale. Fornisce un sistema di streaming efficiente e scalabile per l'utilizzo in una varietà di applicazioni, tra cui:
- Analisi in tempo reale
- Elaborazione dei flussi
- Aggregazione dei log
- Messaggistica distribuita
- Streaming di eventi
Passaggi del tutorial
Per creare un cluster Dataproc Kafka per leggere un argomento Kafka in Cloud Storage in formato Parquet o ORC, segui questi passaggi.
Copia lo script di installazione di Kafka in Cloud Storage
Lo script dell'kafka.sh azione di inizializzazione
installa Kafka su un cluster Dataproc.
Sfoglia il codice.
Copia lo script
kafka.shdell'azione di inizializzazione nel bucket Cloud Storage. Questo script installa Kafka su un cluster Dataproc.Apri Cloud Shell ed esegui questo comando:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Effettua le seguenti sostituzioni:
- REGION:
kafka.shè archiviato in bucket pubblici con tag regionali in Cloud Storage. Specifica una regione di Compute Engine geograficamente vicina (ad esempious-central1). - BUCKET_NAME: il nome del bucket Cloud Storage.
- REGION:
Crea un cluster Dataproc Kafka
Apri Cloud Shell, quindi esegui il seguente comando
gcloud dataproc clusters createper creare un cluster HA Dataproc che installa i componenti Kafka e ZooKeeper:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Note:
- KAFKA_CLUSTER: il nome del cluster, che deve essere univoco all'interno di un progetto. Il nome deve iniziare con una lettera minuscola e può contenere fino a 51 lettere minuscole, numeri e trattini. Non può terminare con un trattino. Il nome di un cluster eliminato può essere riutilizzato.
- PROJECT_ID: il progetto da associare a questo cluster.
- REGION: la
regione di Compute Engine
in cui si troverà il cluster, ad esempio
us-central1.- Puoi aggiungere il flag facoltativo
--zone=ZONEper specificare una zona all'interno della regione specificata, ad esempious-central1-a. Se non specifichi una zona, la funzionalità di posizionamento automatico delle zone di Dataproc seleziona una zona con la regione specificata.
- Puoi aggiungere il flag facoltativo
--image-version: per questo tutorial è consigliata la versione immagine di Dataproc2.1-debian11. Nota: ogni versione dell'immagine contiene un insieme di componenti preinstallati, incluso il componente Hive utilizzato in questo tutorial (vedi Versioni delle immagini Dataproc supportate).--num-master: i nodi master3creano un cluster HA. Il componente Zookeeper, richiesto da Kafka, è preinstallato su un cluster HA.--enable-component-gateway: attiva il gateway dei componenti Dataproc.- BUCKET_NAME: il nome del bucket Cloud Storage
che contiene lo
/scripts/kafka.shscript di inizializzazione (vedi Copia lo script di installazione di Kafka in Cloud Storage).
Crea un argomento Kafka custdata
Per creare un argomento Kafka sul cluster Kafka Dataproc:
Utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.
Crea un argomento Kafka
custdata./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
Note:
KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
-w-0:9092indica il broker Kafka in esecuzione sulla porta9092sul nodoworker-0.Dopo aver creato l'argomento
custdata, puoi eseguire i seguenti comandi:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Pubblica contenuti nell'argomento Kafka custdata
Il seguente script utilizza lo strumento kafka-console-producer.sh Kafka per
generare dati fittizi dei clienti in formato CSV.
Copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka. Premi <return> per eseguire lo script.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Esegui questo comando Kafka per verificare che l'argomento
custdatacontenga 10.000 messaggi./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
Note:
- KAFKA_CLUSTER: il nome del cluster Kafka.
Output previsto:
custdata:0:10000
Crea tabelle Hive in Cloud Storage
Crea tabelle Hive per ricevere i dati degli argomenti Kafka in streaming.
Esegui i seguenti passaggi per creare tabelle Hive cust_parquet (parquet) e
cust_orc (ORC) nel bucket Cloud Storage.
Inserisci il tuo BUCKET_NAME nel seguente script, quindi copia e incolla lo script nel terminale SSH sul nodo master del cluster Kafka, poi premi <return> per creare uno script
~/hivetables.hql(Hive Query Language).Nel passaggio successivo eseguirai lo script
~/hivetables.hqlper creare tabelle Hive parquet e ORC nel bucket Cloud Storage.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Nel terminale SSH del nodo master del cluster Kafka, invia il job Hive
~/hivetables.hqlper creare tabelle Hivecust_parquet(parquet) ecust_orc(ORC) nel bucket Cloud Storage.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
Note:
- Il componente Hive è preinstallato sul cluster Dataproc Kafka. Consulta le versioni release 2.1.x per un elenco delle versioni dei componenti Hive incluse nelle immagini 2.1 rilasciate di recente.
- KAFKA_CLUSTER: il nome del cluster Kafka.
- REGION: la regione in cui si trova il cluster Kafka.
Trasmettere in streaming Kafka custdata alle tabelle Hive
- Esegui questo comando nel terminale SSH sul nodo master del cluster Kafka per installare la libreria
kafka-python. Per trasmettere in streaming i dati degli argomenti Kafka a Cloud Storage è necessario un client Kafka.pip install kafka-python
Inserisci il tuo BUCKET_NAME, quindi copia e incolla il seguente codice PySpark nel terminale SSH sul nodo master del cluster Kafka e premi <return> per creare un file
streamdata.py.Lo script si iscrive all'argomento Kafka
custdata, quindi trasmette i dati alle tabelle Hive in Cloud Storage. Il formato di output, che può essere Parquet o ORC, viene passato allo script come parametro.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOFNel terminale SSH sul nodo master del cluster Kafka, esegui
spark-submitper trasmettere i dati in streaming alle tabelle Hive in Cloud Storage.Inserisci il nome del tuo KAFKA_CLUSTER e dell'output FORMAT, quindi copia e incolla il seguente codice nel terminale SSH sul nodo master del tuo cluster Kafka, quindi premi <return> per eseguire il codice e trasmettere in streaming i dati
custdatadi Kafka in formato Parquet alle tue tabelle Hive in Cloud Storage.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMATNote:
- KAFKA_CLUSTER: inserisci il nome del tuo cluster Kafka.
- FORMAT: specifica
parquetoorccome formato di output. Puoi eseguire il comando in successione per trasmettere in streaming entrambi i formati alle tabelle Hive: ad esempio, nella prima chiamata, specificaparquetper trasmettere in streaming l'argomento Kafkacustdataalla tabella parquet Hive; poi, nella seconda chiamata, specifica il formatoorcper trasmettere in streamingcustdataalla tabella Hive ORC.
Quando l'output standard si interrompe nel terminale SSH, il che indica che tutto il
custdataè stato trasmesso in streaming, premi <control-c> nel terminale SSH per interrompere il processo.Elenca le tabelle Hive in Cloud Storage.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Note:
- BUCKET_NAME: inserisci il nome del bucket Cloud Storage che contiene le tabelle Hive (vedi Creare tabelle Hive).
Eseguire query sui dati in streaming
Nel terminale SSH sul nodo master del cluster Kafka, esegui il seguente comando
hiveper conteggiare i messaggicustdataKafka trasmessi in streaming nelle tabelle Hive in Cloud Storage.hive -e "select count(1) from TABLE_NAME"
Note:
- TABLE_NAME: specifica
cust_parquetocust_orccome nome della tabella Hive.
Snippet dell'output previsto:
- TABLE_NAME: specifica
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)