import os
import sys
import re


sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory so the utils/yswg_utils packages resolve
from pyspark.sql.types import IntegerType, DoubleType
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
from yswg_utils.common_udf import udf_parse_amazon_orders, udf_get_package_quantity


class DwdMerchantwordsMeasure(Templates):
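    """
    DWD-layer measurement job for MerchantWords search terms: joins keyword
    metrics with product totals, per-type asin/search-term counts, AO value,
    zr traffic share, monthly sales and asin-detail aggregates, then writes
    the result to dwd_merchantwords_measure_v2 partitioned by site_name/batch.
    """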

    def __init__(self, site_name='us', date_type='day', date_info='2024-01-01', batch='2024-0'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.batch = batch
        self.db_save = 'dwd_merchantwords_measure_v2'
        self.spark = self.create_spark_object(
            app_name=f"DwdMerchantwordsMeasure: {self.site_name}, {self.date_type}, {self.date_info}, {self.batch}")
        self.partitions_num = 5
        self.partitions_by = ['site_name', 'batch']
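        # Placeholder DataFrames (populated in read_data()/handle_data())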
        self.df_merchantwords_detail = self.spark.sql("select 1+1;")
        self.df_products_num = self.spark.sql("select 1+1;")
        self.df_search_term_type = self.spark.sql("select 1+1;")
        self.df_self_asin = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_asin_buy_data = self.spark.sql("select 1+1;")
        self.df_asin_count = self.spark.sql("select 1+1;")
        self.df_st_count = self.spark.sql("select 1+1;")
        self.df_st_buy_data = self.spark.sql("select 1+1;")
        self.df_st_detail = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.data_type_list = ['zr', 'sp', 'sb1', 'sb2', 'sb3', 'hr', 'bs', 'ac']
        # Register shared UDFs from yswg_utils
        self.u_parse_amazon_orders = self.spark.udf.register('u_parse_amazon_orders', udf_parse_amazon_orders, IntegerType())
        self.u_get_package_quantity = self.spark.udf.register('u_get_package_quantity', udf_get_package_quantity, IntegerType())
        # Register job-specific UDFs
        self.u_parse_asin_price = self.spark.udf.register('u_parse_asin_price', self.udf_parse_asin_price, DoubleType())
        self.u_parse_asin_rating = self.spark.udf.register('u_parse_asin_rating', self.udf_parse_asin_rating, DoubleType())
        self.u_parse_asin_reviews = self.spark.udf.register('u_parse_asin_reviews', self.udf_parse_asin_reviews, IntegerType())
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    @staticmethod
    def udf_parse_asin_price(price):
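        """
        Parse a raw price string into a float, e.g. US-style '$1,299.99'
        -> 1299.99 or German-style '1.299,99 €' -> 1299.99; returns None
        when no supported format matches.
        """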
        if price:
            try:
                # US format: '$' with comma thousands separators and optional decimals
                match_us = re.search(r'\$([\d,]+(?:\.\d+)?)', price)
                if match_us:
                    return float(match_us.group(1).replace(',', ''))
                # German format: dot thousands separators, comma decimals, trailing €
                match_de = re.search(r'([\d.]+),(\d+)\s*€', price)
                if match_de:
                    integer_part = match_de.group(1).replace('.', '')
                    return float(f"{integer_part}.{match_de.group(2)}")
            except ValueError:
                return None
        return None

    @staticmethod
    def udf_parse_asin_rating(site, rating):
        """
        解析asin详情页面的rating
        """
        if rating:
            try:
                if site == 'de':
                    rating = re.findall(r"(.*) von", rating)[0]
                elif site == 'fr':
                    rating = re.findall(r"(.*) sur", rating)[0]
                elif site == 'it':
                    rating = re.findall(r"(.*) su", rating)[0]
                elif site == 'es':
                    rating = re.findall(r"(.*) de", rating)[0]
                else:
                    rating = re.findall(r"(.*) out", rating)[0]
                # European locales use a comma as the decimal separator
                return float(rating.replace(',', '.'))
            except (IndexError, ValueError):
                # No delimiter matched or the prefix was not numeric
                return None
        return None

    @staticmethod
    def udf_parse_asin_reviews(reviews):
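        """
        Parse a review count such as '1,234 ratings' or '1.234 Bewertungen'
        into an int (1234); returns None when no number is found.
        """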
        if reviews:
            try:
                match = re.search(r'\b(\d{1,3}(?:[,.]\d{3})*)(?:\s+\w+)?\b', reviews)
                if match:
                    number_str = match.group(1).replace(',', '').replace('.', '')
                    return int(number_str)
                else:
                    return None
            except ValueError:
                return None
        else:
            return None

    def read_data(self):
        print("1.读取dwt_merchantwords_st_detail")
        sql = f"""
        select 
            keyword, 
            volume, 
            avg_3m, 
            avg_12m, 
            depth, 
            results_count, 
            sponsored_ads_count, 
            page_1_reviews, 
            appearance, 
            last_seen, 
            update_time, 
            lang, 
            last_batch 
        from dwt_merchantwords_st_detail_merge 
        where site_name = '{self.site_name}'
        and batch = '2024-1';
        """
        self.df_merchantwords_detail = self.spark.sql(sqlQuery=sql)
        self.df_merchantwords_detail = self.df_merchantwords_detail.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_merchantwords_detail.show(10, truncate=True)

        print("2.读取ods_merchantwords_brand_analytics,得到产品总数")
        sql = f"""
        select 
            search_term as keyword, 
            quantity_being_sold as asin_total_num, 
            updated_time 
        from ods_merchantwords_brand_analytics 
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}';
        """
        self.df_products_num = self.spark.sql(sqlQuery=sql)
        self.df_products_num = self.df_products_num.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_products_num.show(10, truncate=True)

        print("3.读取ods_merchantwords_search_term_type,得到搜索词、asin、类型")
        for data_type in self.data_type_list:
            if data_type in ['zr', 'sp']:
                sql = f"""
                select 
                    search_term as keyword, 
                    asin, 
                    page, 
                    page_row, 
                    '{data_type}' as data_type, 
                    created_time, 
                    updated_time
                from ods_merchantwords_search_term_{data_type} 
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.date_info}';
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            elif data_type in ['sb1', 'sb2', 'sb3']:
                data_type_int = int(data_type[-1])
                sql = f"""
                select 
                    search_term as keyword, 
                    asin, 
                    page, 
                    '{data_type}' as data_type, 
                    created_time, 
                    updated_time
                from ods_merchantwords_search_term_sb 
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.date_info}'
                and data_type = {data_type_int};
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            else:
                sql = f"""
                select 
                    search_term as keyword, 
                    asin, 
                    page, 
                    '{data_type}' as data_type, 
                    created_time, 
                    updated_time
                from ods_merchantwords_search_term_{data_type} 
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.date_info}';
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            self.df_search_term_type = self.df_search_term_type.unionByName(df, allowMissingColumns=True)
        self.df_search_term_type = self.df_search_term_type.persist(StorageLevel.MEMORY_AND_DISK)
        self.df_search_term_type.show(10, truncate=True)

        print("4.读取ods_self_asin,得到公司内部产品")
        sql = f"""
        select 
            asin, 
            1 as is_self_asin 
        from ods_self_asin 
        where site_name = '{self.site_name}' 
        group by asin;
        """
        df_self_asin = self.spark.sql(sqlQuery=sql)
        self.df_self_asin = F.broadcast(df_self_asin)
        self.df_self_asin.show(10, truncate=True)

        print("5.读取ods_merchantwords_asin_detail,得到asin数据")
        sql = f"""
        select 
            asin, 
            title, 
            img, 
            price, 
            rating, 
            reviews, 
            updated_time 
        from ods_merchantwords_asin_detail 
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}';
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        self.df_asin_detail = self.df_asin_detail.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_asin_detail.show(10, truncate=True)

        print("6.读取ods_merchantwords_other_search_term_data,得到asin月销")
        sql = f"""
        select 
            search_term as keyword, 
            asin, 
            buy_data, 
            page, 
            created_time, 
            updated_time
        from ods_merchantwords_other_search_term_data 
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}';
        """
        self.df_asin_buy_data = self.spark.sql(sqlQuery=sql)
        self.df_asin_buy_data = self.df_asin_buy_data.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_asin_buy_data.show(10, truncate=True)

    def handle_data(self):
        # Total product count per keyword
        self.handle_products_num()
        # Deduplicate df_search_term_type
        self.handle_search_term_asin_type()
        # Per-asin counts of each data type
        self.df_asin_count = self.handle_st_asin_counts(cal_type="asin")
        # Per-search-term asin counts plus in-house asin counts
        self.df_st_count = self.handle_st_asin_counts(cal_type="st")
        # AO value and zr traffic share
        self.handle_ao_and_zr_flow_proportion()
        # Monthly sales
        self.handle_monthly_sales()
        # asin detail: price, rating, etc.
        self.handle_asin_detail()
        # Final column handling before saving
        self.handle_save()

    def handle_products_num(self):
        print("处理产品总数:")
        # 1.去重处理
        products_num_window = Window.partitionBy('keyword').orderBy(
            F.desc_nulls_last('updated_time')
        )
        self.df_products_num = self.df_products_num.withColumn(
            "u_rank",
            F.row_number().over(window=products_num_window)
        )
        self.df_products_num = self.df_products_num.filter('u_rank=1').drop('u_rank', 'updated_time')
        # Keep only keywords with asin_total_num > 0
        self.df_products_num = self.df_products_num.filter('asin_total_num > 0')
        # 2. Join back onto df_save
        self.df_save = self.df_merchantwords_detail.join(
            self.df_products_num, on=['keyword'], how='inner'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 3. Release cached inputs
        self.df_merchantwords_detail.unpersist()
        self.df_products_num.unpersist()

    def handle_search_term_asin_type(self):
        print("df_search_term_type去重处理:")
        # 1.去重处理,防止爬虫重复抓取
        st_asin_window = Window.partitionBy(['data_type', 'keyword', 'page']).orderBy(
            F.desc_nulls_last('created_time'), F.desc_nulls_last('updated_time')
        )
        self.df_search_term_type = self.df_search_term_type.withColumn(
            "u_rank",
            F.rank().over(window=st_asin_window)
        )
        self.df_search_term_type = self.df_search_term_type.filter('u_rank=1')\
            .drop('u_rank', 'created_time', 'updated_time')

    def handle_st_asin_counts(self, cal_type="asin"):
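        """
        Pivot df_search_term_type into per-asin or per-keyword counts of each
        data_type, then derive sb (sb1+sb2+sb3) and adv (sb+sp) totals. Note
        pivot() only creates columns for data_type values actually present.
        For cal_type='asin' also attach asin_st_counts; for cal_type='st'
        attach asin_num, self_asin_num and self_asin_proportion.
        """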
        print(f"计算{cal_type}_counts")
        cal_type_complete = "keyword" if cal_type == "st" else cal_type
        self.df_search_term_type = self.df_search_term_type.withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_search_term_type.data_type, F.lit(f"_counts"))
        )
        df = self.df_search_term_type.groupby([f'{cal_type_complete}'])\
            .pivot(f"{cal_type}_data_type").count()
        df = df.fillna(0)
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        df = df.repartition(80)
        if cal_type == "asin":
            df_asin_agg = self.df_search_term_type.groupby(['asin']).agg(
                F.count('keyword').alias("asin_st_counts")
            )
            df = df.join(
                df_asin_agg, on=['asin'], how='left'
            )
        if cal_type == "st":
            # Count asins per keyword, in-house asins, and their share
            df_st_agg = self.df_search_term_type\
                 .select("keyword", "asin")\
                 .join(self.df_self_asin, on=['asin'], how='left')\
                 .withColumn("is_self_asin", F.when(F.col("is_self_asin").isNotNull(), F.lit(1)).otherwise(F.lit(0)))\
                 .groupby(['keyword'])\
                 .agg(F.sum('is_self_asin').alias("self_asin_num"),
                      F.count('asin').alias("asin_num"))
            df = df.join(
                df_st_agg, on=['keyword'], how='left'
            ).withColumn(
                "self_asin_proportion",
                F.round(F.col('self_asin_num')/F.col('asin_num'), 4)
            )
        df = df.persist(StorageLevel.MEMORY_AND_DISK)
        return df

    def handle_ao_and_zr_flow_proportion(self):
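        """
        AO value = ad appearances / zr appearances; zr traffic share =
        zr appearances / total search-term appearances. Search-term level
        values are the mean over the page-1 zr asins of that keyword.
        """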
        print("计算ao+zr流量占比:")
        # 1.计算asin的ao值和zr流量占比
        self.df_asin_count = self.df_asin_count.withColumn(
            "asin_ao_val", F.round(self.df_asin_count.asin_adv_counts / self.df_asin_count.asin_zr_counts, 3)
        ).withColumn(
            "asin_zr_flow_proportion",
            F.when(F.col("asin_st_counts").isNotNull(), F.round(F.col("asin_zr_counts") / F.col("asin_st_counts"), 3))
        )
        # 2. Per-search-term AO value and zr traffic share: mean over its page-1 zr asins
        df_asin_ao_and_zr_flow_proportion = self.df_asin_count.select("asin", "asin_ao_val", "asin_zr_flow_proportion")
        df_st_ao_and_zr_flow_proportion = self.df_search_term_type.filter("data_type='zr' and page=1").select("keyword", "asin").join(
            df_asin_ao_and_zr_flow_proportion, on=['asin'], how='left'
        )
        df_st_ao = df_st_ao_and_zr_flow_proportion\
            .filter("asin_ao_val is not null")\
            .groupby(["keyword"])\
            .agg(F.round(F.mean("asin_ao_val"), 3).alias("st_ao_val"))
        df_st_ao = df_st_ao.repartition(80)
        df_st_zr_flow_proportion = df_st_ao_and_zr_flow_proportion\
            .filter("asin_zr_flow_proportion is not null")\
            .groupby(["keyword"])\
            .agg(F.round(F.mean("asin_zr_flow_proportion"), 3).alias("st_zr_flow_proportion"))
        df_st_zr_flow_proportion = df_st_zr_flow_proportion.repartition(80)
        self.df_st_count = self.df_st_count.join(
            df_st_ao, on=['keyword'], how='left'
        ).join(
            df_st_zr_flow_proportion, on=['keyword'], how='left'
        )
        # 3. Join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_count, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 4. Release cached inputs
        self.df_asin_count.unpersist()
        self.df_st_count.unpersist()

    def handle_monthly_sales(self):
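        """
        Parse per-asin monthly order counts from buy_data via
        u_parse_amazon_orders and sum them into st_monthly_sales per keyword.
        """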
        print("计算st月销:")
        # 去重处理
        asin_buy_data_window = Window.partitionBy(['keyword', 'page']).orderBy(
            F.desc_nulls_last('created_time'), F.desc_nulls_last('updated_time')
        )
        self.df_asin_buy_data = self.df_asin_buy_data.withColumn(
            "u_rank",
            F.rank().over(window=asin_buy_data_window)
        )
        self.df_asin_buy_data = self.df_asin_buy_data.filter('u_rank=1')\
            .drop('u_rank', 'created_time', 'updated_time', 'page')
        # Aggregate monthly sales per keyword
        self.df_st_buy_data = self.df_asin_buy_data.withColumn(
            'asin_monthly_sales',
            self.u_parse_amazon_orders('buy_data')
        ).groupby(['keyword']).agg(
            F.sum('asin_monthly_sales').alias("st_monthly_sales")
        )
        # Join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_buy_data, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # Release cached inputs
        self.df_asin_buy_data.unpersist()

    def handle_asin_detail(self):
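        """
        Deduplicate asin details, parse price/rating/reviews/package quantity,
        then aggregate per-keyword averages and the share of multi-pack listings.
        """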
        print("处理asin_detail:")
        # 1.去重取最新记录
        asin_detail_window = Window.partitionBy('asin').orderBy(
            F.desc_nulls_last('updated_time')
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "u_rank",
            F.row_number().over(window=asin_detail_window)
        )
        self.df_asin_detail = self.df_asin_detail.filter('u_rank=1').drop('u_rank', 'updated_time')
        # 2. Parse and clean fields
        self.df_asin_detail = self.df_asin_detail.withColumn(
            'site',
            F.lit(self.site_name)
        ).withColumn(
            'price',
            self.u_parse_asin_price('price')
        ).withColumn(
            'rating',
            self.u_parse_asin_rating('site', 'rating')
        ).withColumn(
            'reviews',
            self.u_parse_asin_reviews('reviews')
        ).withColumn(
            'package_quantity',
            F.when(
                F.col('title').isNotNull(), self.u_get_package_quantity('title')
            ).otherwise(1)
        )
        df_st_asin_detail = self.df_search_term_type.select("keyword", "asin").join(
            self.df_asin_detail, on=['asin'], how='left'
        )
        self.df_st_detail = df_st_asin_detail.groupby(['keyword']).agg(
            F.round(F.avg('rating'), 2).alias("rating_avg"),
            F.round(F.avg('price'), 2).alias("price_avg"),
            F.round(F.avg('reviews'), 0).alias("reviews_avg"),
            # count() counts every non-null value, so wrap the condition in
            # when() to count only listings with package_quantity > 1
            F.round(F.count(F.when(F.col('package_quantity') > 1, True)) / F.count('asin'), 4).alias("package_quantity"),
        )
        # 3. Join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_detail, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 4. Release cached inputs
        self.df_asin_detail.unpersist()
        self.df_search_term_type.unpersist()

    def handle_save(self):
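        """
        Derive listing_sales_avg, add the partition columns, fill null
        metrics with -1 sentinels, and project the final column order.
        """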
        # Add derived and partition columns before saving
        self.df_save = self.df_save.withColumn(
            'listing_sales_avg',
            F.round(F.col("st_monthly_sales")/F.col("asin_num"), 0)
        ).withColumn(
            'site_name',
            F.lit(self.site_name)
        ).withColumn(
            'batch',
            F.lit(self.batch)
        )
        # Fill null metrics with -1 sentinels
        self.df_save = self.df_save.na.fill({
            "st_ao_val": -1,
            "st_zr_flow_proportion": -1,
            "asin_total_num": -1,
            "asin_num": -1,
            "self_asin_num": -1,
            "self_asin_proportion": -1,
            "st_sp_counts": -1,
            "st_zr_counts": -1,
            "st_monthly_sales": -1,
            "listing_sales_avg": -1,
            "reviews_avg": -1,
            "rating_avg": -1,
            "price_avg": -1,
            "package_quantity": -1
        })
        self.df_save = self.df_save.select(
            "keyword", "lang", "st_ao_val", "st_zr_flow_proportion", "volume", "avg_3m", "avg_12m", "asin_total_num",
            "asin_num", "self_asin_num", "self_asin_proportion", "st_sp_counts", "st_zr_counts", "st_monthly_sales",
            "listing_sales_avg", "reviews_avg", "rating_avg", "price_avg", "depth", "results_count",
            "sponsored_ads_count", "page_1_reviews", "appearance", "last_seen", "update_time", "last_batch",
            "package_quantity", "site_name", "batch"
        )


if __name__ == '__main__':
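    # Expected invocation (script filename is an assumption; argv order from below):
    #   spark-submit dwd_merchantwords_measure.py us day 2024-01-01 2024-1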
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    batch = sys.argv[4]
    handle_obj = DwdMerchantwordsMeasure(site_name=site_name, date_type=date_type, date_info=date_info, batch=batch)
    handle_obj.run()