# t1.py 853 Bytes
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Create (or reuse) the SparkSession for this application.
spark = (
    SparkSession.builder
    .appName("StructuredKafkaWordCount")
    .getOrCreate()
)

# Open a streaming read subscribed to the "test" topic via localhost:9092.
# NOTE(review): Spark's built-in Kafka source is registered as "kafka" and
# takes "kafka.bootstrap.servers"; "my_kafka" presumably names a custom
# source -- confirm it is registered on the cluster, otherwise load() fails.
kafka_df = (
    spark.readStream
    .format("my_kafka")
    .option("my_kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .load()
)

# Keep only the record's `value` field, cast to a string.
kafka_values = kafka_df.selectExpr("CAST(value AS STRING)")

# Split each value on a single space, emit one row per resulting token
# (the flatMap step), then count the rows for each distinct token.
word_column = explode(split(kafka_values["value"], " ")).alias("word")
words = kafka_values.select(word_column)
word_counts = words.groupBy("word").count()

# Start a streaming query that writes the aggregated counts to the console
# sink using the "complete" output mode.
query = (
    word_counts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Keep the driver alive until the streaming query terminates.
query.awaitTermination()