"""
   @Author      : HuangJian
   @SourceTable :
                  ①ods_seller_account_syn
                  ②ods_seller_asin_account
                  ③ods_seller_account_feedback
   @SinkTable   :
                  ①dim_fd_asin_info
   @CreateTime  : 2022/12/19 9:56
   @UpdateTime  : 2022/12/19 9:56
"""

import os
import sys
import re
from functools import reduce



sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import StringType, IntegerType, DoubleType
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil


class DwtFdAsinInfo(object):
    def __init__(self, site_name='us'):
        super().__init__()
        self.hive_tb = "dim_fd_asin_info"
        self.site_name = site_name
        self.partition_dict = {
            "site_name": site_name,
        }
        # 落表路径校验
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        app_name = f"{self.hive_tb}:{self.site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(self.site_name, 80)


        # 初始化全局变量df--ods获取数据的原始df
        self.df_seller_account_syn = self.spark.sql("select 1+1;")
        self.df_seller_account_feedback = self.spark.sql("select 1+1;")
        self.df_fd_asin = self.spark.sql("select 1+1;")

        # 初始化全局变量df--dwd层转换输出的df
        self.df_save = self.spark.sql(f"select 1+1;")
        # 注册自定义函数 (UDF)
        self.u_handle_seller_unique = F.udf(self.udf_handle_seller_unique, StringType())

    @staticmethod
    def udf_handle_seller_unique(fd_url):
        return fd_url.split("seller=")[-1].split('&')[0]

    # 1.获取原始数据
    def read_data(self):
        # 获取ods_seller相关原始表
        print("获取 ods_seller_account_syn")
        sql = f"select id as fd_account_id,seller_id as unique_id, account_name as fd_account_name," \
              f"lower(account_name) as fd_account_name_lower, url as fd_url, created_at, updated_at from " \
              f" ods_seller_account_syn where site_name='{self.site_name}' " \
              # f"group by id,account_name"
        self.df_seller_account_syn = self.spark.sql(sqlQuery=sql)
        print(sql)
        # 避免重复数据
        self.df_seller_account_syn = self.df_seller_account_syn.orderBy(self.df_seller_account_syn.fd_account_id.desc())
        self.df_seller_account_syn = self.df_seller_account_syn.drop_duplicates(['unique_id'])
        # print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False))


        print("获取 ods_seller_account_feedback")
        # 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据
        sql = f"select unique_id, " \
              f"       country_name as fd_country_name " \
              f"from (select seller_id as unique_id,country_name, " \
              f"         row_number() over (partition by seller_id order by created_at desc) sort_flag " \
              f"         from ods_seller_account_feedback  " \
              f"         where site_name = '{self.site_name}'" \
              f"    ) t1 " \
              f"where sort_flag = 1;  "
        self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False))

        # 获取我们内部的店铺与asin的数据库(从搜索词抓下来,店铺与asin的关系表)
        print("获取 ods_seller_asin_account")
        sql = f"""
                    select seller_id as unique_id,asin from ods_seller_asin_account 
                    where site_name='{self.site_name}'
                """
        self.df_fd_asin = self.spark.sql(sqlQuery=sql)
        self.df_fd_asin = self.df_fd_asin.drop_duplicates(['unique_id', 'asin'])
        print(sql)


    def sava_data(self):
        # 店铺+asin 通过fd_account_name 拼接获取到fd_unique
        df_save = self.df_seller_account_syn.join(self.df_seller_account_feedback, on='unique_id', how='left')

        df_save = df_save.join(
            self.df_fd_asin, on='unique_id', how='left'
        )

        df_save = df_save.select(
            F.col('fd_account_id'),
            F.col('unique_id').alias('fd_unique'),
            F.col('fd_account_name'),
            F.col('fd_account_name_lower'),
            F.col('fd_country_name'),
            F.col('fd_url'),
            F.col('asin'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('created_at'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_at'),
            F.lit(self.site_name).alias('site_name'),
        )

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        self.read_data()
        self.sava_data()

if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    # date_type = sys.argv[2]  # 参数2:类型:week/4_week/month/quarter
    # date_info = sys.argv[3]  # 参数3:年-周/年-月/年-季, 比如: 2022-1
    handle_obj = DwtFdAsinInfo(site_name=site_name)
    handle_obj.run()