import os
import sys
import re
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from textblob import Word
class DwdTitleMatchingDegree(Templates):
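    """Flag whether an ASIN's title contains a given search term.

    Joins ASIN titles (dim_asin_detail) with ASIN search terms (dim_st_asin_info)
    and adds a contains_flag column via the u_check_contains UDF. The result is
    exposed as self.df_save; writing it out is assumed to be handled by the
    Templates base class.
    """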
    def __init__(self, site_name='us', date_type="month", date_info='2023-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_title_matching_degree'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=250)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
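        # Placeholder DataFrames; populated in read_data() and handle_data().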
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_search_term = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
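        # Register check_contains as a Spark UDF; the returned handle is also usable in the DataFrame API.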
        self.u_check_contains = self.spark.udf.register('u_check_contains', self.check_contains, IntegerType())

    @staticmethod
    def check_contains(title, search_term):
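        """Return 1 if the search term appears in the title, else 0.

        Each word of the search term is matched against its original form, its
        noun lemma (plural reduced to singular via TextBlob), and, for words
        ending in 's', the possessive form (e.g. "cable's" for "cables").
        Illustrative example:
            check_contains("usb cable for printer", "usb cables") -> 1
        because the plural "cables" also matches the singular "cable" in the title.
        """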
        if search_term is None or search_term == "":
            return 0
        title = title.lower()
        regex_symbols = r'[*+?|(){}\[\]\\]'
        search_term = re.sub(regex_symbols, ' ', search_term).lower()
        pattern_str = ""
        search_term_split = [word for word in search_term.split(" ") if word.strip()]
        for i in range(len(search_term_split)):
            # reduce plural forms back to the singular noun lemma
            search_term_change = Word(search_term_split[i]).lemmatize("n")
            if search_term_split[i][-1] == 's':
                # for words ending in 's', also recognise the possessive form
                search_term_i = search_term_split[i]
                search_term_without_s = search_term_i[:-1]
                term = r"(?:" \
                       + search_term_split[i] + "|" \
                       + search_term_change + "|" \
                       + search_term_without_s + "(?:'s)" \
                       + r")"
            else:
                term = r"(?:" \
                       + search_term_split[i] + "|" \
                       + search_term_change \
                       + r")"
            # concatenate the terms into the complete pattern_str
            if i < len(search_term_split) - 1:
                pattern_str = pattern_str + term + " *"
            else:
                pattern_str = pattern_str + term
        pattern_str = r"\b(" + pattern_str + r")\b"
        matches = re.findall(pattern_str, title)
        if matches:
            return 1
        else:
            return 0

    def read_data(self):
        # read ASIN titles from dim_asin_detail
        sql = f"""
        select
            asin,
            asin_title
        from
            dim_asin_detail
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_asin_title = self.spark.sql(sqlQuery=sql).cache()
        # read the search terms for each ASIN from dim_st_asin_info
        sql = f"""
        select
            asin,
            search_term
        from
            dim_st_asin_info
        where
            site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        group by asin, search_term;
        """
        print(sql)
        self.df_asin_search_term = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory ... {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # join df_asin_title with df_asin_search_term on asin
        self.df_asin_title = self.df_asin_title.filter(self.df_asin_title["asin_title"].isNotNull())
        self.df_joined = self.df_asin_title.join(self.df_asin_search_term, 'asin', 'inner')
        # apply the u_check_contains UDF to flag titles that contain the search term
        self.df_joined = self.df_joined.withColumn("contains_flag",
                                                   self.u_check_contains(F.col("asin_title"), F.col("search_term")))
        self.df_joined = self.df_joined.withColumn("site_name", F.lit(self.site_name))
        self.df_joined = self.df_joined.withColumn("date_type", F.lit(self.date_type))
        self.df_joined = self.df_joined.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_joined
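        # self.df_save is written out, partitioned by self.partitions_by, when run()
        # is invoked (assumed behaviour of the Templates base class).
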
if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = DwdTitleMatchingDegree(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
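# Example invocation (file name assumed):
#   spark-submit dwd_title_matching_degree.py us month 2023-01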