# dwd_feedback.py 16.7 KB
"""
   @Author      : HuangJian
   @Description : 店铺feedBack页面top20相关指标统计
   @SourceTable :
                  ①ods_seller_account_syn
                  ②ods_seller_asin_account
                  ③ods_seller_account_feedback_report
                  ④ods_asin_detail_product
                  ⑤dim_asin_history_info
   @SinkTable   :
                  ①dwd_seller_asin_account_agg
                  ②dwd_seller_asin_account_detail
   @CreateTime  : 2022/11/2 9:56
   @UpdateTime  : 2022/11/2 9:56
"""

import datetime
import traceback
import os
import sys
from datetime import date, timedelta

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.sql.functions import ceil
from pyspark.sql.types import IntegerType
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_get_package_quantity


class DwdFeedBack(Templates):
    """Shop feedback-page top-20 metric job (DWD layer).

    Joins seller/asin ODS tables with the asin history/variation DIM tables
    and produces, per seller account: top-20 average price/rating/comments,
    new-product counts and ratio, variation (multi-child asin) ratio, and
    package-quantity ratio.  Results are written to
    ``dwd_seller_asin_account_agg`` (aggregated) and
    ``dwd_seller_asin_account_detail`` (per-asin detail).
    """

    def __init__(self, site_name='us', date_type="week", date_info='2022-40'):
        super().__init__()
        # NOTE(review): db_save is only used to build the Spark app name here;
        # the actual sink tables are passed explicitly in save_data().
        self.db_save = "feedback_week"
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}:{self.site_name}_{self.date_type}_{self.date_info}")
        # self.year / self.week / self.year_week are presumably populated by
        # the Templates base class (via get_year_week_tuple) -- TODO confirm.
        self.df_date = self.get_year_week_tuple()
        self.month = self.get_month_by_week()
        self.week_date = self.get_calDay_by_dateInfo()
        self.month_old = int(self.month)
        self.ym = f"{self.year}_{self.month_old}"
        self.partitions_by = ['site_name', 'date_type', 'date_info']

        # Register the custom UDFs used by the transformations below.
        self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType())
        self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType())
        self.u_judge_package_quantity = F.udf(udf_get_package_quantity,IntegerType())
        # Placeholder DataFrames ("select 1+1") for the raw ODS inputs;
        # replaced with real data in read_data().
        self.df_seller_acount_syn = self.spark.sql("select 1+1;")
        self.df_seller_asin_acount = self.spark.sql("select 1+1;")
        self.df_seller_account_feedback_report = self.spark.sql("select 1+1;")
        self.df_asin_detail_product = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # Placeholder DataFrames for the DWD-layer outputs.
        self.df_seller = self.spark.sql("select 1+1;")
        self.df_seller_agg = self.spark.sql("select 1+1;")
        self.df_seller_detail = self.spark.sql("select 1+1;")
        self.df_asin_parent = self.spark.sql("select 1+1;")

        # Placeholder DataFrames for intermediate results.
        self.df_seller_top = self.spark.sql("select 1+1;")
        self.df_asin_counts = self.spark.sql("select 1+1;")
        self.df_asin_new_counts = self.spark.sql("select 1+1;")
        self.df_variat_ratio = self.spark.sql("select 1+1;")

    @staticmethod
    def udf_launch_time(launch_time, cal_day):
        """Return the number of days between ``launch_time`` and ``cal_day``.

        ``launch_time`` is expected as a dash-separated date string
        (yyyy-mm-dd).  999999 is returned as an "unknown" sentinel when the
        value is missing or unparsable.  A blank ``cal_day`` falls back to
        the hard-coded baseline '2022-11-02'.
        """
        # Only values that look like a dash-separated date are parsed.
        if "-" in str(launch_time):
            # print(DwdFeedBack.week_date)
            asin_date_list = str(launch_time).split("-")
            try:
                asin_date = datetime.date(year=int(asin_date_list[0]),
                                          month=int(asin_date_list[1]),
                                          day=int(asin_date_list[2]))
                if not cal_day.strip():
                    # Fallback baseline date when no calculation day is given.
                    week_date = '2022-11-02'
                else:
                    week_date = cal_day
                cur_date_list = str(week_date).split("-")
                cur_date = datetime.date(year=int(cur_date_list[0]),
                                         month=int(cur_date_list[1]),
                                         day=int(cur_date_list[2]))
                days_diff = (cur_date - asin_date).days
            except Exception as e:
                # Unparsable date parts -> log and fall back to the sentinel.
                print(e, traceback.format_exc())
                print(launch_time, asin_date_list)
                days_diff = 999999
        else:
            days_diff = 999999
        return days_diff

    @staticmethod
    def udf_days_diff(days_diff):
        """Map a day gap to the new-product flag.

        Returns 1 when the asin launched within the last 180 days, None when
        ``days_diff`` is the 999999 "unknown" sentinel, 0 otherwise.
        """
        if days_diff <= 180:
            return 1
        elif days_diff == 999999:
            return None
        else:
            return 0

    def get_month_by_week(self):
        """Return the zero-padded month string ("01".."12") this week falls in.

        Only meaningful for date_type == "week"; for any other date_type the
        method implicitly returns None (the caller then does int(None) and
        fails -- presumably this job is only run with week -- TODO confirm).
        """
        if self.date_type == "week":
            df = self.df_date.loc[self.df_date.year_week == self.year_week]
            month = list(df.month)[0]
            if int(month) < 10:
                month = "0" + str(month)
            print("month:", month)
            return str(month)

    def get_calDay_by_dateInfo(self):
        """Return the calendar day used as the new-product baseline.

        For day/last30day the date_info itself is used.  For week/month the
        FIRST day of the period is returned.  NOTE(review): the original
        comment said "last day" and the commented-out code below picked the
        last day -- the active code takes the first day; confirm intent.
        """
        if self.date_type in ['day', 'last30day']:
            return str(self.date_info)
        # For week/month, derive the baseline day from the date dimension.
        if self.date_type in ['week', 'month']:
            self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;")
            df = self.df_date.toPandas()
            df_loc = df.loc[df[f'year_{self.date_type}'] == f"{self.date_info}"]
            self.date_info_tuple = tuple(df_loc.date)
            # week_date_info_tuple = tuple(df_loc.date)
            # last_index = len(week_date_info_tuple)
            # print("self.cal_day:", str(tuple(df_loc.date)[last_index - 1]))
            # # check length, take the last day
            # return str(tuple(df_loc.date)[last_index - 1])
            # Take the first day of the week/month.
            print("self.cal_day:", str(list(df_loc.date)[0]))
            return str(list(df_loc.date)[0])

    # 1. Load the raw input data.
    def read_data(self):
        """Populate the raw input DataFrames from the ODS/DIM tables."""
        # Seller-side ODS tables.
        print("获取 ods_seller_account_syn")
        sql = f"select id as account_id, account_name, {self.week} as week from ods_seller_account_syn " \
              f"where site_name='{self.site_name}' "
        self.df_seller_acount_syn = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_seller_acount_syn:", self.df_seller_acount_syn.show(10, truncate=False))

        print("获取 ods_seller_asin_account")
        sql = f"select account_name, asin from ods_seller_asin_account " \
              f"where site_name = '{self.site_name}' group by account_name, asin"
        self.df_seller_asin_acount = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_seller_asin_acount:", self.df_seller_asin_acount.show(10, truncate=False))

        print("获取 ods_seller_account_feedback_report")
        sql = f"select account_id,num as asin_counts from ods_seller_account_feedback_report  " \
              f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}'"
        self.df_seller_account_feedback_report = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_seller_account_feedback_report", self.df_seller_account_feedback_report.show(10, truncate=False))

        # Asin-side ODS tables.
        print("获取 ods_asin_detail_product")
        sql = f"select account_id, asin, price, rating, total_comments, {self.week} as week, row_num, created_at " \
              f"from ods_asin_detail_product where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}'"
        self.df_asin_detail_product = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_asin_detail_product1:", self.df_asin_detail_product.show(10, truncate=False))
        # Deduplicate to one record per (account_id, row_num), newest first by
        # created_at.  NOTE(review): Spark does not guarantee dropDuplicates
        # keeps the first row of a previously sorted DataFrame -- a window
        # function would be deterministic; confirm this is acceptable.
        self.df_asin_detail_product = self.df_asin_detail_product.sort(['account_id', "row_num", "created_at"],
                                                                       ascending=[True, True, False])
        self.df_asin_detail_product = self.df_asin_detail_product.dropDuplicates(['account_id', "row_num"])
        #print("self.df_asin_detail_product2:", self.df_asin_detail_product.show(10, truncate=False))

        print("获取 dim_asin_history_info")
        sql = f"select asin, asin_title,asin_launch_time as launch_time from dim_cal_asin_history_detail " \
              f"where site_name='{self.site_name}'"
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_asin_detail:", self.df_asin_detail.show(10, truncate=False))

        print("获取 dim_asin_variation_info")
        sql = f"select asin,parent_asin from dim_asin_variation_info " \
              f"where site_name='{self.site_name}'" \
              f" and asin != parent_asin "
        self.df_asin_parent = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_asin_parent:", self.df_asin_parent.show(10, truncate=False))

    def handle_data(self):
        """Run the transformation steps in pipeline order."""
        self.handle_asin_top20_avg()
        self.handle_asin_count()
        self.handle_save_date()

    # 2.1 Average metrics of the top-20 products, aggregated per account.
    def handle_asin_top20_avg(self):
        """Compute per-account averages over the top-20 (row_num<=20) asins."""
        print("处理asin_detail_product的top20指标逻辑")
        self.df_seller_top = self.df_asin_detail_product.filter("row_num<=20")
        self.df_seller_top = self.df_seller_top.groupby('account_id').agg(
            F.avg('price').alias('top_20_avg_price'),
            F.avg('rating').alias('top_20_avg_rating'),
            F.avg('total_comments').alias('top_20_avg_total_comments'),
        )
        # Round the average comment count up to a whole number.
        self.df_seller_top = self.df_seller_top.withColumn("top_20_avg_total_comments",
                                                           ceil(self.df_seller_top.top_20_avg_total_comments))
        # print("df_seller_top:", self.df_seller_top.show(10, truncate=False))

    # 2.2 Compute asin_count and asin_new_count metrics.
    def handle_asin_count(self):
        """Build the per-account count/ratio metrics (new-product, package
        quantity and variation ratios)."""
        print("处理df_seller_account相关数据逻辑")
        # Join df_seller_acount_syn with df_seller_asin_acount (and asin
        # history) to get the per-asin detail rows.
        self.df_seller = self.df_seller_acount_syn. \
            join(self.df_seller_asin_acount, on='account_name', how='inner'). \
            join(self.df_asin_detail, on='asin', how='left')

        # Days since launch relative to the baseline day (week_date).
        self.df_seller = self.df_seller.withColumn("days_diff", self.u_launch_time(
            self.df_seller.launch_time, F.lit(self.week_date)))

        # Derive is_asin_new (new-product flag) from days_diff via the UDF.
        self.df_seller = self.df_seller.withColumn("is_asin_new", self.u_days_diff(
            self.df_seller.days_diff))

        # Cache: df_seller is reused by several aggregations below.
        self.df_seller = self.df_seller.cache()

        # Package quantity of each account-asin, parsed from the title.
        self.df_seller = self.df_seller.withColumn('asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title')))
        # Flag products with package quantity >= 2; unmatched rows stay null
        # so F.sum() below counts only the flagged rows.
        self.df_seller = self.df_seller.withColumn('is_pq_flag', F.when(F.col('asin_package_quantity') >= 2, F.lit(1)))

        # Compute asin_counts_exists and asin_new_counts per account.
        self.df_asin_counts = self.df_seller.groupby(['account_id', 'account_name']).agg(
            F.count('asin').alias('asin_counts_exists'),
            F.sum('is_asin_new').alias('asin_new_counts'),
            F.sum('is_pq_flag').alias('fb_package_quantity_num')
        )

        # Join the feedback report to get the account's total product count.
        self.df_asin_counts = self.df_asin_counts\
            .join(self.df_seller_account_feedback_report, on='account_id', how='left')

        # counts_new_rate: share of new products among existing asins.
        self.df_asin_counts = self.df_asin_counts.withColumn('counts_new_rate',
                                                             F.round(F.col('asin_new_counts')/F.col('asin_counts_exists'), 3))

        # Share of multi-pack products per account.
        self.df_asin_counts = self.df_asin_counts.withColumn('fb_package_quantity_prop',
                                                             F.round(F.col('fb_package_quantity_num') / F.col('asin_counts_exists'),
                                                                     3))

        # Variation ratio per account: asins with variations /
        # (asins with variations + single-product asins).
        self.df_variat_ratio = self.df_seller.join(self.df_asin_parent, on='asin', how='left')

        # Rows without a parent_asin fall back to their own asin: either the
        # asin is the top-level parent, or it is a single product.
        self.df_variat_ratio = self.df_variat_ratio.withColumn('parent_asin_new',
                                                               F.when(F.col('parent_asin').isNull(), F.col('asin'))
                                                               .otherwise(F.col('parent_asin')))

        # Count children per (account, parent asin).
        self.df_variat_ratio = self.df_variat_ratio.groupby(['account_id', 'account_name', 'parent_asin_new'])\
            .agg(
            F.count('asin').alias('asin_son_count')
        )

        # Flag parents that have more than one child (multi-variation).
        self.df_variat_ratio = self.df_variat_ratio.withColumn('is_variat_flag',
                                                               F.when(F.col('asin_son_count') > 1, F.lit(1)))

        # Per account: numerator (variation count) and denominator (total).
        self.df_variat_ratio = self.df_variat_ratio.groupby(['account_id', 'account_name'])\
            .agg(
            F.sum('is_variat_flag').alias('variat_num'),
            F.count('parent_asin_new').alias('total_asin_num'))

        self.df_variat_ratio = self.df_variat_ratio.na.fill({'variat_num': 0, 'total_asin_num': 0})
        ## Variation proportion = variation asins / total parent asins.
        self.df_variat_ratio = self.df_variat_ratio.withColumn('fb_variat_prop',
                                                               F.round(F.col('variat_num')/F.col('total_asin_num'), 3))

        # Drop account_name to avoid a duplicate column in the later join.
        self.df_variat_ratio = self.df_variat_ratio.drop('account_name')

    # 2.4 Assemble the final output DataFrames.
    def handle_save_date(self):
        """Select/rename the final columns of the detail and agg outputs."""
        # seller_detail
        self.df_seller = self.df_seller\
            .select(
            F.col('account_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('launch_time'),
            F.col('days_diff'),
            F.col('is_asin_new'),
            # Legacy unused fields, kept as zeros for schema compatibility.
            F.lit(0).alias('asin_counts'),
            F.lit(0).alias('asin_new_counts'),
            F.lit(0).alias('counts_new_rate'),
            F.lit(0).alias('asin_counts_exists'),

            # Partition columns.
            F.lit(self.week).cast('int').alias('week'),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
                    )

        # Join the top-20 averages and the variation-ratio metrics.
        self.df_seller_agg = self.df_asin_counts\
            .join(self.df_seller_top, on='account_id', how='left')\
            .join(self.df_variat_ratio, on='account_id', how='left')

        self.df_seller_agg = self.df_seller_agg.select(
            F.col('account_id'),
            F.col('account_name'),
            F.col('asin_new_counts'),
            F.col('asin_counts'),
            F.col('counts_new_rate'),
            F.col('top_20_avg_price'),
            F.col('top_20_avg_rating'),
            F.col('top_20_avg_total_comments'),
            F.col('asin_counts_exists'),
            F.col('variat_num').alias('fb_variat_num'),
            F.col('total_asin_num').alias('fb_asin_total'),
            F.col('fb_variat_prop'),
            F.col('fb_package_quantity_prop'),
            F.col('fb_package_quantity_num'),
            # Partition columns.
            F.lit(self.week).cast('int').alias('week'),
            F.lit(self.ym).alias('ym'),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )


    # Override of the base write method: writes the two target tables.
    def save_data(self):
        """Write the agg table (1 partition) and detail table (10 partitions)."""
        self.reset_partitions(partitions_num=1)
        self.save_data_common(
            df_save=self.df_seller_agg,
            db_save='dwd_seller_asin_account_agg',
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_seller,
            db_save='dwd_seller_asin_account_detail',
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )


if __name__ == '__main__':
    # Positional CLI args: 1) site code, 2) date granularity
    # (week/4_week/month/quarter), 3) period value, e.g. "2022-1".
    argv = sys.argv
    job = DwdFeedBack(site_name=argv[1], date_type=argv[2], date_info=argv[3])
    job.run()