# cate_asin_detail_to_doris.py (6.33 KB)
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录

from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F


class CateAsinDetailToDoris(object):
    """Derive per-ASIN category flag columns from ``dwt_flow_asin`` and export them to Doris.

    One month of flow-selection ASIN rows for a site is enriched with the
    category's average total comments and the ASIN's summed total comments over
    a look-back window of earlier months.  Four bucket columns are then derived
    and the result is exported via ``DorisHelper``.

    The defaults (``site_name='us'``, ``date_info='2025-01'``, ``doris_db='test'``)
    reproduce the values that were previously hard-coded.
    """

    def __init__(self, site_name='us', date_info='2025-01', doris_db='test'):
        """Create the Spark session and record the job parameters.

        :param site_name: value matched against ``dwt_flow_asin.site_name``; also
            used as the prefix of the Doris target table ``<site_name>_cate_asin_flag``.
        :param date_info: month to process, in the ``'YYYY-MM'`` form used by
            ``dwt_flow_asin.date_info``.
        :param doris_db: first argument passed to
            ``DorisHelper.spark_export_with_columns`` (presumably the target database).
        """
        # Create the Spark session
        self.spark = SparkUtil.get_spark_session(app_name="cate_asin_detail_to_doris")
        self.site_name = site_name
        self.date_info = date_info
        self.doris_db = doris_db
        # Populated by read_data()/handle_data(); no placeholder query is needed.
        self.df_asin_detail = None
        self.df_asin_last5m_comments = None
        self.df_save = None
        # Inclusive lower bound of the look-back window (presumably five months
        # before date_info, per CommonUtil.get_month_offset).
        self.last_5m = CommonUtil.get_month_offset(self.date_info, -5)

    def read_data(self):
        """Load the target month's ASIN rows and the look-back months' comment counts.

        Sets ``self.df_asin_detail``: rows for ``self.date_info`` with a non-null
        ``category_id``, repartitioned by ``category_id`` into 80 partitions and
        cached.  Sets ``self.df_asin_last5m_comments``: ``asin`` and
        ``asin_total_comments`` for months in ``[self.last_5m, self.date_info)``,
        cached.
        """
        # Flow-selection ASIN data for the target month
        sql1 = f"""
        select category_id, asin, asin_total_comments, asin_comments_change, asin_launch_time, asin_launch_time_type, 
        asin_bsr_orders_change, asin_price, asin_size_type, is_alarm_brand
        from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month'
        and date_info = '{self.date_info}' and category_id is not null;
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql1).repartition(80, 'category_id').cache()
        print("当前月流量选品的基础数据:")
        self.df_asin_detail.show(10, True)

        # Per-ASIN total comments for each month of the look-back window
        sql2 = f"""
        select asin, asin_total_comments
        from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month'
        and date_info >= '{self.last_5m}' and date_info < '{self.date_info}';
        """
        self.df_asin_last5m_comments = self.spark.sql(sqlQuery=sql2).cache()
        print("asin前5个月的评论数是:")
        self.df_asin_last5m_comments.show(10, True)

    def handle_data(self):
        """Join review statistics onto each ASIN and derive the flag columns.

        Adds two statistics:

        * ``comments_avg``: the per-category mean of current-month total
          comments, rounded and cast to int.
        * ``comments_last_five_month``: the sum of the ASIN's total comments over
          the look-back months, 0 when the ASIN has no look-back rows.

        Strips the ``asin_`` prefix from the detail columns and then computes:

        * ``comments_avg_type``: 1 (<400), 2 ([400, 700)), 3 ([700, 1000)),
          4 ([1000, 1400)), 5 ([1400, 1800]), 6 (>1800); 0 when null.
        * ``one_year_and_rise_type``: applies when ``launch_time_type`` is in
          (0, 4] and ``comments_change >= 1``.  Then ``bsr_orders_change`` is
          bucketed as 1 ([0.2, 0.4]), 2 ((0.4, 0.6]), 3 (>0.6); else 0.
        * ``half_year_and_rise_type``: applies when ``launch_time_type`` is in
          (0, 3] and ``comments > comments_last_five_month``.  Then
          ``bsr_orders_change`` is bucketed as 1 ([0.3, 0.5]), 2 ((0.5, 0.7]),
          3 (>0.7); else 0.
        * ``price_right_type``: 1 ([15, 20)), 2 ([20, 30)), 3 ([30, 40)),
          4 ([40, 50]); else 0.

        The exported column selection is stored in ``self.df_save``.
        """
        # Average total comments per category
        df_comments_avg = self.df_asin_detail.groupby(['category_id']).agg(
            F.round(F.avg("asin_total_comments")).cast("int").alias("comments_avg")
        )
        print("分类的平均评论数:")
        df_comments_avg.show(10, True)

        # Sum of each ASIN's total comments over the look-back months
        df_comments_last_5m = self.df_asin_last5m_comments.groupby(['asin']).agg(
            F.sum("asin_total_comments").alias("comments_last_five_month")
        )
        print("asin前5个月的评论数之和:")
        df_comments_last_5m.show(10, True)

        # Source column -> exported column name (applied in this order).
        rename_map = {
            'asin_total_comments': 'comments',
            'asin_comments_change': 'comments_change',
            'asin_launch_time': 'launch_time',
            'asin_launch_time_type': 'launch_time_type',
            'asin_bsr_orders_change': 'bsr_orders_change',
            'asin_price': 'price',
            'asin_size_type': 'size_type',
        }
        df_joined = self.df_asin_detail \
            .join(df_comments_avg, 'category_id', 'left') \
            .join(df_comments_last_5m, 'asin', 'left')
        for old_name, new_name in rename_map.items():
            df_joined = df_joined.withColumnRenamed(old_name, new_name)
        # ASINs absent from the look-back months count as 0 past comments.
        self.df_asin_detail = df_joined.fillna({'comments_last_five_month': 0})

        comments_avg = F.col('comments_avg')
        launch_time_type = F.col('launch_time_type')
        bsr_change = F.col('bsr_orders_change')
        price = F.col('price')

        # Predicates shared by every bucket of the respective rise flag.
        one_year_base = (launch_time_type <= 4) & (launch_time_type > 0) & (F.col('comments_change') >= 1)
        half_year_base = (launch_time_type <= 3) & (launch_time_type > 0) & \
            (F.col('comments') > F.col('comments_last_five_month'))

        self.df_asin_detail = self.df_asin_detail \
            .withColumn('comments_avg_type',
                        # when() branches are tested in order, so each upper bound
                        # implies the previous branch's lower bound; null -> 0.
                        F.when(comments_avg < 400, 1)
                        .when(comments_avg < 700, 2)
                        .when(comments_avg < 1000, 3)
                        .when(comments_avg < 1400, 4)
                        .when(comments_avg <= 1800, 5)
                        .when(comments_avg > 1800, 6)
                        .otherwise(0)
                        ) \
            .withColumn('one_year_and_rise_type',
                        F.when(one_year_base & (bsr_change >= 0.2) & (bsr_change <= 0.4), 1)
                        .when(one_year_base & (bsr_change > 0.4) & (bsr_change <= 0.6), 2)
                        .when(one_year_base & (bsr_change > 0.6), 3)
                        .otherwise(0)
                        ) \
            .withColumn('half_year_and_rise_type',
                        F.when(half_year_base & (bsr_change >= 0.3) & (bsr_change <= 0.5), 1)
                        .when(half_year_base & (bsr_change > 0.5) & (bsr_change <= 0.7), 2)
                        .when(half_year_base & (bsr_change > 0.7), 3)
                        .otherwise(0)
                        ) \
            .withColumn('price_right_type',
                        F.when((price >= 15) & (price < 20), 1)
                        .when((price >= 20) & (price < 30), 2)
                        .when((price >= 30) & (price < 40), 3)
                        .when((price >= 40) & (price <= 50), 4)
                        .otherwise(0)
                        )

        self.df_save = self.df_asin_detail.select(
            'category_id', 'asin', 'comments', 'comments_last_five_month', 'comments_avg', 'comments_change',
            'launch_time', 'launch_time_type', 'bsr_orders_change', 'price', 'size_type', 'is_alarm_brand',
            'comments_avg_type', 'one_year_and_rise_type', 'half_year_and_rise_type', 'price_right_type'
        )

    def save_data(self):
        """Export ``self.df_save`` to Doris.

        The export goes through ``DorisHelper.spark_export_with_columns`` with
        ``self.doris_db`` and the table ``<site_name>_cate_asin_flag``, passing
        the frame's columns as a comma-separated list.
        """
        columns_str = ",".join(self.df_save.columns)
        DorisHelper.spark_export_with_columns(
            self.df_save, self.doris_db, f'{self.site_name}_cate_asin_flag', columns_str
        )
        print('导出完成')

    def run(self):
        """Run the full pipeline: read, transform, export."""
        self.read_data()
        self.handle_data()
        self.save_data()


if __name__ == '__main__':
    # Script entry point: build the job, then read -> handle -> save via run().
    job = CateAsinDetailToDoris()
    job.run()