import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil, DbTypes
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType, BooleanType

from yswg_utils.udf_util import UdfUtil

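# Character class of letter characters across many Unicode scripts (apparently ported from the
# alphaChars pattern of the Snowball online demo); it drives the word-matching regex in
# snowball_stem_word below.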
_alphaCharsStr = 'A-Za-z\\xAA\\xB5\\xBA\\xC0-\\xD6\\xD8-\\xF6\\xF8-\u02C1\u02C6-\u02D1\u02E0-\u02E4\u02EC\u02EE\u0370-\u0374\u0376\u0377\u037A-\u037D\u037F\u0386\u0388-\u038A\u038C\u038E-\u03A1\u03A3-\u03F5\u03F7-\u0481\u048A-\u052F\u0531-\u0556\u0559\u0561-\u0587\u05D0-\u05EA\u05F0-\u05F2\u0620-\u064A\u066E\u066F\u0671-\u06D3\u06D5\u06E5\u06E6\u06EE\u06EF\u06FA-\u06FC\u06FF\u0710\u0712-\u072F\u074D-\u07A5\u07B1\u07CA-\u07EA\u07F4\u07F5\u07FA\u0800-\u0815\u081A\u0824\u0828\u0840-\u0858\u08A0-\u08B4\u0904-\u0939\u093D\u0950\u0958-\u0961\u0971-\u0980\u0985-\u098C\u098F\u0990\u0993-\u09A8\u09AA-\u09B0\u09B2\u09B6-\u09B9\u09BD\u09CE\u09DC\u09DD\u09DF-\u09E1\u09F0\u09F1\u0A05-\u0A0A\u0A0F\u0A10\u0A13-\u0A28\u0A2A-\u0A30\u0A32\u0A33\u0A35\u0A36\u0A38\u0A39\u0A59-\u0A5C\u0A5E\u0A72-\u0A74\u0A85-\u0A8D\u0A8F-\u0A91\u0A93-\u0AA8\u0AAA-\u0AB0\u0AB2\u0AB3\u0AB5-\u0AB9\u0ABD\u0AD0\u0AE0\u0AE1\u0AF9\u0B05-\u0B0C\u0B0F\u0B10\u0B13-\u0B28\u0B2A-\u0B30\u0B32\u0B33\u0B35-\u0B39\u0B3D\u0B5C\u0B5D\u0B5F-\u0B61\u0B71\u0B83\u0B85-\u0B8A\u0B8E-\u0B90\u0B92-\u0B95\u0B99\u0B9A\u0B9C\u0B9E\u0B9F\u0BA3\u0BA4\u0BA8-\u0BAA\u0BAE-\u0BB9\u0BD0\u0C05-\u0C0C\u0C0E-\u0C10\u0C12-\u0C28\u0C2A-\u0C39\u0C3D\u0C58-\u0C5A\u0C60\u0C61\u0C85-\u0C8C\u0C8E-\u0C90\u0C92-\u0CA8\u0CAA-\u0CB3\u0CB5-\u0CB9\u0CBD\u0CDE\u0CE0\u0CE1\u0CF1\u0CF2\u0D05-\u0D0C\u0D0E-\u0D10\u0D12-\u0D3A\u0D3D\u0D4E\u0D5F-\u0D61\u0D7A-\u0D7F\u0D85-\u0D96\u0D9A-\u0DB1\u0DB3-\u0DBB\u0DBD\u0DC0-\u0DC6\u0E01-\u0E30\u0E32\u0E33\u0E40-\u0E46\u0E81\u0E82\u0E84\u0E87\u0E88\u0E8A\u0E8D\u0E94-\u0E97\u0E99-\u0E9F\u0EA1-\u0EA3\u0EA5\u0EA7\u0EAA\u0EAB\u0EAD-\u0EB0\u0EB2\u0EB3\u0EBD\u0EC0-\u0EC4\u0EC6\u0EDC-\u0EDF\u0F00\u0F40-\u0F47\u0F49-\u0F6C\u0F88-\u0F8C\u1000-\u102A\u103F\u1050-\u1055\u105A-\u105D\u1061\u1065\u1066\u106E-\u1070\u1075-\u1081\u108E\u10A0-\u10C5\u10C7\u10CD\u10D0-\u10FA\u10FC-\u1248\u124A-\u124D\u1250-\u1256\u1258\u125A-\u125D\u1260-\u1288\u128A-\u128D\u1290-\u12B0\u12B2-\u12B5\u12B8-\u12BE\u12C0\u12C2-\u12C5\u12C8-\u12D6\u12D8-\u1310\u1312-\u1315\u1318-\u135A\u1380-\u138F\u13A0-\u13F5\u13F8-\u13FD\u1401-\u166C\u166F-\u167F\u1681-\u169A\u16A0-\u16EA\u16F1-\u16F8\u1700-\u170C\u170E-\u1711\u1720-\u1731\u1740-\u1751\u1760-\u176C\u176E-\u1770\u1780-\u17B3\u17D7\u17DC\u1820-\u1877\u1880-\u18A8\u18AA\u18B0-\u18F5\u1900-\u191E\u1950-\u196D\u1970-\u1974\u1980-\u19AB\u19B0-\u19C9\u1A00-\u1A16\u1A20-\u1A54\u1AA7\u1B05-\u1B33\u1B45-\u1B4B\u1B83-\u1BA0\u1BAE\u1BAF\u1BBA-\u1BE5\u1C00-\u1C23\u1C4D-\u1C4F\u1C5A-\u1C7D\u1CE9-\u1CEC\u1CEE-\u1CF1\u1CF5\u1CF6\u1D00-\u1DBF\u1E00-\u1F15\u1F18-\u1F1D\u1F20-\u1F45\u1F48-\u1F4D\u1F50-\u1F57\u1F59\u1F5B\u1F5D\u1F5F-\u1F7D\u1F80-\u1FB4\u1FB6-\u1FBC\u1FBE\u1FC2-\u1FC4\u1FC6-\u1FCC\u1FD0-\u1FD3\u1FD6-\u1FDB\u1FE0-\u1FEC\u1FF2-\u1FF4\u1FF6-\u1FFC\u2071\u207F\u2090-\u209C\u2102\u2107\u210A-\u2113\u2115\u2119-\u211D\u2124\u2126\u2128\u212A-\u212D\u212F-\u2139\u213C-\u213F\u2145-\u2149\u214E\u2183\u2184\u2C00-\u2C2E\u2C30-\u2C5E\u2C60-\u2CE4\u2CEB-\u2CEE\u2CF2\u2CF3\u2D00-\u2D25\u2D27\u2D2D\u2D30-\u2D67\u2D6F\u2D80-\u2D96\u2DA0-\u2DA6\u2DA8-\u2DAE\u2DB0-\u2DB6\u2DB8-\u2DBE\u2DC0-\u2DC6\u2DC8-\u2DCE\u2DD0-\u2DD6\u2DD8-\u2DDE\u2E2F\u3005\u3006\u3031-\u3035\u303B\u303C\u3041-\u3096\u309D-\u309F\u30A1-\u30FA\u30FC-\u30FF\u3105-\u312D\u3131-\u318E\u31A0-\u31BA\u31F0-\u31FF\u3400-\u4DB5\u4E00-\u9FD5\uA000-\uA48C\uA4D0-\uA4FD\uA500-\uA60C\uA610-\uA61F\uA62A\uA62B\uA640-\uA66E\uA67F-\uA69D\uA6A0-\uA6E5\uA717-\uA71F\uA722-\uA788\uA78B-\uA7AD\uA7B0-\uA7B7\uA7F7-\uA801\uA803-\uA805\uA807-\uA80A\uA80C-\uA822\uA840-\uA873\uA882-\uA8B3\uA8F2-\uA8F7\uA8FB\uA8FD\uA90A-\uA925\uA930-\uA946\uA960-\uA97C\uA984-\uA9B2\uA9CF\uA9E0-\uA9E4\uA9E6-\uA9EF\uA9FA-\uA9FE\uAA00-\uAA28\uAA40-\uAA42\uAA44-\uAA4B\uAA60-\uAA76\uAA7A\uAA7E-\uAAAF\uAAB1\uAAB5\uAAB6\uAAB9-\uAABD\uAAC0\uAAC2\uAADB-\uAADD\uAAE0-\uAAEA\uAAF2-\uAAF4\uAB01-\uAB06\uAB09-\uAB0E\uAB11-\uAB16\uAB20-\uAB26\uAB28-\uAB2E\uAB30-\uAB5A\uAB5C-\uAB65\uAB70-\uABE2\uAC00-\uD7A3\uD7B0-\uD7C6\uD7CB-\uD7FB\uF900-\uFA6D\uFA70-\uFAD9\uFB00-\uFB06\uFB13-\uFB17\uFB1D\uFB1F-\uFB28\uFB2A-\uFB36\uFB38-\uFB3C\uFB3E\uFB40\uFB41\uFB43\uFB44\uFB46-\uFBB1\uFBD3-\uFD3D\uFD50-\uFD8F\uFD92-\uFDC7\uFDF0-\uFDFB\uFE70-\uFE74\uFE76-\uFEFC\uFF21-\uFF3A\uFF41-\uFF5A\uFF66-\uFFBE\uFFC2-\uFFC7\uFFCA-\uFFCF\uFFD2-\uFFD7\uFFDA-\uFFDC'

"""
导入词
"""


def snowball_stem_word(string: str, ignore_stopwords: bool = False):
    """
    此处后面可能需要维护一个近义词词典
    对输入的英文短语 利用 snowball 进行词性还原和分词,返回还原后的词及分词list及命中的停止词即介词;
    停止词文件是相关资源文件在指定目录下
    代码模仿 snowball 官方推荐代码 https://snowballstem.org/demo.html#English
    :param string: 返回的词性还原后的字符串词干
    :param ignore_stopwords: 是否忽略停止词
    :return:
    """
    import re
    from nltk.stem.snowball import SnowballStemmer
    stemmer = SnowballStemmer("english", ignore_stopwords=ignore_stopwords)
    result = ''
    i = 0
    pattern = f"([{_alphaCharsStr}']+)"
    hint_word = []
    hint_stop_word = []
    for word_match in re.finditer(pattern, string):
        word = word_match.group()
        first_index = word_match.span()[0]
        stem_word = stemmer.stem(word)
        if stem_word not in stemmer.stopwords:
            # replace separator characters (ported from the demo's /[ &<>\n]/g)
            result += re.sub(r"[ &<>\n]", " ", string[i:first_index])
            hint_word.append(stem_word)
            result += stem_word
        else:
            hint_stop_word.append(stem_word)
        i = first_index + len(word)

    if i < len(string):
        result += string[i:len(string)]

    result = result.strip()
    return result, hint_word, hint_stop_word


def import_langs():
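    """
    Load the per-language word lists (en/de/es/fr) from the local resource files,
    lower-case and de-duplicate them, collect the languages each word appears in,
    and save the result to the Hive table tmp_lang_word (partition part=all).
    """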
    print("import_langs ..........")
    spark = SparkUtil.get_spark_session("app_name")

    en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt")
    de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/de.txt")
    es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/es.txt")
    fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/fr.txt")

    en_words = en_words.withColumn("lang", F.lit("en"))
    de_words = de_words.withColumn("lang", F.lit("de"))
    es_words = es_words.withColumn("lang", F.lit("es"))
    fr_words = fr_words.withColumn("lang", F.lit("fr"))

    all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words)

    all_words = all_words.withColumn("word", F.lower("value"))
    all_words = all_words.drop_duplicates(['word', 'lang'])
    all_words = all_words.groupby("word").agg(
        F.collect_list(F.col("lang")).alias("langs")
    ).select(
        F.col("word"),
        F.col("langs"),
        F.lit("all").alias("part")
    )

    all_words = all_words.repartition(1)

    hive_tb = 'tmp_lang_word'

    partition_dict = {
        "part": "all"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
    all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
    print("success")


def udf_pack_word_frequency_map(frequency_list):
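    """
    Collapse a list of (lang, frequency) rows into a {lang: frequency} map.
    When a word occurs in several languages, keep only the languages whose frequency
    is 1 (dictionary back-fill marker) or greater than 1/100 of the highest frequency.
    """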
    lang_word_map = {row['lang']: row['frequency'] for row in frequency_list}
    # take the language with the largest frequency
    if len(lang_word_map) > 1:
        lang, frequency = sorted(lang_word_map.items(), key=lambda it: (it[1]), reverse=True)[0]
        min_frequency = int(frequency / 100)
        # keep only languages whose frequency is 1 or above 1/100 of the maximum
        result_map = {}
        for key in lang_word_map.keys():
            lang_frequency = lang_word_map.get(key)
            if lang_frequency == 1 or lang_frequency > min_frequency:
                result_map[key] = lang_frequency
        return result_map

    return lang_word_map


def import_langs_frequency():
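    """
    Load the 50k word-frequency lists for en/de/es/fr, back-fill English words that
    only exist in the full en.txt dictionary (with frequency 1), aggregate a per-word
    {lang: frequency} map and save it to the Hive table tmp_lang_word_frequency.
    """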
    print("import_langs_frequency ..........")
    spark = SparkUtil.get_spark_session("app_name")
    udf_pack_word_frequency_map_reg = F.udf(udf_pack_word_frequency_map, MapType(StringType(), IntegerType()))

    en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/en_50k.txt")
    de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/de_50k.txt")
    es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/es_50k.txt")
    fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/fr_50k.txt")
    #
    en_words = en_words.withColumn("lang", F.lit("en"))
    de_words = de_words.withColumn("lang", F.lit("de"))
    es_words = es_words.withColumn("lang", F.lit("es"))
    fr_words = fr_words.withColumn("lang", F.lit("fr"))

    all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words)
    all_words = all_words.withColumn("split", F.split(F.col("value"), "\\s"))

    all_words = all_words.select(
        F.lower(F.col("split").getItem(0)).alias("word"),
        F.col("split").getItem(1).cast(IntegerType()).alias("frequency"),
        F.col("lang")
    )
    # Backfill English words that are missing from the frequency list
    en_word_all = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt")
    en_word_all = en_word_all.select(
        F.lower(F.col("value")).alias("word"),
        F.lit(1).alias("frequency"),
        F.lit("en").alias("lang"),
    )

    en_word_all = en_word_all.join(all_words.where("lang == 'en'"), on=['word'], how='left_anti')
    all_words = all_words.unionByName(en_word_all)

    # Stemming is intentionally skipped here; see test_stem_word for the stemmed variant.

    all_words = all_words.groupby("word").agg(
        F.collect_list(F.struct(F.col("lang"), F.col("frequency"))).alias("frequency_list")
    ).select(
        F.col("word"),
        udf_pack_word_frequency_map_reg(F.col("frequency_list")).alias("langs"),
        F.lit("all").alias("part")
    )

    all_words = all_words.repartition(1)

    hive_tb = 'tmp_lang_word_frequency'

    partition_dict = {
        "part": "all"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
    all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
    print("success")


def test_stem_word():
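    """
    Variant of import_langs_frequency that groups by the Snowball stem: each word is
    stemmed, the maximum frequency per (stem, lang) is kept, and the result (including
    the original words behind each stem) is saved to tmp_test_stem_word.
    """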
    print("import_langs_frequency ..........")
    spark = SparkUtil.get_spark_session("app_name")
    udf_pack_word_frequency_map_reg = F.udf(udf_pack_word_frequency_map, MapType(StringType(), IntegerType()))

    en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/en_50k.txt")
    de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/de_50k.txt")
    es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/es_50k.txt")
    fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/fr_50k.txt")
    #
    en_words = en_words.withColumn("lang", F.lit("en"))
    de_words = de_words.withColumn("lang", F.lit("de"))
    es_words = es_words.withColumn("lang", F.lit("es"))
    fr_words = fr_words.withColumn("lang", F.lit("fr"))

    all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words)
    all_words = all_words.withColumn("split", F.split(F.col("value"), "\\s"))

    all_words = all_words.select(
        F.lower(F.col("split").getItem(0)).alias("word"),
        F.col("split").getItem(1).cast(IntegerType()).alias("frequency"),
        F.col("lang")
    )
    # Backfill English words that are missing from the frequency list
    en_word_all = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt")
    en_word_all = en_word_all.select(
        F.lower(F.col("value")).alias("word"),
        F.lit(1).alias("frequency"),
        F.lit("en").alias("lang"),
    )

    en_word_all = en_word_all.join(all_words.where("lang == 'en'"), on=['word'], how='left_anti')
    all_words = all_words.unionByName(en_word_all)

    # Reduce each word to its Snowball stem
    def stem_word(word: str):
        return snowball_stem_word(word, False)[0]

    udf_stem_word = F.udf(stem_word, StringType())
    all_words = all_words.withColumn("stem_word", udf_stem_word(F.col("word")))

    all_words = all_words.withColumn("max_frequency", F.max("frequency").over(Window.partitionBy(['stem_word', 'lang'])))

    all_words = all_words.groupby("stem_word").agg(
        F.collect_list(F.struct(F.col("lang"), F.col("max_frequency").alias('frequency'))).alias("frequency_list"),
        F.collect_list(F.col('word')).alias("word_before"),
    ).select(
        F.col("stem_word"),
        F.col("word_before"),
        udf_pack_word_frequency_map_reg(F.col("frequency_list")).alias("langs"),
        F.lit("all").alias("part")
    )

    all_words = all_words.repartition(1)

    hive_tb = 'tmp_test_stem_word'

    partition_dict = {
        "part": "all"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
    all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
    print("success")


def download():
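    """
    Export the tmp_keyword_lang_test table to CSV (at most 1,000,000 rows).
    """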
    spark = SparkUtil.get_spark_session("download")

    df_all = spark.sql("""
    select *
    from tmp_keyword_lang_test
    """)
    path = CommonUtil.df_export_csv(spark, df_all, "tmp_keyword_lang_test", limit=100 * 10000)
    print(f"success : {path}")
    pass


def udf_detect_phrase_reg(lang_word_map, word_translate_map: dict):
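    """
    Build a UDF that guesses the language of a keyword phrase. The phrase is tokenized
    with NLTK, each token is looked up in lang_word_map (word -> {lang: frequency}),
    and the language with the most matched words (ties broken by total frequency) wins.
    For English phrases the matched words are additionally replaced with their first
    Chinese translation from word_translate_map to build translate_val.
    """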
    def detect_phrase(phrase: str):
        import re
        # phrase = re.sub(r'(\d+\.?\d*|-|\'|\"|,|,|?|\?|/|、|)', '', phrase).strip()
        # tokenize the phrase
        from nltk.tokenize import word_tokenize
        wordList = list(filter(lambda x: len(x) >= 2, word_tokenize(phrase, "english")))
        tmp_map = {
            "en": {"frequency": 0, "word": []},
            "fr": {"frequency": 0, "word": []},
            "es": {"frequency": 0, "word": []},
            "de": {"frequency": 0, "word": []},
        }
        for word in wordList:
            lang_rank_map: dict = lang_word_map.get(word)
            if lang_rank_map is not None:
                for lang in lang_rank_map.keys():
                    frequency = lang_rank_map[lang]
                    tmp_map[lang]["frequency"] = tmp_map[lang]["frequency"] + frequency
                    tmp_map[lang]["word"].append(word)
            pass

        # pick the language with the most matched words first, then the highest total frequency
        lang, hint_word_map = sorted(tmp_map.items(), key=lambda it: (len(it[1]['word']), it[1]['frequency']), reverse=True)[0]

        if hint_word_map['frequency'] == 0:
            return {"lang": None, "hint_word": None, "translate_val": None}
        else:
            hint_word_list = hint_word_map['word']
            hint_word = " ".join(hint_word_list)
            if len(hint_word) <= 2:
                return {"lang": None, "hint_word": None, "translate_val": None}

            translate_val = None
            if lang == 'en' and word_translate_map is not None:
                translate_val = phrase
                for tmpword in hint_word_list:
                    # translate the matched English word to Chinese
                    translatearr = word_translate_map.get(tmpword)
                    if translatearr is not None and len(translatearr) >= 1:
                        translate_val = re.sub(rf"\b{tmpword}\b", translatearr[0], translate_val)
                    pass
                pass

            return {"lang": lang, "hint_word": hint_word, "translate_val": translate_val}
        pass

    return F.udf(detect_phrase, MapType(StringType(), StringType()))


def calc_keyword_lang():
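    """
    Detect the language of merchantwords keywords (US site, 1,000,000 rows) using the
    tmp_lang_word_frequency map and the tmp_en_dict translations, and save the result
    to tmp_keyword_lang_test.
    """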
    spark = SparkUtil.get_spark_session("calc_keyword_lang")

    lang_word_list = spark.sql("""
        select word, langs
        from big_data_selection.tmp_lang_word_frequency
    """).collect()

    en_cn_dict_list = spark.sql("""
        select word, simple_cn from tmp_en_dict
    """).collect()

    # convert the lang_word rows into a dict: word -> {lang: frequency}
    lang_word_map = {row['word']: row['langs'] for row in lang_word_list}
    # English -> Chinese translation map built from the dictionary table
    en_cn_dict_map = {row['word']: row['simple_cn'] for row in en_cn_dict_list}

    keyword_df = spark.sql("""
            select keyword
            from big_data_selection.dwt_merchantwords_st_detail
            where site_name='us' limit 1000000
    """).cache()

    keyword_df = keyword_df.withColumn("resultMap", udf_detect_phrase_reg(lang_word_map, en_cn_dict_map)(F.col("keyword")))

    keyword_df = keyword_df.select(
        F.col("keyword"),
        F.col("resultMap").getField("lang").alias("lang"),
        F.col("resultMap").getField("hint_word").alias("hint_word"),
        F.col("resultMap").getField("translate_val").alias("translate_val"),
    )

    keyword_df.write.saveAsTable(name="tmp_keyword_lang_test", format='hive', mode='append')

    print("success")



def udf_test_reg(lang_word_map):
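    """
    Like udf_detect_phrase_reg, but the phrase is reduced to Snowball stems via
    UdfUtil.snowball_stem_word and no Chinese translation is produced.
    """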
    def detect_phrase(phrase: str):
        # stems of the tokenized words
        wordList = UdfUtil.snowball_stem_word(phrase, False)[1]

        tmp_map = {
            "en": {"frequency": 0, "word": []},
            "fr": {"frequency": 0, "word": []},
            "es": {"frequency": 0, "word": []},
            "de": {"frequency": 0, "word": []},
        }
        for word in wordList:
            lang_rank_map: dict = lang_word_map.get(word)
            if lang_rank_map is not None:
                for lang in lang_rank_map.keys():
                    frequency = lang_rank_map[lang]
                    tmp_map[lang]["frequency"] = tmp_map[lang]["frequency"] + frequency
                    tmp_map[lang]["word"].append(word)
            pass

        # pick the language with the most matched words first, then the highest total frequency
        lang, hint_word_map = sorted(tmp_map.items(), key=lambda it: (len(it[1]['word']), it[1]['frequency']), reverse=True)[0]

        if hint_word_map['frequency'] == 0:
            return {"lang": None, "hint_word": None, }
        else:
            hint_word_list = hint_word_map['word']
            hint_word = " ".join(hint_word_list)
            if len(hint_word) <= 2:
                return {"lang": None, "hint_word": None, }
            return {"lang": lang, "hint_word": hint_word}
        pass

    return F.udf(detect_phrase, MapType(StringType(), StringType()))


def build_word_frequency_map(lang_word_list):
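    """
    Build two lookup maps from (word, stem, frequency, lang) rows:
    word -> {lang: frequency} and stem -> {lang: max frequency}.
    In both maps, languages whose frequency is more than 100x below the maximum
    (and not equal to 1) are dropped.
    """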
    word_result_map = {}
    stem_result_map = {}
    for row in lang_word_list:
        word = row['word']
        stem = row['stem']
        frequency = row['frequency']
        lang = row['lang']
        lang_map1 = word_result_map.get(word) or {}
        lang_map1[lang] = frequency
        word_result_map[word] = lang_map1

        lang_map2 = stem_result_map.get(stem) or {}
        lang_map2[lang] = max(lang_map2.get(lang, 0), frequency)
        stem_result_map[stem] = lang_map2

    for tmp_map in [word_result_map, stem_result_map]:
        # drop languages whose frequency is far below the maximum
        for key, value in tmp_map.items():
            max_frequency = max(value.values())
            for lang in value.copy().keys():
                frequency = value.get(lang)
                if frequency != 1 and max_frequency / frequency > 100:
                    del value[lang]
            pass

    return word_result_map, stem_result_map


def calc_aba_keyword_lang():
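    """
    Detect the language of ABA search terms (uk/us) with the stem-level frequency map
    loaded from PostgreSQL, and save the result to tmp_aba_keyword_lang_test_v2.
    """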
    spark = SparkUtil.get_spark_session("calc_aba_keyword_lang")

    word_sql = f"""
        select word, stem, frequency, lang
        from lang_word_frequency
        """

    conn_info = DBUtil.get_connection_info(db_type=DbTypes.postgresql_test.name, site_name="us")
    word_frequency_df = SparkUtil.read_jdbc_query(
        session=spark,
        url=conn_info["url"],
        pwd=conn_info["pwd"],
        username=conn_info["username"],
        query=word_sql
    ).collect()

    word_result_map, stem_result_map = build_word_frequency_map(word_frequency_df)

    brand_sql = f"""
        select asin_brand_name
        from tmp_amazon_brand_dict
        """
    brand_all_list = spark.sql(brand_sql).cache().collect()
    # all brand names, lower-cased
    brand_list = {str(row['asin_brand_name']).lower() for row in brand_all_list}

    aba_keyword_df = spark.sql("""
        select search_term,
               site_name
        from big_data_selection.ods_st_key
        where site_name in ('uk', 'us')
    """).cache()

    aba_keyword_df = aba_keyword_df.withColumn("resultMap", udf_test_reg(stem_result_map)(F.col("search_term")))

    aba_keyword_df = aba_keyword_df.select(
        F.col("search_term"),
        F.col("resultMap").getField("lang").alias("lang"),
        F.col("resultMap").getField("hint_word").alias("hint_word"),
        F.col("site_name")
    )

    aba_keyword_df.write.saveAsTable(name="tmp_aba_keyword_lang_test_v2", format='hive', mode='append')
    print("success")


def udf_has_brand_reg(brand_list):
    """
    Return a UDF that flags whether a keyword contains a known brand name.
    Minimal sketch: a keyword is treated as branded when any of its whitespace-separated,
    lower-cased tokens appears in brand_list; multi-word brand names are not matched.
    """
    def has_brand(keyword: str):
        if keyword is None:
            return False
        # token-level lookup against the brand-name set
        return any(token in brand_list for token in keyword.lower().split())

    return F.udf(has_brand, BooleanType())


def calc_brand_name_list():
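    """
    Collect the distinct, lower-cased brand names of US ASINs into tmp_brand_name_list.
    """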
    spark = SparkUtil.get_spark_session("calc_aba_keyword_lang")
    brand_name_list = spark.sql("""
    select asin_brand_name as brand_name
    from big_data_selection.dim_cal_asin_history_detail
    where site_name='us'
    """)

    brand_name_list = brand_name_list.withColumn("brand_name", F.lower(F.trim(F.col("brand_name"))))
    brand_name_list = brand_name_list.drop_duplicates()
    brand_name_list.write.saveAsTable(name="tmp_brand_name_list", format='hive', mode='append')

    pass


def ge_all_err_word():
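    """
    Find merchantwords keywords (US) that contain CJK/Korean characters (garbled data)
    and save them to tmp_error_keyword.
    """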
    spark = SparkUtil.get_spark_session("ge_all_err_word")
    keyword_df = spark.sql("""
    select keyword
    from big_data_selection.dwt_merchantwords_st_detail
    where site_name='us'
    """)

    def has_cn(word):
        import re
        pattern = re.compile(r'[\u4e00-\u9fa5]|[\u0800-\u4e00]|[\uac00-\ud7ff]')
        return len(pattern.findall(word)) > 0

    udf_has_cn_reg = F.udf(has_cn, BooleanType())

    keyword_df = keyword_df.withColumn("cn_flag", udf_has_cn_reg(F.col("keyword")))
    keyword_df = keyword_df.where("cn_flag == true")
    keyword_df.write.saveAsTable(name="tmp_error_keyword", format='hive', mode='append')
    print("success")
    pass


def get_simple_translation(translation: str):
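    """
    Extract short Chinese glosses from a dictionary translation field: for every
    '||'-separated line take the first sense and strip bracketed text and
    part-of-speech markers.
    """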
    result = []
    for line in translation.split("||"):
        import re
        # take the first sense of the line (split on ';', ',' or '，')
        first_cn = re.split(r'[;,，]', line)[0]
        # strip anything wrapped in [], () or full-width （） brackets
        first_cn = re.sub(r'\(.*\)|\[.*\]|（.*）', "", first_cn)
        # strip part-of-speech markers such as "n." / "vt." / "adj."
        first_cn = re.sub(r'[a-z]+\.', "", first_cn)
        result.append(first_cn)
        pass

    return result


# English dictionary translation (English -> Chinese)
def build_en_dict():
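    """
    Load the English-Chinese dictionary JSON, normalize whitespace/newlines, derive a
    simplified Chinese gloss per word and save the result to tmp_en_dict.
    """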
    print("build_en_dict ..........")
    spark = SparkUtil.get_spark_session("build_en_dict")
    en_dict_df = spark.read.json("file:///tmp/wjc_py/yswg_utils/resource/en_dict_all.json")

    # newline characters cause conflicts, so replace "\n" with "||" instead
    en_dict_df = en_dict_df.withColumn("translation",
                                       F.trim(
                                           F.regexp_replace(F.regexp_replace(F.col("translation"), r"(\t|\u0020)", ""), r"\n", "||"))
                                       )

    udf_get_simple_translation_reg = F.udf(get_simple_translation, ArrayType(StringType()))

    # word          the word itself
    # phonetic      phonetic transcription (mainly British English)
    # definition    English definitions, one per line
    # translation   Chinese definitions, one per line
    # pos           part-of-speech positions, separated by "/"
    # collins       Collins star rating
    # oxford        whether the word is in the Oxford 3000 core vocabulary
    # tag           string tags: zk (middle-school exam), gk (college entrance exam), cet4 (CET-4) etc., space separated
    # bnc           word-frequency rank in the British National Corpus
    # frq           word-frequency rank in the contemporary corpus
    # exchange      tense/plural transformations, items separated by "/" (see the later table)
    # detail        extended JSON info, example sentences stored as a dict (to be added)
    # audio         pronunciation audio URL (to be added)
    en_dict_df = en_dict_df.select(
        F.trim(F.regexp_replace(F.col("word"), r"[\n\t]", "")).alias("word"),
        F.trim(F.regexp_replace(F.col("sw"), r"[\n\t]", "")).alias("sw"),
        F.trim(F.regexp_replace(F.col("exchange"), r"[\n\t]", "")).alias("exchange"),
        F.col("translation"),
        udf_get_simple_translation_reg(F.col("translation")).alias("simple_cn"),
        F.trim(F.regexp_replace(F.col("collins"), r"[\n\t]", "")).alias("collins"),
    ) \
        .orderBy(F.col("collins").desc())

    # en_dict_df = en_dict_df.where("word=='account'").show(truncate=False)

    en_dict_df.write.saveAsTable(name="tmp_en_dict", format='hive', mode='append')

    print("success")


def ge_all_aba_err_word():
    """
    aba查看乱码数据
    :return:
    """
    print("ge_all_aba_err_word ..........")

    spark = SparkUtil.get_spark_session("ge_all_aba_err_word")
    keyword_df = spark.sql("""
        select search_term as keyword
        from big_data_selection.ods_st_key
        """)

    def has_cn(word):
        import re
        pattern = re.compile(r'[\u4e00-\u9fa5]|[\u0800-\u4e00]|[\uac00-\ud7ff]')
        return len(pattern.findall(word)) > 0

    udf_has_cn_reg = F.udf(has_cn, BooleanType())

    keyword_df = keyword_df.withColumn("cn_flag", udf_has_cn_reg(F.col("keyword")))
    keyword_df = keyword_df.where("cn_flag == true")
    keyword_df.write.saveAsTable(name="tmp_aba_error_keyword", format='hive', mode='append')
    print("success")
    pass


def title_contain_brand_name():
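    """
    Count how many BSR ASINs (US, 2023-11) have a title that contains their own brand name.
    """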
    spark = SparkUtil.get_spark_session("title_contain_brand_name")
    flag_df = spark.sql("""
        select flag,
               count(flag)
        from (
                 select title,
                        brand_name,
                        if(locate(lower(brand_name), lower(title)) > 0, 1, 0) as flag
                 from big_data_selection.dwt_bsr_asin_detail
                 where date_info = '2023-11'
                   and site_name = 'us'
                   and title is not null
                   and brand_name is not null
             )
        group by flag
        """)
    flag_df.show()

    pass


def split_word(word: str):
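    """
    Tokenize a keyword with NLTK after normalizing '+' and double quotes.
    """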
    word = word.replace("+", " ")
    word = word.replace("\"", "")
    from nltk.tokenize import word_tokenize
    return word_tokenize(word, "english")


def calc_amazon_word_frequency():
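    """
    Dump merchantwords keywords into tmp_amazon_word_frequency (the actual frequency
    aggregation is not implemented here yet).
    """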
    spark = SparkUtil.get_spark_session("amazon_word_frequency")
    keyword_df = spark.sql("""
        select keyword from dwt_merchantwords_st_detail
        """).cache()

    keyword_df.write.saveAsTable(name="tmp_amazon_word_frequency", format='hive', mode='append')
    pass


def build_brand_name_dict():
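    """
    Save the distinct, trimmed US brand names into tmp_amazon_brand_dict.
    """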
    spark = SparkUtil.get_spark_session("build_brand_name_dict")
    save_df = spark.sql("""
        select distinct trim(asin_brand_name) as asin_brand_name
        from big_data_selection.dim_cal_asin_history_detail
        where site_name = 'us'
          and asin_brand_name is not null
        """).cache()

    save_df.write.saveAsTable(name="tmp_amazon_brand_dict", format='hive', mode='append')
    print('success')
    pass





if __name__ == '__main__':
    calc_aba_top100w_asin()
    # calc_aba_keyword_lang()

    # build_brand_name_dict()
    # arr = [
    #     "24 oz",
    #     "skechers+skechers+skechers+skechers+ skechers+ skechers 4runner rtx 3060 ti",
    #     "the shark® ai robot vacmop™ rv2002wd",
    #     "stella & chewy’s carnivore cravings purrfect pate cans – grain free, protein rich wet cat food (case of 24)"
    # ]
    # for s in arr:
    #     print(split_word(s))