# dwd_title_matching_degree.py
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from textblob import Word

class DwdTitleMatchingDegree(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2023-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_title_matching_degree'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=250)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; populated in read_data() and handle_data()
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_search_term = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # Register the UDF for Spark SQL; the returned callable is also usable in the DataFrame API
        self.u_check_contains = self.spark.udf.register('u_check_contains', self.check_contains, IntegerType())

    @staticmethod
    def check_contains(title, search_term):
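        """Return 1 if the title contains the search term, else 0.

        Matching is fuzzy: every search-term word also matches its
        lemmatized (singular) form, and a word ending in 's' additionally
        matches its possessive ("'s") form. Illustrative examples,
        derived from the pattern built below:

            check_contains("wireless phone chargers", "chargers")  # -> 1
            check_contains("wireless phone chargers", "charger")   # -> 0
            # Only the search term is lemmatized, not the title, so a
            # singular search word does not match a plural title word.
        """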
        if title is None or search_term is None or search_term == "":
            return 0
        title = title.lower()
        # Strip regex metacharacters from the search term so they cannot
        # corrupt the pattern built below
        regex_symbols = r'[.^$*+?|(){}\[\]\\]'
        search_term = re.sub(regex_symbols, ' ', search_term).lower()
        pattern_str = ""
        search_term_split = [word for word in search_term.split(" ") if word.strip()]
        for i in range(len(search_term_split)):
            # Lemmatize to reduce a plural word to its singular form
            search_term_change = Word(search_term_split[i]).lemmatize("n")

            if search_term_split[i][-1] == 's':
                # Words ending in 's': also match the singular and possessive ("'s") forms
                search_term_i = search_term_split[i]
                search_term_without_s = search_term_i[:-1]
                term = r"(?:" \
                       + search_term_split[i] + "|" \
                       + search_term_change + "|" \
                       + search_term_without_s + "(?:'s)" \
                       + r")"
            else:
                term = r"(?:" \
                       + search_term_split[i] + "|" \
                       + search_term_change \
                       + r")"
            # Append this term to the full pattern; " *" allows zero or more spaces between words
            if i < len(search_term_split) - 1:
                pattern_str = pattern_str + term + " *"
            else:
                pattern_str = pattern_str + term
        pattern_str = r"\b(" + pattern_str + r")\b"
        return 1 if re.search(pattern_str, title) else 0
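
    # Illustrative sketch of the pattern check_contains builds for the
    # hypothetical search term "phone cases" (not part of the pipeline):
    #     \b((?:phone|phone) *(?:cases|case|case(?:'s)))\b
    # Each word matches its original or lemmatized form, with zero or
    # more spaces allowed between consecutive words.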

    def read_data(self):
        # Read ASIN titles from dim_asin_detail
        sql = f"""
        select 
            asin,
            asin_title 
        from 
            dim_asin_detail
        where
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_asin_title = self.spark.sql(sqlQuery=sql).cache()

        # Read each ASIN's search terms from dim_st_asin_info;
        # the group by deduplicates repeated (asin, search_term) pairs
        sql = f"""
        select 
            asin,
            search_term 
        from 
            dim_st_asin_info
        where
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}'
        group by asin, search_term;
        """
        print(sql)
        self.df_asin_search_term = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

        # Drop rows with null titles, then join titles with search terms on asin
        self.df_asin_title = self.df_asin_title.filter(self.df_asin_title["asin_title"].isNotNull())
        self.df_joined = self.df_asin_title.join(self.df_asin_search_term, 'asin', 'inner')
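        # Joined columns: asin, asin_title, search_term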

        # Flag whether each title contains its search term via the u_check_contains UDF
        self.df_joined = self.df_joined.withColumn("contains_flag",
                                                   self.u_check_contains(F.col("asin_title"), F.col("search_term")))

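        # Attach the partition columns declared in self.partitions_by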
        self.df_joined = self.df_joined.withColumn("site_name", F.lit(self.site_name))
        self.df_joined = self.df_joined.withColumn("date_type", F.lit(self.date_type))
        self.df_joined = self.df_joined.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_joined

if __name__ == '__main__':
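    # Usage: python dwd_title_matching_degree.py <site_name> <date_type> <date_info>
    # e.g.   python dwd_title_matching_degree.py us month 2023-01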
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = DwdTitleMatchingDegree(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()