import json import os import sys sys.path.append(os.path.dirname(sys.path[0])) from utils.db_util import DBUtil, DbTypes from utils.common_util import CommonUtil from utils.hdfs_utils import HdfsUtils from utils.spark_util import SparkUtil from pyspark.sql import functions as F, Window from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType, BooleanType from yswg_utils.udf_util import UdfUtil _alphaCharsStr = 'A-Za-z\\xAA\\xB5\\xBA\\xC0-\\xD6\\xD8-\\xF6\\xF8-\u02C1\u02C6-\u02D1\u02E0-\u02E4\u02EC\u02EE\u0370-\u0374\u0376\u0377\u037A-\u037D\u037F\u0386\u0388-\u038A\u038C\u038E-\u03A1\u03A3-\u03F5\u03F7-\u0481\u048A-\u052F\u0531-\u0556\u0559\u0561-\u0587\u05D0-\u05EA\u05F0-\u05F2\u0620-\u064A\u066E\u066F\u0671-\u06D3\u06D5\u06E5\u06E6\u06EE\u06EF\u06FA-\u06FC\u06FF\u0710\u0712-\u072F\u074D-\u07A5\u07B1\u07CA-\u07EA\u07F4\u07F5\u07FA\u0800-\u0815\u081A\u0824\u0828\u0840-\u0858\u08A0-\u08B4\u0904-\u0939\u093D\u0950\u0958-\u0961\u0971-\u0980\u0985-\u098C\u098F\u0990\u0993-\u09A8\u09AA-\u09B0\u09B2\u09B6-\u09B9\u09BD\u09CE\u09DC\u09DD\u09DF-\u09E1\u09F0\u09F1\u0A05-\u0A0A\u0A0F\u0A10\u0A13-\u0A28\u0A2A-\u0A30\u0A32\u0A33\u0A35\u0A36\u0A38\u0A39\u0A59-\u0A5C\u0A5E\u0A72-\u0A74\u0A85-\u0A8D\u0A8F-\u0A91\u0A93-\u0AA8\u0AAA-\u0AB0\u0AB2\u0AB3\u0AB5-\u0AB9\u0ABD\u0AD0\u0AE0\u0AE1\u0AF9\u0B05-\u0B0C\u0B0F\u0B10\u0B13-\u0B28\u0B2A-\u0B30\u0B32\u0B33\u0B35-\u0B39\u0B3D\u0B5C\u0B5D\u0B5F-\u0B61\u0B71\u0B83\u0B85-\u0B8A\u0B8E-\u0B90\u0B92-\u0B95\u0B99\u0B9A\u0B9C\u0B9E\u0B9F\u0BA3\u0BA4\u0BA8-\u0BAA\u0BAE-\u0BB9\u0BD0\u0C05-\u0C0C\u0C0E-\u0C10\u0C12-\u0C28\u0C2A-\u0C39\u0C3D\u0C58-\u0C5A\u0C60\u0C61\u0C85-\u0C8C\u0C8E-\u0C90\u0C92-\u0CA8\u0CAA-\u0CB3\u0CB5-\u0CB9\u0CBD\u0CDE\u0CE0\u0CE1\u0CF1\u0CF2\u0D05-\u0D0C\u0D0E-\u0D10\u0D12-\u0D3A\u0D3D\u0D4E\u0D5F-\u0D61\u0D7A-\u0D7F\u0D85-\u0D96\u0D9A-\u0DB1\u0DB3-\u0DBB\u0DBD\u0DC0-\u0DC6\u0E01-\u0E30\u0E32\u0E33\u0E40-\u0E46\u0E81\u0E82\u0E84\u0E87\u0E88\u0E8A\u0E8D\u0E94-\u0E97\u0E99-\u0E9F\u0EA1-\u0EA3\u0EA5\u0EA7\u0EAA\u0EAB\u0EAD-\u0EB0\u0EB2\u0EB3\u0EBD\u0EC0-\u0EC4\u0EC6\u0EDC-\u0EDF\u0F00\u0F40-\u0F47\u0F49-\u0F6C\u0F88-\u0F8C\u1000-\u102A\u103F\u1050-\u1055\u105A-\u105D\u1061\u1065\u1066\u106E-\u1070\u1075-\u1081\u108E\u10A0-\u10C5\u10C7\u10CD\u10D0-\u10FA\u10FC-\u1248\u124A-\u124D\u1250-\u1256\u1258\u125A-\u125D\u1260-\u1288\u128A-\u128D\u1290-\u12B0\u12B2-\u12B5\u12B8-\u12BE\u12C0\u12C2-\u12C5\u12C8-\u12D6\u12D8-\u1310\u1312-\u1315\u1318-\u135A\u1380-\u138F\u13A0-\u13F5\u13F8-\u13FD\u1401-\u166C\u166F-\u167F\u1681-\u169A\u16A0-\u16EA\u16F1-\u16F8\u1700-\u170C\u170E-\u1711\u1720-\u1731\u1740-\u1751\u1760-\u176C\u176E-\u1770\u1780-\u17B3\u17D7\u17DC\u1820-\u1877\u1880-\u18A8\u18AA\u18B0-\u18F5\u1900-\u191E\u1950-\u196D\u1970-\u1974\u1980-\u19AB\u19B0-\u19C9\u1A00-\u1A16\u1A20-\u1A54\u1AA7\u1B05-\u1B33\u1B45-\u1B4B\u1B83-\u1BA0\u1BAE\u1BAF\u1BBA-\u1BE5\u1C00-\u1C23\u1C4D-\u1C4F\u1C5A-\u1C7D\u1CE9-\u1CEC\u1CEE-\u1CF1\u1CF5\u1CF6\u1D00-\u1DBF\u1E00-\u1F15\u1F18-\u1F1D\u1F20-\u1F45\u1F48-\u1F4D\u1F50-\u1F57\u1F59\u1F5B\u1F5D\u1F5F-\u1F7D\u1F80-\u1FB4\u1FB6-\u1FBC\u1FBE\u1FC2-\u1FC4\u1FC6-\u1FCC\u1FD0-\u1FD3\u1FD6-\u1FDB\u1FE0-\u1FEC\u1FF2-\u1FF4\u1FF6-\u1FFC\u2071\u207F\u2090-\u209C\u2102\u2107\u210A-\u2113\u2115\u2119-\u211D\u2124\u2126\u2128\u212A-\u212D\u212F-\u2139\u213C-\u213F\u2145-\u2149\u214E\u2183\u2184\u2C00-\u2C2E\u2C30-\u2C5E\u2C60-\u2CE4\u2CEB-\u2CEE\u2CF2\u2CF3\u2D00-\u2D25\u2D27\u2D2D\u2D30-\u2D67\u2D6F\u2D80-\u2D96\u2DA0-\u2DA6\u2DA8-\u2DAE\u2DB0-\u2DB6\u2DB8-\u2DBE\u2DC0-\u2DC6\u2DC8-\u2DCE\u2DD0-\u2DD6\u2DD8-\u2DDE\u2E2F\u3005\u3006\u3031-\u3035\u303B\u303C\u3041-\u3096\u309D-\u309F\u30A1-\u30FA\u30FC-\u30FF\u3105-\u312D\u3131-\u318E\u31A0-\u31BA\u31F0-\u31FF\u3400-\u4DB5\u4E00-\u9FD5\uA000-\uA48C\uA4D0-\uA4FD\uA500-\uA60C\uA610-\uA61F\uA62A\uA62B\uA640-\uA66E\uA67F-\uA69D\uA6A0-\uA6E5\uA717-\uA71F\uA722-\uA788\uA78B-\uA7AD\uA7B0-\uA7B7\uA7F7-\uA801\uA803-\uA805\uA807-\uA80A\uA80C-\uA822\uA840-\uA873\uA882-\uA8B3\uA8F2-\uA8F7\uA8FB\uA8FD\uA90A-\uA925\uA930-\uA946\uA960-\uA97C\uA984-\uA9B2\uA9CF\uA9E0-\uA9E4\uA9E6-\uA9EF\uA9FA-\uA9FE\uAA00-\uAA28\uAA40-\uAA42\uAA44-\uAA4B\uAA60-\uAA76\uAA7A\uAA7E-\uAAAF\uAAB1\uAAB5\uAAB6\uAAB9-\uAABD\uAAC0\uAAC2\uAADB-\uAADD\uAAE0-\uAAEA\uAAF2-\uAAF4\uAB01-\uAB06\uAB09-\uAB0E\uAB11-\uAB16\uAB20-\uAB26\uAB28-\uAB2E\uAB30-\uAB5A\uAB5C-\uAB65\uAB70-\uABE2\uAC00-\uD7A3\uD7B0-\uD7C6\uD7CB-\uD7FB\uF900-\uFA6D\uFA70-\uFAD9\uFB00-\uFB06\uFB13-\uFB17\uFB1D\uFB1F-\uFB28\uFB2A-\uFB36\uFB38-\uFB3C\uFB3E\uFB40\uFB41\uFB43\uFB44\uFB46-\uFBB1\uFBD3-\uFD3D\uFD50-\uFD8F\uFD92-\uFDC7\uFDF0-\uFDFB\uFE70-\uFE74\uFE76-\uFEFC\uFF21-\uFF3A\uFF41-\uFF5A\uFF66-\uFFBE\uFFC2-\uFFC7\uFFCA-\uFFCF\uFFD2-\uFFD7\uFFDA-\uFFDC' """ 导入词 """ def snowball_stem_word(string: str, ignore_stopwords: bool = False): """ 此处后面可能需要维护一个近义词词典 对输入的英文短语 利用 snowball 进行词性还原和分词,返回还原后的词及分词list及命中的停止词即介词; 停止词文件是相关资源文件在指定目录下 代码模仿 snowball 官方推荐代码 https://snowballstem.org/demo.html#English :param string: 返回的词性还原后的字符串词干 :param ignore_stopwords: 是否忽略停止词 :return: """ import re from nltk.stem.snowball import SnowballStemmer stemmer = SnowballStemmer("english", ignore_stopwords=ignore_stopwords) result = '' i = 0 pattern = f"([{_alphaCharsStr}']+)" hint_word = [] hint_stop_word = [] for word_match in re.finditer(pattern, string): word = word_match.group() first_index = word_match.span()[0] stem_word = stemmer.stem(word) if stem_word not in stemmer.stopwords: result += re.sub("/[ &<>\n]/g", " ", string[i:first_index]) hint_word.append(stem_word) result += stem_word else: hint_stop_word.append(stem_word) i = first_index + len(word) if i < len(string): result += string[i:len(string)] result = result.strip() return result, hint_word, hint_stop_word def import_langs(): print("import_langs ..........") spark = SparkUtil.get_spark_session("app_name") en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt") de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/de.txt") es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/es.txt") fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/fr.txt") en_words = en_words.withColumn("lang", F.lit("en")) de_words = de_words.withColumn("lang", F.lit("de")) es_words = es_words.withColumn("lang", F.lit("es")) fr_words = fr_words.withColumn("lang", F.lit("fr")) all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words) all_words = all_words.withColumn("word", F.lower("value")) all_words = all_words.drop_duplicates(['word', 'lang']) all_words = all_words.groupby("word").agg( F.collect_list(F.col("lang")).alias("langs") ).select( F.col("word"), F.col("langs"), F.lit("all").alias("part") ) all_words = all_words.repartition(1) hive_tb = 'tmp_lang_word' partition_dict = { "part": "all" } hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict) HdfsUtils.delete_hdfs_file(hdfs_path) partition_by = list(partition_dict.keys()) print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", ) all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by) print("success") def udf_pack_word_frequency_map(frequency_list): lang_word_map = {row['lang']: row['frequency'] for row in frequency_list} # 获取最大的那个 if len(lang_word_map) > 1: lang, frequency = sorted(lang_word_map.items(), key=lambda it: (it[1]), reverse=True)[0] min_frequency = int(frequency / 100) # 过滤 result_map = {} for key in lang_word_map.keys(): lang_frequency = lang_word_map.get(key) if lang_frequency == 1 or lang_frequency > min_frequency: result_map[key] = lang_frequency return result_map return lang_word_map def import_langs_frequency(): print("import_langs_frequency ..........") spark = SparkUtil.get_spark_session("app_name") udf_pack_word_frequency_map_reg = F.udf(udf_pack_word_frequency_map, MapType(StringType(), IntegerType())) en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/en_50k.txt") de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/de_50k.txt") es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/es_50k.txt") fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/fr_50k.txt") # en_words = en_words.withColumn("lang", F.lit("en")) de_words = de_words.withColumn("lang", F.lit("de")) es_words = es_words.withColumn("lang", F.lit("es")) fr_words = fr_words.withColumn("lang", F.lit("fr")) all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words) all_words = all_words.withColumn("split", F.split(F.col("value"), "\\s")) # 此处对英文词汇进行补全 all_words = all_words.select( F.lower(F.col("split").getItem(0)).alias("word"), F.col("split").getItem(1).cast(IntegerType()).alias("frequency"), F.col("lang") ) # 此处对英文词汇进行补全 en_word_all = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt") en_word_all = en_word_all.select( F.lower(F.col("value")).alias("word"), F.lit(1).alias("frequency"), F.lit("en").alias("lang"), ) en_word_all = en_word_all.join(all_words.where("lang == 'en'"), on=['word'], how='left_anti') all_words = all_words.unionByName(en_word_all) # 此处进行词干提取 def stem_word(): pass F.udf(UdfUtil.snowball_stem_word, ) all_words = all_words.groupby("word").agg( F.collect_list(F.struct(F.col("lang"), F.col("frequency"))).alias("frequency_list") ).select( F.col("word"), udf_pack_word_frequency_map_reg(F.col("frequency_list")).alias("langs"), F.lit("all").alias("part") ) all_words = all_words.repartition(1) hive_tb = 'tmp_lang_word_frequency' partition_dict = { "part": "all" } hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict) HdfsUtils.delete_hdfs_file(hdfs_path) partition_by = list(partition_dict.keys()) print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", ) all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by) print("success") def test_stem_word(): print("import_langs_frequency ..........") spark = SparkUtil.get_spark_session("app_name") udf_pack_word_frequency_map_reg = F.udf(udf_pack_word_frequency_map, MapType(StringType(), IntegerType())) en_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/en_50k.txt") de_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/de_50k.txt") es_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/es_50k.txt") fr_words = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/frequency/fr_50k.txt") # en_words = en_words.withColumn("lang", F.lit("en")) de_words = de_words.withColumn("lang", F.lit("de")) es_words = es_words.withColumn("lang", F.lit("es")) fr_words = fr_words.withColumn("lang", F.lit("fr")) all_words = en_words.unionByName(de_words).unionByName(es_words).unionByName(fr_words) all_words = all_words.withColumn("split", F.split(F.col("value"), "\\s")) # 此处对英文词汇进行补全 all_words = all_words.select( F.lower(F.col("split").getItem(0)).alias("word"), F.col("split").getItem(1).cast(IntegerType()).alias("frequency"), F.col("lang") ) # 此处对英文词汇进行补全 en_word_all = spark.read.text("file:///tmp/wjc_py/yswg_utils/resource/en.txt") en_word_all = en_word_all.select( F.lower(F.col("value")).alias("word"), F.lit(1).alias("frequency"), F.lit("en").alias("lang"), ) en_word_all = en_word_all.join(all_words.where("lang == 'en'"), on=['word'], how='left_anti') all_words = all_words.unionByName(en_word_all) # 此处进行词干提取 def stem_word(word: str): return snowball_stem_word(word, False)[0] udf_stem_word = F.udf(stem_word, StringType()) all_words = all_words.withColumn("stem_word", udf_stem_word(F.col("word"))) all_words = all_words.withColumn("max_frequency", F.max("frequency").over(Window.partitionBy(['stem_word', 'lang']))) all_words = all_words.groupby("stem_word").agg( F.collect_list(F.struct(F.col("lang"), F.col("max_frequency").alias('frequency'))).alias("frequency_list"), F.collect_list(F.col('word')).alias("word_before"), ).select( F.col("stem_word"), F.col("word_before"), udf_pack_word_frequency_map_reg(F.col("frequency_list")).alias("langs"), F.lit("all").alias("part") ) all_words = all_words.repartition(1) hive_tb = 'tmp_test_stem_word' partition_dict = { "part": "all" } hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict) HdfsUtils.delete_hdfs_file(hdfs_path) partition_by = list(partition_dict.keys()) print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", ) all_words.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by) print("success") def download(): spark = SparkUtil.get_spark_session("download") df_all = spark.sql(""" select * from tmp_keyword_lang_test """) path = CommonUtil.df_export_csv(spark, df_all, "tmp_keyword_lang_test", limit=100 * 10000) print(f"success : {path}") pass def udf_detect_phrase_reg(lang_word_map, word_translate_map: dict): def detect_phrase(phrase: str): import re # phrase = re.sub(r'(\d+\.?\d*|-|\'|\"|,|,|?|\?|/|、|)', '', phrase).strip() # 分词 from nltk.tokenize import word_tokenize wordList = list(filter(lambda x: len(x) >= 2, word_tokenize(phrase, "english"))) tmp_map = { "en": {"frequency": 0, "word": []}, "fr": {"frequency": 0, "word": []}, "es": {"frequency": 0, "word": []}, "de": {"frequency": 0, "word": []}, } for word in wordList: lang_rank_map: dict = lang_word_map.get(word) if lang_rank_map is not None: for lang in lang_rank_map.keys(): frequency = lang_rank_map[lang] tmp_map[lang]["frequency"] = tmp_map[lang]["frequency"] + frequency tmp_map[lang]["word"].append(word) pass # 先根据word名称个数倒序后根据分数 lang, hint_word_map = sorted(tmp_map.items(), key=lambda it: (len(it[1]['word']), it[1]['frequency']), reverse=True)[0] if hint_word_map['frequency'] == 0: return {"lang": None, "hint_word": None, "translate_val": None} else: hint_word_list = hint_word_map['word'] hint_word = " ".join(hint_word_list) if len(hint_word) <= 2: return {"lang": None, "hint_word": None, "translate_val": None} translate_val = None if lang == 'en' and word_translate_map is not None: translate_val = phrase for tmpword in hint_word_list: # 翻译 translatearr = word_translate_map.get(tmpword) if translatearr is not None and len(translatearr) >= 1: translate_val = re.sub(rf"\b{tmpword}\b", translatearr[0], translate_val) pass pass return {"lang": lang, "hint_word": hint_word, "translate_val": translate_val} pass return F.udf(detect_phrase, MapType(StringType(), StringType())) def calc_keyword_lang(): spark = SparkUtil.get_spark_session("calc_keyword_lang") lang_word_list = spark.sql(""" select word, langs from big_data_selection.tmp_lang_word_frequency """).collect() en_cn_dict_list = spark.sql(""" select word, simple_cn from tmp_en_dict """).collect() # lang_word_df => 转为map lang_word_map = {row['word']: row['langs'] for row in lang_word_list} # 翻译 =>文件 en_cn_dict_map = {row['word']: row['simple_cn'] for row in en_cn_dict_list} keyword_df = spark.sql(""" select keyword from big_data_selection.dwt_merchantwords_st_detail where site_name='us' limit 1000000 """).cache() keyword_df = keyword_df.withColumn("resultMap", udf_detect_phrase_reg(lang_word_map, en_cn_dict_map)(F.col("keyword"))) keyword_df = keyword_df.select( F.col("keyword"), F.col("resultMap").getField("lang").alias("lang"), F.col("resultMap").getField("hint_word").alias("hint_word"), F.col("resultMap").getField("translate_val").alias("translate_val"), ) keyword_df.write.saveAsTable(name="tmp_keyword_lang_test", format='hive', mode='append') print("success") def udf_test_reg(lang_word_map): def detect_phrase(phrase: str): # 分词后的词干 wordList = UdfUtil.snowball_stem_word(phrase, False)[1] tmp_map = { "en": {"frequency": 0, "word": []}, "fr": {"frequency": 0, "word": []}, "es": {"frequency": 0, "word": []}, "de": {"frequency": 0, "word": []}, } for word in wordList: lang_rank_map: dict = lang_word_map.get(word) if lang_rank_map is not None: for lang in lang_rank_map.keys(): frequency = lang_rank_map[lang] tmp_map[lang]["frequency"] = tmp_map[lang]["frequency"] + frequency tmp_map[lang]["word"].append(word) pass # 先根据word名称个数倒序后根据分数 lang, hint_word_map = sorted(tmp_map.items(), key=lambda it: (len(it[1]['word']), it[1]['frequency']), reverse=True)[0] if hint_word_map['frequency'] == 0: return {"lang": None, "hint_word": None, } else: hint_word_list = hint_word_map['word'] hint_word = " ".join(hint_word_list) if len(hint_word) <= 2: return {"lang": None, "hint_word": None, } return {"lang": lang, "hint_word": hint_word} pass return F.udf(detect_phrase, MapType(StringType(), StringType())) def build_word_frequency_map(lang_word_list): word_result_map = {} stem_result_map = {} for row in lang_word_list: word = row['word'] stem = row['stem'] frequency = row['frequency'] lang = row['lang'] lang_map1 = word_result_map.get(word) or {} lang_map1[lang] = frequency word_result_map[word] = lang_map1 lang_map2 = stem_result_map.get(stem) or {} lang_map2[lang] = max(lang_map2.get(lang, 0), frequency) stem_result_map[word] = lang_map2 for tmp_map in [word_result_map, stem_result_map]: # 去重差异较大值 for key, value in tmp_map.items(): max_frequency = max(value.values()) # 删除 for key in value.copy().keys(): frequency = value.get(key) if frequency != 1 and int(max_frequency / frequency > 100): del value[key] pass return word_result_map, stem_result_map def calc_aba_keyword_lang(): spark = SparkUtil.get_spark_session("calc_aba_keyword_lang") word_sql = f""" select word, stem, frequency, lang from lang_word_frequency """ conn_info = DBUtil.get_connection_info(db_type=DbTypes.postgresql_test.name, site_name="us") word_frequency_df = SparkUtil.read_jdbc_query( session=spark, url=conn_info["url"], pwd=conn_info["pwd"], username=conn_info["username"], query=word_sql ).collect() word_result_map, stem_result_map = build_word_frequency_map(word_frequency_df) brand_sql = f""" select asin_brand_name from tmp_amazon_brand_dict """ brand_all_list = spark.sql(brand_sql).cache().collect() # 所有的品牌名 brand_list = {str(row['asin_brand_name']).lower() for row in brand_all_list} aba_keyword_df = spark.sql(""" select search_term, site_name from big_data_selection.ods_st_key where site_name in ( 'uk', 'us' ) """).cache() aba_keyword_df = aba_keyword_df.withColumn("resultMap", udf_test_reg(stem_result_map)(F.col("search_term"))) def udf_has_brand_reg(brand_list): # def detect_phrase(keyword: str): # # keyword # # pass # pass # # aba_keyword_df = aba_keyword_df.withColumn("resultMap", udf_test_reg(stem_result_map)(F.col("search_term"))) # # # aba_keyword_df = aba_keyword_df.select( # F.col("search_term"), # F.col("resultMap").getField("lang").alias("lang"), # F.col("resultMap").getField("hint_word").alias("hint_word"), # F.col("site_name") # ) # # aba_keyword_df.write.saveAsTable(name="tmp_aba_keyword_lang_test_v2", format='hive', mode='append') print("success") def calc_brand_name_list(): spark = SparkUtil.get_spark_session("calc_aba_keyword_lang") brand_name_list = spark.sql(""" select asin_brand_name as brand_name from big_data_selection.dim_cal_asin_history_detail where site_name='us' """) brand_name_list = brand_name_list.withColumn("asin_brand_name", F.lower(F.trim(F.col("asin_brand_name")))) brand_name_list = brand_name_list.drop_duplicates() brand_name_list.write.saveAsTable(name="tmp_brand_name_list", format='hive', mode='append') pass def ge_all_err_word(): spark = SparkUtil.get_spark_session("ge_all_err_word") keyword_df = spark.sql(""" select keyword from big_data_selection.dwt_merchantwords_st_detail where site_name='us' """) def has_cn(word): import re pattern = re.compile(r'[\u4e00-\u9fa5]|[\u0800-\u4e00]|[\uac00-\ud7ff]') return len(pattern.findall(word)) > 0 udf_has_cn_reg = F.udf(has_cn, BooleanType()) keyword_df = keyword_df.withColumn("cn_flag", udf_has_cn_reg(F.col("keyword"))) keyword_df = keyword_df.where("cn_flag == true") keyword_df.write.saveAsTable(name="tmp_error_keyword", format='hive', mode='append') print("success") pass def get_simple_translation(translation: str): result = [] for line in translation.split("||"): import re first_cn = re.split(r'[;,,]', line)[0] # 去掉所有[] () ()包裹的词语 first_cn = re.sub(r'\(.*\)|\[.*\]|(.*)', "", first_cn) first_cn = re.sub(r'[a-z]+.', "", first_cn) # 去掉所有的词性符号 result.append(first_cn) pass return result # 英文字典翻译 def build_en_dict(): print("build_en_dict ..........") spark = SparkUtil.get_spark_session("build_en_dict") en_dict_df = spark.read.json("file:///tmp/wjc_py/yswg_utils/resource/en_dict_all.json") # \n换行符冲突 改为使用 ||替换 en_dict_df = en_dict_df.withColumn("translation", F.trim( F.regexp_replace(F.regexp_replace(F.col("translation"), r"(\t|\u0020)", ""), r"\n", "||")) ) udf_get_simple_translation_reg = F.udf(get_simple_translation, ArrayType(StringType())) # word 单词名称 # phonetic 音标,以英语英标为主 # definition 单词释义(英文),每行一个释义 # translation 单词释义(中文),每行一个释义 # pos 词语位置,用 "/" 分割不同位置 # collins 柯林斯星级 # oxford 是否是牛津三千核心词汇 # tag 字符串标签:zk/中考,gk/高考,cet4/四级 等等标签,空格分割 # bnc 英国国家语料库词频顺序 # frq 当代语料库词频顺序 # exchange 时态复数等变换,使用 "/" 分割不同项目,见后面表格 # detail json 扩展信息,字典形式保存例句(待添加) # audio 读音音频 url (待添加) en_dict_df = en_dict_df.select( F.trim(F.regexp_replace(F.col("word"), r"[\n\t]", "")).alias("word"), F.trim(F.regexp_replace(F.col("sw"), r"[\n\t]", "")).alias("sw"), F.trim(F.regexp_replace(F.col("exchange"), r"[\n\t]", "")).alias("exchange"), F.col("translation"), udf_get_simple_translation_reg(F.col("translation")).alias("simple_cn"), F.trim(F.regexp_replace(F.col("collins"), r"[\n\t]", "")).alias("collins"), ) \ .orderBy(F.col("collins").desc()) # en_dict_df = en_dict_df.where("word=='account'").show(truncate=False) en_dict_df.write.saveAsTable(name="tmp_en_dict", format='hive', mode='append') pass print("success") pass def ge_all_aba_err_word(): """ aba查看乱码数据 :return: """ print("ge_all_aba_err_word ..........") spark = SparkUtil.get_spark_session("ge_all_aba_err_word") keyword_df = spark.sql(""" select search_term as keyword from big_data_selection.ods_st_key """) def has_cn(word): import re pattern = re.compile(r'[\u4e00-\u9fa5]|[\u0800-\u4e00]|[\uac00-\ud7ff]') return len(pattern.findall(word)) > 0 udf_has_cn_reg = F.udf(has_cn, BooleanType()) keyword_df = keyword_df.withColumn("cn_flag", udf_has_cn_reg(F.col("keyword"))) keyword_df = keyword_df.where("cn_flag == true") keyword_df.write.saveAsTable(name="tmp_aba_error_keyword", format='hive', mode='append') print("success") pass def title_contain_brand_name(): spark = SparkUtil.get_spark_session("title_contain_brand_name") flag_df = spark.sql(""" select flag, count(flag) from ( select title, brand_name, if(locate(lower(brand_name), lower(title)) > 0, 1, 0) as flag from big_data_selection.dwt_bsr_asin_detail where date_info = '2023-11' and site_name = 'us' and title is not null and brand_name is not null ) group by flag """) flag_df.show() pass def split_word(word: str): word = word.replace("+", " ") word = word.replace("\"", "") from nltk.tokenize import word_tokenize return word_tokenize(word, "english") def calc_amazon_word_frequency(): spark = SparkUtil.get_spark_session("amazon_word_frequency") keyword_df = spark.sql(""" select keyword from dwt_merchantwords_st_detail; """).cache() keyword_df.write.saveAsTable(name="tmp_amazon_word_frequency", format='hive', mode='append') pass def build_brand_name_dict(): spark = SparkUtil.get_spark_session("build_brand_name_dict") save_df = spark.sql(""" select distinct trim(asin_brand_name) as asin_brand_name from big_data_selection.dim_cal_asin_history_detail where site_name = 'us' and asin_brand_name is not null """).cache() save_df.write.saveAsTable(name="tmp_amazon_brand_dict", format='hive', mode='append') print('success') pass if __name__ == '__main__': calc_aba_top100w_asin() # calc_aba_keyword_lang() # build_brand_name_dict() # arr = [ # "24 oz", # "skechers+skechers+skechers+skechers+ skechers+ skechers 4runner rtx 3060 ti", # "the shark® ai robot vacmop™ rv2002wd", # "stella & chewy’s carnivore cravings purrfect pate cans – grain free, protein rich wet cat food (case of 24)" # ] # for s in arr: # print(split_word(s))