"""
   @Author      : HuangJian
   @Description : 店铺基础信息报表
   @SourceTable :
                  ①ods_seller_account_feedback
                  ②ods_seller_asin_account
                  ③ods_asin_detail_product
                  ④ods_seller_account_syn
                  ⑤dim_asin_variation_info
                  ⑥dim_cal_asin_history_detail

   @SinkTable   : dwt_fb_base_report
   @SinkTable   : dwt_fb_asin_info
   @CreateTime  : 2022/07/05 15:48
   @UpdateTime  : 2022/07/05 15:48
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from pyspark.sql.types import StringType, IntegerType, DoubleType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_get_package_quantity
from yswg_utils.common_udf import udf_new_asin_flag
from utils.db_util import DBUtil


class DwtFbBaseReport(object):
    """Monthly seller ("fb") base report builder.

    Reads seller feedback counts, seller-asin relations, top-20 asin details,
    asin history/variation dims and the internal accounts table, aggregates
    per-seller metrics, and writes two hive tables:
    ``dwt_fb_base_report`` (per-seller report) and ``dwt_fb_asin_info``
    (new-asin detail rows).
    """

    def __init__(self, site_name, date_type, date_info):
        # Partition keys shared by both sink tables.
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_fb_base_report"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # hdfs location of the sink partition; cleared before each write.
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)

        # Spark session setup.
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # Calculation day (YYYY-MM-DD) for the given date dimension, and the
        # previous month partition used for month-over-month feedback rates.
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
        self.last_month = CommonUtil.get_month_offset(date_info, -1)

        # Placeholder dataframes, populated by read_data()/handle_*() steps.
        self.df_fb_feedback = self.spark.sql("select 1+1;")
        self.df_fb_asin = self.spark.sql("select 1+1;")
        self.df_top20_asin = self.spark.sql("select 1+1;")
        self.df_asin_history = self.spark.sql("select 1+1;")
        self.df_asin_parent = self.spark.sql("select 1+1;")
        self.df_fb_agg = self.spark.sql("select 1+1;")
        self.df_fb_asin_detail = self.spark.sql("select 1+1;")
        self.df_self_seller_id = self.spark.sql("select 1+1;")
        self.df_seller_account = self.spark.sql("select 1+1;")

        # UDFs: flag an asin as "new" by comparing launch_time with cal_date,
        # and parse the package quantity out of the asin title.
        self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
        self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType())

    def read_data(self):
        """Load all source dataframes for the current partition."""
        # ods_seller_account_feedback: main monthly seller feedback report.
        # Joins the current month against the previous month to derive
        # month-over-month growth rates for the 30-day/1-year/lifetime counts.
        print("获取 ods_seller_account_feedback")
        sql = f"""select cur_fd.seller_id,
       cur_fd.fb_web_asin_num,
       cur_fd.fb_country_name,
       cur_fd.count_30_day_num,
       cur_fd.count_1_year_num,
       cur_fd.count_lifetime_num,
       cur_fd.fb_crawl_date,
       round((count_30_day_num - last_30_day_num) / last_30_day_num, 4)       as count_30_day_rate,
       round((count_1_year_num - last_1_year_num) / last_1_year_num, 4)       as count_1_year_rate,
       round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate
