import os
import re
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# Window specification used for grouped / ordered (window-function) operations
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from textblob import Word


class DwtAbaAnalyticsTemp(Templates):
    """Tag ABA search terms with theme words and pivot the statistics by month.

    ``read_data`` loads search terms from ``dwt_aba_st_analytics`` and theme
    words from ``ods_theme_aba``; ``handle_data`` matches every distinct
    search term against the theme words, joins the matches back onto the
    search-term rows, pivots per ``date_info`` and writes a CSV file.

    NOTE(review): ``run()`` is not defined in this class; presumably it is
    inherited from ``Templates`` and drives ``read_data``/``handle_data`` --
    confirm against the base class.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Create the Spark session and the placeholder attributes.

        Args:
            site_name: Value used for the ``site_name`` filter in both queries.
            date_type: Stored and used in the Spark app name only; the query
                in ``read_data`` hard-codes ``date_type='month'``.
            date_info: Stored and used in the Spark app name only; the query
                in ``read_data`` hard-codes its own ``date_info`` range.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_aba_analytics_temp'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames so every attribute exists before read_data runs.
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_dup = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_words = self.spark.sql("select 1+1;")
        # String form of the theme-word list handed to the UDF; filled by read_data.
        self.words_list_str = str([])
        self.u_words_pattern = F.udf(self.udf_words_pattern, StringType())
        # Partition parameters (presumably consumed by Templates -- confirm).
        self.partitions_by = ['site_name']
        self.partitions_num = 20

    @staticmethod
    def udf_words_pattern(search_term, words_list_str):
        """Return the theme words contained in ``search_term``.

        Args:
            search_term: Text to search in. ``handle_data`` passes the
                lower-cased term padded with one space on each side so that
                the space-padded words only match whole words.
            words_list_str: ``str()`` of a list of words. A word containing
                the marker ``"@@"`` is a colour phrase: the marker is removed
                and the phrase is split on the standalone word ``and``; it
                matches only if every part occurs in ``search_term``. Any
                other word matches by plain substring containment.

        Returns:
            The stripped matching words joined by ``';'`` (duplicates removed,
            list order kept), or ``None`` when nothing matches or an argument
            is empty/null.
        """
        # Spark passes None for null columns; `x in None` would raise TypeError.
        if not search_term or not words_list_str:
            return None
        matched = []
        # NOTE(review): eval() re-parses the literal for every row and must
        # never receive untrusted text. Here the string is produced by
        # read_data via str(list); consider ast.literal_eval or passing the
        # parsed list through a closure/broadcast instead.
        for words in eval(words_list_str):
            if "@@" in words:
                words = words.replace("@@", "")
                # Split only on "and" surrounded by whitespace and keep that
                # whitespace on the parts. A plain split("and") also cut words
                # such as "sand" into " s" / " ", which matched nearly any term.
                parts = re.split(r"(?<=\s)and(?=\s)", words)
                is_match = all(part in search_term for part in parts)
            else:
                is_match = words in search_term
            if is_match:
                matched.append(words.strip())
        if not matched:
            return None
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return ';'.join(dict.fromkeys(matched))

    def read_data(self):
        """Load search terms and theme words and build the word list for the UDF.

        Sets ``df_st`` (cached search-term rows), ``df_st_dup`` (distinct
        ``search_term`` values), ``df_words`` (cached theme words plus a
        ``search_words_lower`` column) and ``words_list_str``.
        """
        sql = f"select search_term, rank, bsr_orders, date_info from dwt_aba_st_analytics where site_name='{self.site_name}' and date_type='month' and date_info between '2022-07' and '2023-06';"
        self.df_st = self.spark.sql(sql).cache()
        self.df_st_dup = self.df_st.drop_duplicates(['search_term']).select('search_term')
        self.df_st.show()

        sql = f"select market_type, search_words, search_words_ch from ods_theme_aba where site_name='{self.site_name}';"
        self.df_words = self.spark.sql(sql).cache()
        self.df_words = self.df_words.withColumn("search_words_lower", F.lower("search_words"))
        self.df_words.show()

        # The pandas copy already carries the Spark-computed lower-case column,
        # which is also the join key in handle_data, so it is reused as-is
        # instead of being re-lowered (re-lowering crashed on null words).
        df_words = self.df_words.toPandas()
        has_word = df_words['search_words_lower'].notna()
        # '颜色' rows are colour themes; their words get the "@@" marker that
        # udf_words_pattern recognises.
        is_color = df_words['market_type'] == '颜色'
        color_words = set(df_words.loc[has_word & is_color, 'search_words_lower'] + "@@")
        other_words = set(df_words.loc[has_word & ~is_color, 'search_words_lower'])

        # Sorted so the generated literal is identical from run to run.
        all_words = sorted(color_words) + sorted(other_words)
        # Pad every word with spaces so it only matches whole words in the
        # space-padded search term.
        self.words_list_str = str([f" {words} " for words in all_words])

    def handle_data(self):
        """Match theme words, join them onto the search terms, pivot and export.

        Writes the pivoted result to ``/root/theme.csv``.
        """
        # Lower-case the term and pad both ends with a space so that the
        # space-padded theme words match whole words only.
        self.df_st_dup = self.df_st_dup.withColumn("search_term_lower", F.lower(self.df_st_dup["search_term"]))
        self.df_st_dup = self.df_st_dup.withColumn(
            "search_term_lower", F.concat(F.lit(" "), "search_term_lower", F.lit(" ")))
        # Match: one ';'-joined string of theme words per search term.
        self.df_st_dup = self.df_st_dup.withColumn(
            "words_en_lower", self.u_words_pattern('search_term_lower', F.lit(self.words_list_str)))
        # Split the joined string into an array column ...
        self.df_st_dup = self.df_st_dup.withColumn("words_en_lower", F.split(self.df_st_dup["words_en_lower"], ";"))
        # ... and expand the array into one row per matched word (explode
        # produces no row for a null array, i.e. for terms without a match).
        self.df_st_dup = self.df_st_dup.withColumn("search_words_lower", F.explode(self.df_st_dup["words_en_lower"]))
        # TODO(review): the original note suggested switching this join to
        # 'inner' to avoid inaccurate match results; it is still 'left'.
        self.df_st_dup = self.df_st_dup.join(
            self.df_words, on=['search_words_lower'], how='left'
        ).cache()  # cached: several actions below would otherwise re-run the Python UDF

        self.df_st = self.df_st.join(
            self.df_st_dup, on='search_term', how='left'
        )
        self.df_st.show(30, truncate=False)
        self.df_st.filter("search_term_lower like '% dark grey %'").show()

        # One row per theme (search_words_ch), one column group per date_info.
        self.df_save = self.df_st.groupBy(['search_words_ch']).pivot("date_info").agg(
            F.min("rank"),
            F.count("search_term"),
            F.sum("bsr_orders")
        )
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        df_save = self.df_save.toPandas()
        df_save.to_csv("/root/theme.csv", index=False)


if __name__ == '__main__':
    # Command-line argument 1 is the site to process.
    handle_obj = DwtAbaAnalyticsTemp(site_name=sys.argv[1])
    handle_obj.run()