import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from textblob import Word


class DwdTitleMatchingDegree(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2023-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_title_matching_degree'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=250)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # placeholder DataFrames, populated in read_data() / handle_data()
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_search_term = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.u_check_contains = self.spark.udf.register('u_check_contains', self.check_contains, IntegerType())
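        # note: spark.udf.register makes the UDF usable both from the DataFrame
        # API (via the returned callable, self.u_check_contains) and from Spark
        # SQL by name (select u_check_contains(asin_title, search_term) ...)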

    @staticmethod
    def check_contains(title, search_term):
        if not title or not search_term:
            return 0
        title = title.lower()
        # strip regex metacharacters from the search term so they cannot break the pattern
        regex_symbols = r'[*+?|(){}\[\]\\]'
        search_term = re.sub(regex_symbols, ' ', search_term).lower()
        pattern_str = ""
        search_term_split = [word for word in search_term.split(" ") if word.strip()]
        for i, word in enumerate(search_term_split):
            # lemmatize plural nouns back to the singular form
            word_singular = Word(word).lemmatize("n")
            if word[-1] == 's':
                # for words ending in 's', also match the possessive form (e.g. "toys" -> "toy's")
                term = r"(?:" + word + "|" + word_singular + "|" + word[:-1] + "(?:'s))"
            else:
                term = r"(?:" + word + "|" + word_singular + ")"
            # join adjacent terms, allowing any number of spaces (including none) between them
            if i < len(search_term_split) - 1:
                pattern_str = pattern_str + term + " *"
            else:
                pattern_str = pattern_str + term
        pattern_str = r"\b(" + pattern_str + r")\b"
        # a single match is enough, so re.search suffices
        return 1 if re.search(pattern_str, title) else 0

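    # Illustrative example (not executed): for search_term "dog toys", the
    # pattern built above is roughly
    #     \b((?:dog|dog) *(?:toys|toy|toy(?:'s)))\b
    # so titles containing "dog toy", "dog toys" or "dog toy's" all return 1.
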
    def read_data(self):
        # read ASIN titles from dim_asin_detail
        sql = f"""
        select 
            asin,
            asin_title 
        from 
            dim_asin_detail
        where
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_asin_title = self.spark.sql(sqlQuery=sql).cache()

        # read the search terms associated with each ASIN from dim_st_asin_info;
        # the group by below dedupes (asin, search_term) pairs
        sql = f"""
        select 
            asin,
            search_term 
        from 
            dim_st_asin_info
        where
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        group by asin, search_term;
        """
        print(sql)
        self.df_asin_search_term = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

        # drop rows without a title, then join titles with search terms on asin
        self.df_asin_title = self.df_asin_title.filter(self.df_asin_title["asin_title"].isNotNull())
        self.df_joined = self.df_asin_title.join(self.df_asin_search_term, 'asin', 'inner')

        # flag whether the title matches the search term via the u_check_contains UDF
        self.df_joined = self.df_joined.withColumn("contains_flag",
                                                   self.u_check_contains(F.col("asin_title"), F.col("search_term")))
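        # at this point df_joined carries: asin, asin_title, search_term, contains_flag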

        self.df_joined = self.df_joined.withColumn("site_name", F.lit(self.site_name))
        self.df_joined = self.df_joined.withColumn("date_type", F.lit(self.date_type))
        self.df_joined = self.df_joined.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_joined


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
        sys.exit(1)
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = DwdTitleMatchingDegree(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
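
# Example invocation (a sketch; the actual deployment command and spark-submit
# options are cluster-specific and not shown in this file):
#   spark-submit dwd_title_matching_degree.py us month 2023-01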