from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Create the SparkSession
spark = SparkSession \
    .builder \
    .appName("StructuredKafkaWordCount") \
    .getOrCreate()

# Read the stream from Kafka. The source format must be "kafka",
# and the broker address option is "kafka.bootstrap.servers".
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

# Extract the value field and cast it to a string
kafka_values = kafka_df.selectExpr("CAST(value AS STRING)")

# Split each line into words (a flatMap), then group and count by word
words = kafka_values.select(
    explode(split(kafka_values.value, " ")).alias("word")
)
word_counts = words.groupBy("word").count()

# Write the running counts to the console
query = word_counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
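
# Note: the Kafka source is not bundled with PySpark itself, so running this
# script requires the spark-sql-kafka connector on the classpath. A minimal
# sketch of the launch command (the version numbers and the script filename
# are assumptions; match them to your Spark and Scala build):
#
#   spark-submit \
#     --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
#     structured_kafka_wordcount.py
#
# With a Kafka broker on localhost:9092 and a "test" topic receiving lines of
# text, the console sink prints an updated word-count table per micro-batch.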