import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from textblob import Word


class DwdTitleMatchingDegree(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2023-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_title_matching_degree'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=250)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # placeholder DataFrames, overwritten in read_data()/handle_data()
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_search_term = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.u_check_contains = self.spark.udf.register('u_check_contains', self.check_contains, IntegerType())

    @staticmethod
    def check_contains(title, search_term):
        """Return 1 if every word of search_term appears consecutively in title,
        allowing plural/singular and possessive variants, else 0."""
        if search_term is None or search_term == "":
            return 0
        if title is None:  # titles are filtered upstream, but guard anyway
            return 0
        title = title.lower()
        # strip regex metacharacters from the search term so each word can be
        # embedded safely in the pattern built below ("." "^" "$" added here,
        # since the original character class missed them)
        regex_symbols = r'[*+?.^$|(){}\[\]\\]'
        search_term = re.sub(regex_symbols, ' ', search_term).lower()
        pattern_str = ""
        search_term_split = [word for word in search_term.split(" ") if word.strip()]
        for i, word in enumerate(search_term_split):
            # reduce plural forms to their singular noun form
            word_lemma = Word(word).lemmatize("n")
            if word[-1] == 's':
                # for words ending in "s", also recognize the possessive form
                word_without_s = word[:-1]
                term = r"(?:" + word + "|" + word_lemma + "|" + word_without_s + r"(?:'s))"
            else:
                term = r"(?:" + word + "|" + word_lemma + r")"
            # append this term to the full pattern_str
            if i < len(search_term_split) - 1:
                pattern_str = pattern_str + term + " *"
            else:
                pattern_str = pattern_str + term
        pattern_str = r"\b(" + pattern_str + r")\b"
        return 1 if re.search(pattern_str, title) else 0

    def read_data(self):
        # read asin titles from dim_asin_detail
        sql = f"""
        select asin, asin_title from dim_asin_detail
        where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_asin_title = self.spark.sql(sqlQuery=sql).cache()
        # read the search terms per asin from dim_st_asin_info
        sql = f"""
        select asin, search_term from dim_st_asin_info
        where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'
        group by asin, search_term;
        """
        print(sql)
        self.df_asin_search_term = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"clearing the hdfs directory ..... {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # join df_asin_title with df_asin_search_term on asin
        self.df_asin_title = self.df_asin_title.filter(self.df_asin_title["asin_title"].isNotNull())
        self.df_joined = self.df_asin_title.join(self.df_asin_search_term, 'asin', 'inner')
        # apply the udf u_check_contains to flag title/search-term matches
        self.df_joined = self.df_joined.withColumn(
            "contains_flag", self.u_check_contains(F.col("asin_title"), F.col("search_term")))
        self.df_joined = self.df_joined.withColumn("site_name", F.lit(self.site_name))
        self.df_joined = self.df_joined.withColumn("date_type", F.lit(self.date_type))
        self.df_joined = self.df_joined.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_joined
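# Illustrative sketch, not invoked by the production job: calling
# check_contains directly (it is a plain staticmethod, so no SparkSession is
# needed) shows the plural/possessive handling and the adjacency requirement.
# The titles and search terms below are hypothetical, and running this
# assumes the textblob/nltk wordnet corpora are installed locally.
def _demo_check_contains():
    samples = [
        # a plural search term still matches a singular title word via lemmatization
        ("Large Wooden Box with Lid", "wooden boxes"),            # -> 1
        # a possessive form in the title is covered by the "(?:'s)" branch
        ("Men's Running Shoes, Size 10", "mens running shoes"),  # -> 1
        # the words must be adjacent in the title; "storage" breaks the match
        ("Wooden Storage Box", "wooden box"),                     # -> 0
    ]
    for title, term in samples:
        flag = DwdTitleMatchingDegree.check_contains(title, term)
        print(f"{title!r} vs {term!r} -> {flag}")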
if __name__ == '__main__':
    site_name = sys.argv[1]    # e.g. 'us'
    date_type = sys.argv[2]    # e.g. 'month'
    date_info = sys.argv[3]    # e.g. '2023-01'
    handle_obj = DwdTitleMatchingDegree(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
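# Usage sketch (assumed invocation; the script filename and any spark-submit
# deployment flags are not specified in this file):
#   spark-submit dwd_title_matching_degree.py us month 2023-01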