# kafka_test.py
import os
import re
import sys
import traceback
from datetime import datetime

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import *  # StructType, StructField, StringType, IntegerType, FloatType, ...
from yswg_utils.common_udf import udf_rank_and_category
# from ..yswg_utils.common_udf import udf_rank_and_category


class DimStAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'kafka_test'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.kafka = self.create_kafka_object()
        # Placeholder DataFrames; replaced with real data in read_data()/handle_data()
        self.df_save = self.spark.sql("select 1+1")
        self.df_st_asin = self.spark.sql("select 1+1")
        # Empty template holding the expected count columns, so the column set is
        # guaranteed even when the crawler produced no rows for some data type
        self.df_asin_templates = self.spark.sql(
            "select asin_zr_counts, asin_sp_counts, asin_sb1_counts, asin_sb2_counts, asin_sb3_counts, "
            "asin_ac_counts, asin_bs_counts, asin_er_counts, asin_tr_counts from dwd_asin_measure limit 0"
        )
        self.df_asin_counts = self.df_asin_templates
        self.schema = self.init_schema()
        # Return type of the rank/category UDF: a struct of two comma-joined strings
        schema = StructType([
            StructField('bs_rank_str', StringType(), True),
            StructField('bs_category_str', StringType(), True),
        ])
        # self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", udf_rank_and_category, schema)
        self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", self.udf_rank_and_category, schema)

    @staticmethod
    def init_schema():
        schema = StructType([
            StructField("asin", StringType(), True),
            StructField("week", StringType(), True),
            StructField("title", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("total_comments", StringType(), True),
            StructField("price", FloatType(), True),
            StructField("rank", StringType(), True),
            StructField("category", StringType(), True),
            StructField("launch_time", StringType(), True),
            StructField("volume", StringType(), True),
            StructField("weight", StringType(), True),
            StructField("page_inventory", IntegerType(), True),
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("asin_vartion_list", IntegerType(), True),
            StructField("title_len", IntegerType(), True),
            StructField("img_num", IntegerType(), True),
            StructField("img_type", StringType(), True),
            StructField("activity_type", StringType(), True),
            StructField("one_two_val", StringType(), True),
            StructField("three_four_val", StringType(), True),
            StructField("eight_val", StringType(), True),
            StructField("qa_num", IntegerType(), True),
            StructField("five_star", IntegerType(), True),
            StructField("four_star", IntegerType(), True),
            StructField("three_star", IntegerType(), True),
            StructField("two_star", IntegerType(), True),
            StructField("one_star", IntegerType(), True),
            StructField("low_star", IntegerType(), True),
            StructField("together_asin", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("ac_name", StringType(), True),
            StructField("material", StringType(), True),
            StructField("node_id", StringType(), True),
            StructField("data_type", IntegerType(), True),
            StructField("sp_num", StringType(), True),
            StructField("describe", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("weight_str", StringType(), True),
            StructField("package_quantity", StringType(), True),
            StructField("pattern_name", StringType(), True),
            StructField("seller_id", StringType(), True),
            StructField("variat_num", IntegerType(), True),
            StructField("site_name", StringType(), True),
            StructField("best_sellers_rank", StringType(), True),
            StructField("best_sellers_herf", StringType(), True)
        ])
        return schema


    @staticmethod
    def udf_rank_and_category(best_sellers_rank):
        """Parse a best-sellers-rank string into two comma-joined strings:
        the numeric ranks and their matching category names."""
        pattern = r"([\d,]+) in ([\w&' ]+)"
        # Drop the "(See Top 100 in ...)" suffix so it is not matched as a category
        best_sellers_rank = re.sub(r'\(See Top 100 in .*?\)', '', str(best_sellers_rank))
        matches = re.findall(pattern, str(best_sellers_rank))

        # Strip thousands separators from ranks; defensively replace commas in
        # categories so the comma-joined output stays unambiguous
        bs_rank_str = ",".join([rank.replace(",", "") for rank, category in matches])
        bs_category_str = ",".join([category.strip().replace(",", " ") for rank, category in matches])
        return bs_rank_str, bs_category_str
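
    # Worked example (hypothetical input, for illustration):
    #   udf_rank_and_category(
    #       "#2,154 in Kitchen & Dining (See Top 100 in Kitchen & Dining) #5 in Mugs")
    #   returns ("2154,5", "Kitchen & Dining,Mugs")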

    def create_kafka_object(self):
        # Alternative broker: .option("kafka.bootstrap.servers", "113.100.143.162:39092")
        kafkaStreamDF = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "192.168.10.221:9092,192.168.10.220:9092,192.168.10.210:9092") \
            .option("subscribe", f"{self.site_name}_asin_detail") \
            .option("startingOffsets", "latest") \
            .load()
        return kafkaStreamDF
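
    # A quick way to sanity-check the topic outside of streaming (a sketch, assuming
    # the brokers above are reachable; batch reads use the same Kafka source):
    #   df = self.spark.read \
    #       .format("kafka") \
    #       .option("kafka.bootstrap.servers", "192.168.10.221:9092") \
    #       .option("subscribe", f"{self.site_name}_asin_detail") \
    #       .option("startingOffsets", "earliest") \
    #       .option("endingOffsets", "latest") \
    #       .load()
    #   df.selectExpr("CAST(value AS STRING)").show(5, truncate=False)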

    def read_data(self):
        print("1.1 Reading dim_st_asin_info to compute AO values")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sql)
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache()
        self.df_st_asin.show(10, truncate=False)

        # print("1.2 读取dim_st_asin_info表, 计算ao值")
        # sql = f"select asin, asin_weight, asin_volume from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        # print("sql:", sql)
        # self.df_st_asin = self.spark.sql(sql)
        # self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache()
        # self.df_st_asin.show(10, truncate=False)

    def handle_data(self):
        # Compute each ASIN's AO value (ad exposures / organic zr exposures)
        self.df_asin_counts = self.handle_st_asin_counts()
        self.df_asin_counts = self.df_asin_counts.select("asin", "asin_ao").cache()

    def handle_asin_bs_category_rank(self, df):
        # Apply the UDF, then split its struct result into two plain columns
        df = df.withColumn(
            'bs_str', self.u_rank_and_category('best_sellers_rank')
        )
        df = df.withColumn('bs_rank_str', df.bs_str.getField('bs_rank_str')) \
            .withColumn('bs_category_str', df.bs_str.getField('bs_category_str')) \
            .drop('bs_str')
        df.show(10, truncate=False)
        return df

    def handle_st_asin_counts(self):
        # Tag each row with its count-column name, e.g. data_type "zr" -> "asin_zr_counts"
        self.df_st_asin = self.df_st_asin.withColumn(
            "asin_data_type",
            F.concat(F.lit("asin_"), self.df_st_asin.data_type, F.lit("_counts"))
        )
        df_asin_counts = self.df_st_asin.groupby(['asin']). \
            pivot("asin_data_type").count()

        # Union with the empty template so missing crawler data cannot drop columns
        df_asin_counts = self.df_asin_templates.unionByName(df_asin_counts, allowMissingColumns=True)
        df_asin_counts = df_asin_counts.fillna(0)
        # df_asin_counts.show(10, truncate=False)
        df_asin_counts = df_asin_counts.withColumn(
            "asin_sb_counts",
            df_asin_counts["asin_sb1_counts"] + df_asin_counts["asin_sb2_counts"] + df_asin_counts["asin_sb3_counts"]
        )
        df_asin_counts = df_asin_counts.withColumn(
            "asin_adv_counts",
            df_asin_counts["asin_sb_counts"] + df_asin_counts["asin_sp_counts"]
        )
        # Do not fill nulls with 0 here: a null AO means the ASIN never appeared
        # under a zr-type search term, which is different from an AO of 0
        df_asin_counts = df_asin_counts.withColumn(
            "asin_ao",
            df_asin_counts["asin_adv_counts"] / df_asin_counts["asin_zr_counts"]
        )
        df_asin_counts.show(10, truncate=False)
        return df_asin_counts
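
    # Worked example (illustrative numbers): an ASIN seen 2 times under sb1, 1 under
    # sb2, 0 under sb3 and 3 under sp, with 12 zr (organic) appearances, gets
    # asin_sb_counts = 3, asin_adv_counts = 6, asin_ao = 6 / 12 = 0.5.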

    def process_batch(self, df, epoch_id):
        try:
            print("df.count():", df.count())
            # df.show(5, truncate=False)
            # Ensure the schema was built, to avoid a NoneType error in from_json
            if not self.schema:
                raise ValueError("Schema is not defined")
            df = df.withColumn("parsed_value", F.from_json(F.col("value").cast("string"), self.schema)) \
                .selectExpr("parsed_value.*")
            # df.show(5, truncate=False)
            # print("df.columns:", df.columns)
            df = df.select("asin", "launch_time", "volume", "weight", "weight_str", "variat_num", "best_sellers_rank", "best_sellers_herf", "site_name")
            df.show(5, truncate=False)
            # Extract rank and category columns from best_sellers_rank
            df_bs = self.handle_asin_bs_category_rank(df=df.select("asin", "best_sellers_rank"))
            # Join the parsed batch with the parsed ranks and the precomputed AO values
            df_save = df.join(
                df_bs, on='asin', how='left'
            ).join(
                self.df_asin_counts, on='asin', how='left'
            )
            df_save.show(5, truncate=False)
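            # No sink is wired up yet; a batch write would normally follow here,
            # e.g. (hypothetical output path):
            #   df_save.write.mode("append").parquet("/tmp/kafka_test/df_save")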
        except Exception as e:
            print(e, traceback.format_exc())

        print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def run(self):
        self.read_data()
        self.handle_data()
        # Cast the Kafka message value to a string and register it as a temp view
        stringifiedDF = self.kafka.selectExpr("CAST(value AS STRING)")
        stringifiedDF.createOrReplaceTempView("KafkaData")
        # Start the streaming query, triggered every 600 seconds (10 minutes)
        query = stringifiedDF.writeStream.foreachBatch(self.process_batch).trigger(processingTime='600 seconds').start()
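        # No checkpointLocation is set, so Spark falls back to a temporary checkpoint
        # directory; for a restartable query you would normally add one (the path
        # below is hypothetical):
        #   query = stringifiedDF.writeStream.foreachBatch(self.process_batch) \
        #       .option("checkpointLocation", "/tmp/kafka_test_checkpoint") \
        #       .trigger(processingTime='600 seconds').start()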
        # Block until the streaming query terminates
        query.awaitTermination()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()