from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
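# Note: the Kafka source is not bundled with PySpark; submit the job with the
# matching connector package. The version and script name below are
# assumptions (align them with your Spark/Scala build):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
#       structured_kafka_wordcount.py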
# Create the SparkSession
spark = SparkSession \
    .builder \
    .appName("StructuredKafkaWordCount") \
    .getOrCreate()
# Read the stream from Kafka (the source format is "kafka", and broker
# addresses are passed via the "kafka.bootstrap.servers" option)
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()
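# Each record from the Kafka source arrives with a fixed schema: key and
# value (both binary), plus topic, partition, offset, timestamp, and
# timestampType metadata columns.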
# Extract the value field from each record as a string
kafka_values = kafka_df.selectExpr("CAST(value AS STRING)")
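# Kafka delivers the payload as raw bytes, so the CAST above decodes the
# binary value into a UTF-8 string before any text processing.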
# Split each line into words (the DataFrame equivalent of flatMap),
# then group by word and count
words = kafka_values.select(
    explode(split(kafka_values.value, " ")).alias("word")
)
word_counts = words.groupBy("word").count()
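# word_counts is an unbounded streaming aggregation: each trigger updates the
# running counts, and the "complete" output mode below re-emits the full
# result table every time. For large vocabularies, "update" mode would emit
# only the rows that changed in each batch.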
# Write the running counts to the console
query = word_counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
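# To exercise the job, produce some lines to the "test" topic, for example
# with the standard Kafka console producer (flags assume a recent Kafka
# release; older versions used --broker-list instead):
#   kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# Each trigger then prints the updated word counts to the console sink above.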