from (select seller_id,
             num                                            as fb_web_asin_num,
             count_30_day as count_30_day_num,      
             count_1_year as count_1_year_num,
             count_lifetime as count_lifetime_num,
             country_name                                     as fb_country_name,
             date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date
      from ods_seller_account_feedback
      where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        and length(seller_id) > 2 ) cur_fd
         left join (select seller_id,
                           count_30_day   as last_30_day_num,
                           count_1_year   as last_1_year_num,
                           count_lifetime as last_lifetime_num
                    from ods_seller_account_feedback
                    where site_name = '{self.site_name}'
                      and date_type = '{self.date_type}'
                      and date_info = '{self.last_month}'
                      and length(seller_id) > 2 ) last_fd
                   on cur_fd.seller_id = last_fd.seller_id"""
        self.df_fb_feedback = self.spark.sql(sqlQuery=sql)
        self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache()
        print(sql)

        # ods_seller_asin_account: internal seller-to-asin relation table
        # (crawled from search terms), limited to rows created up to cal_date.
        print("获取 ods_seller_asin_account")
        sql = f"""
            select seller_id,asin from ods_seller_asin_account 
            where site_name='{self.site_name}'
             and date_format(created_at,'yyyy-MM-dd') <= '{self.cal_date}' 
        """
        self.df_fb_asin = self.spark.sql(sqlQuery=sql)
        self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # ods_asin_detail_product: top-20 ranked asins per seller, used for
        # the top-20 averages in handle_fb_top_20().
        print("获取 ods_asin_detail_product")
        sql = f"""
            select 
            seller_id,
            asin,
            price,
            rating,
            total_comments,
            row_num as rank 
            from ods_asin_detail_product
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
            and row_num <= 20
        """
        self.df_top20_asin = self.spark.sql(sqlQuery=sql)
        self.df_top20_asin = self.df_top20_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # dim_cal_asin_history_detail: asin attributes incl. launch_time,
        # which drives the new-asin flag.
        print("获取 dim_cal_asin_history")
        sql = f"""
            select
            asin,
            asin_title,
            asin_img_url,
            asin_price,
            asin_rating,
            asin_total_comments,
            asin_weight,
            asin_volume,
            asin_launch_time
            from dim_cal_asin_history_detail
            where site_name = '{self.site_name}'
        """
        self.df_asin_history = self.spark.sql(sqlQuery=sql)
        print(sql)

        # dim_asin_variation_info: child asin -> parent_asin pairs, used to
        # detect multi-variation listings (rows where asin == parent are excluded).
        print("获取 dim_asin_variation_info")
        sql = f"select asin,parent_asin from dim_asin_variation_info " \
              f"where site_name='{self.site_name}'" \
              f" and asin != parent_asin "
        self.df_asin_parent = self.spark.sql(sqlQuery=sql)
        print(sql)

        # ods_seller_account_syn: seller_id -> account_name mapping.
        print("获取 ods_seller_account_syn")
        sql = f"select seller_id,account_name,id from ods_seller_account_syn " \
              f"where site_name='{self.site_name}'"
        self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Keep the account_name of the row with the largest id per seller_id.
        # NOTE: the previous orderBy(...desc) + drop_duplicates pattern does not
        # guarantee which row survives after a shuffle; max over an
        # (id, account_name) struct is deterministic.
        self.df_seller_account = self.df_seller_account.groupby('seller_id').agg(
            F.max(F.struct('id', 'account_name')).alias('_latest')
        ).select('seller_id', F.col('_latest.account_name').alias('account_name'))
        print(sql)

        # mysql selection.accounts: seller ids of company-internal shops,
        # flagged so they can be excluded/marked downstream.
        print("获取 selection.accounts")
        sql = f"""
        select seller_id, 1 as is_self_fb from
            (select distinct seller_id  from selection.accounts) t1
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        self.df_self_seller_id = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        )

    def handle_fb_top_20(self):
        """Aggregate per-seller averages over each seller's top-20 asins."""
        print("处理asin_detail_product的top20指标")
        self.df_top20_asin = self.df_top20_asin. \
            join(self.df_asin_history.select('asin', 'asin_launch_time'), on='asin', how='left')

        # Flag new asins (launch_time relative to cal_date).
        self.df_top20_asin = self.df_top20_asin.withColumn("is_asin_new",
                                                           self.udf_new_asin_flag(F.col('asin_launch_time'),
                                                                                  F.lit(self.cal_date)))
        self.df_top20_asin = self.df_top20_asin.groupby('seller_id').agg(
            F.avg('price').alias('top_20_avg_price'),
            F.avg('rating').alias('top_20_avg_rating'),
            F.avg('total_comments').alias('top_20_avg_total_comments'),
            F.sum('is_asin_new').alias('top_20_new_asin_num')
        )

    def handle_fb_cal_agg(self):
        """Build per-seller aggregate metrics (new-asin, package-qty, variation rates)."""
        print("处理店铺基础报表聚合数据")
        df_fb_join = self.df_fb_feedback.select('seller_id').join(
            self.df_fb_asin, on='seller_id', how='left'
        )

        df_fb_join = df_fb_join.join(
            self.df_asin_history, on='asin', how='left'
        ).join(
            self.df_asin_parent, on='asin', how='left'
        )

        # New-asin flag per seller/asin.
        df_fb_join = df_fb_join.withColumn("is_asin_new",
                                           self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date)))

        # Package quantity parsed from the asin title.
        df_fb_join = df_fb_join.withColumn('asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title')))

        # Flag multi-pack products (package quantity >= 2); stays null otherwise
        # so F.sum counts only the flagged rows.
        df_fb_join = df_fb_join.withColumn('is_pq_flag', F.when(F.col('asin_package_quantity') >= 2, F.lit(1)))

        # Backfill parent_asin: null means the asin has no variation-table match,
        # so treat it as its own parent.
        df_fb_join = df_fb_join.withColumn('parent_asin', F.when(F.col('parent_asin').isNull(), F.col('asin'))
                                           .otherwise(F.col('parent_asin')))
        self.df_fb_asin_detail = df_fb_join.cache()

        fb_counts_agg = self.df_fb_asin_detail.groupby(['seller_id']).agg(
            F.count('asin').alias('fb_asin_total'),
            F.sum('is_asin_new').alias('fb_new_asin_num'),
            F.sum('is_pq_flag').alias('fb_pq_num')
        )

        # New-asin ratio.
        fb_counts_agg = fb_counts_agg.withColumn('fb_new_asin_rate',
                                                 F.round(F.col('fb_new_asin_num') / F.col('fb_asin_total'), 4))

        # Multi-pack ratio.
        fb_counts_agg = fb_counts_agg.withColumn('fb_pq_rate',
                                                 F.round(F.col('fb_pq_num') / F.col('fb_asin_total'), 4))

        fb_counts_agg = fb_counts_agg.select('seller_id', 'fb_new_asin_num', 'fb_pq_num',
                                             'fb_asin_total', 'fb_new_asin_rate', 'fb_pq_rate')

        # Variation ratio = parents with variations / (parents with variations
        # + single-product parents), computed per seller.
        df_variant_radio = self.df_fb_asin_detail.groupby(['seller_id', 'parent_asin']).agg(
            F.count('asin').alias('asin_son_count')
        )

        # A parent with more than one child asin is a multi-variation listing.
        df_variant_radio = df_variant_radio.withColumn('is_variant_flag',
                                                       F.when(F.col('asin_son_count') > 1, F.lit(1)))

        # Re-aggregate by seller to get numerator/denominator of the ratio.
        df_variant_radio = df_variant_radio.groupby(['seller_id']).agg(
            F.sum('is_variant_flag').alias('fb_more_variant_num'),
            F.count('parent_asin').alias('fb_variant_asin_total')
        )

        # Final variation ratio.
        df_variant_radio = df_variant_radio.withColumn('fb_variant_rate',
                                                       F.round(F.col('fb_more_variant_num') / F.col(
                                                           'fb_variant_asin_total'), 4))

        df_variant_radio = df_variant_radio.select('seller_id',
                                                   'fb_more_variant_num',
                                                   'fb_variant_asin_total',
                                                   'fb_variant_rate')

        # Merge all per-seller aggregates onto the feedback base.
        self.df_fb_agg = self.df_fb_feedback.join(
            fb_counts_agg, on='seller_id', how='left'
        ).join(
            df_variant_radio, on='seller_id', how='left'
        ).join(
            self.df_top20_asin, on='seller_id', how='left'
        )

        # Mark company-internal shops.
        self.df_fb_agg = self.df_fb_agg.join(
            self.df_self_seller_id, on='seller_id', how='left'
        )

        # Unmatched sellers are not internal shops.
        self.df_fb_agg = self.df_fb_agg.na.fill({"is_self_fb": 0})

    def save_data_report(self):
        """Write the per-seller report to hive table dwt_fb_base_report."""
        # inner join to drop sellers with no account_name in the syn table.
        df_save = self.df_fb_agg.join(self.df_seller_account, on='seller_id', how='inner')

        df_save = df_save.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('fb_country_name'),
            F.col('fb_web_asin_num'),
            F.col('is_self_fb'),
            F.round('top_20_avg_price', 4).alias('top_20_avg_price'),
            F.round('top_20_avg_rating', 4).alias('top_20_avg_rating'),
            F.ceil('top_20_avg_total_comments').alias('top_20_avg_total_comments'),
            F.col('top_20_new_asin_num'),
            F.col('count_30_day_num'),
            F.col('count_1_year_num'),
            F.col('count_lifetime_num'),
            F.col('count_30_day_rate'),
            F.col('count_1_year_rate'),
            F.col('count_life_time_rate'),
            F.col('fb_new_asin_num'),
            F.col('fb_asin_total'),
            F.col('fb_new_asin_rate'),
            F.col('fb_variant_rate'),
            F.col('fb_more_variant_num'),
            F.col('fb_variant_asin_total'),
            F.col('fb_pq_rate'),
            F.col('fb_pq_num'),
            F.col('fb_crawl_date'),
            # Bucketed classification columns for the report UI filters.
            F.when(F.col('fb_country_name').isNull(), F.lit(0))
                .when(F.col('fb_country_name') == self.site_name.upper(), F.lit(1))
                .when(F.col('fb_country_name') == 'CN', F.lit(2))
                .otherwise(F.lit(3)).alias('fb_country_name_type'),
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
                .when(F.col('fb_asin_total') <= 300, F.lit(1))
                .when(F.col('fb_asin_total') <= 1000, F.lit(2))
                .otherwise(F.lit(3)).alias('fb_account_type'),
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
                .when(F.col('fb_asin_total') == 0, F.lit(1))
                .when(F.col('fb_asin_total') <= 50, F.lit(2))
                .when(F.col('fb_asin_total') <= 200, F.lit(3))
                .when(F.col('fb_asin_total') <= 500, F.lit(4))
                .when(F.col('fb_asin_total') <= 1000, F.lit(5))
                .when(F.col('fb_asin_total') <= 5000, F.lit(6))
                .when(F.col('fb_asin_total') <= 10000, F.lit(7))
                .otherwise(F.lit(8)).alias('fb_asin_total_type'),
            F.when(F.col('fb_new_asin_num').isNull(), F.lit(0))
                .when(F.col('fb_new_asin_num') == 0, F.lit(1))
                .when(F.col('fb_new_asin_num') <= 5, F.lit(2))
                .when(F.col('fb_new_asin_num') <= 10, F.lit(3))
                .when(F.col('fb_new_asin_num') <= 20, F.lit(4))
                .when(F.col('fb_new_asin_num') <= 30, F.lit(5))
                .when(F.col('fb_new_asin_num') <= 50, F.lit(6))
                .when(F.col('fb_new_asin_num') <= 100, F.lit(7))
                .otherwise(F.lit(8)).alias('fb_new_asin_num_type'),
            F.when(F.col('fb_new_asin_rate').isNull(), F.lit(0))
                .when(F.col('fb_new_asin_rate') == 0, F.lit(1))
                .when(F.col('fb_new_asin_rate') <= 0.05, F.lit(2))
                .when(F.col('fb_new_asin_rate') <= 0.1, F.lit(3))
                .when(F.col('fb_new_asin_rate') <= 0.2, F.lit(4))
                .when(F.col('fb_new_asin_rate') <= 0.5, F.lit(5))
                .otherwise(F.lit(6)).alias('fb_new_asin_rate_type'),
            # BUGFIX: pattern was 'HH:mm:SS' — in Spark datetime patterns 'S'
            # is fraction-of-second, so the seconds field was wrong; use 'ss'.
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(None).alias('usr_mask_type'),
            F.lit(None).alias('usr_mask_progress'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)

        df_save = df_save.repartition(1)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def save_data_asin_info(self):
        """Write new-asin detail rows to hive table dwt_fb_asin_info."""
        # Keep new asins only.
        self.df_fb_asin_detail = self.df_fb_asin_detail.filter('is_asin_new = 1')
        # inner join to drop sellers with no account_name in the syn table.
        df_save_asin = self.df_fb_asin_detail.join(self.df_seller_account, on='seller_id', how='inner')
        df_save_asin = df_save_asin.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('asin_title'),
            F.col('asin_launch_time'),
            F.col('is_asin_new'),
            F.col('asin_package_quantity'),
            F.col('is_pq_flag'),
            F.col('parent_asin'),
            F.col('asin_img_url'),
            F.col('asin_price'),
            F.col('asin_rating'),
            F.col('asin_total_comments'),
            F.col('asin_weight'),
            F.col('asin_volume'),
            # BUGFIX: 'SS' (fraction-of-second) -> 'ss' (second-of-minute).
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        hive_tb_name = 'dwt_fb_asin_info'
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(hive_tb_name, partition_dict=self.partition_dict)
        print(f"清除hdfs目录中:{hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)

        df_save_asin = df_save_asin.repartition(50)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{hive_tb_name},分区为{partition_by}", )
        df_save_asin.write.saveAsTable(name=hive_tb_name, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        """Full pipeline: read sources, compute aggregates, write both sinks."""
        self.read_data()
        self.handle_fb_top_20()
        self.handle_fb_cal_agg()
        self.save_data_report()
        self.save_data_asin_info()


if __name__ == '__main__':
    # CLI args: site, date dimension (year-week/month/quarter/day), and the
    # date value for that dimension, e.g. 2022-1.
    arg_site_name = CommonUtil.get_sys_arg(1, None)
    arg_date_type = CommonUtil.get_sys_arg(2, None)
    arg_date_info = CommonUtil.get_sys_arg(3, None)
    DwtFbBaseReport(site_name=arg_site_name, date_type=arg_date_type, date_info=arg_date_info).run()