import ast
import os
import sys

import pandas as pd  # only needed for the commented-out read_excel path below

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
from pyspark.sql import functions as F


class DwtAbaAnalyticsTemp(Templates):
    """Temp job: tag ABA search terms with theme words, then pivot monthly stats per theme."""

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_aba_analytics_temp'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # placeholder DataFrames, overwritten in read_data()/handle_data()
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_dup = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.u_words_pattern = F.udf(self.udf_words_pattern, StringType())
        # partition parameters
        self.partitions_by = ['site_name']
        self.partitions_num = 20
    @staticmethod
    def udf_words_pattern(search_term, words_list_str):
        """Return the ';'-joined theme words found in `search_term`, or None."""
        words_list = []
        # words_list_str is a str(list) built in read_data(); parse it back safely
        for words in ast.literal_eval(words_list_str):
            if "@@" in words:
                # color themes carry an '@@' tag: strip it, split on the literal
                # substring 'and', and require every fragment to appear
                words = words.replace("@@", "")
                w_list = words.split("and")
                w_len = len(w_list)
                num = 0
                for w in w_list:
                    if w in search_term:
                        num += 1
                if w_len == num:
                    words_list.append(words.strip())
            else:
                if words in search_term:
                    words_list.append(words.strip())
        if words_list:
            return ';'.join(set(words_list))
        return None
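    # Illustrative trace (hypothetical values): with
    #   search_term=" dark grey table ", words_list_str="[' dark grey@@ ', ' sofa ']"
    # the '@@' entry is stripped to ' dark grey ' and split on the literal
    # substring 'and'; its single fragment occurs in the padded term, so the
    # UDF returns 'dark grey'.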
    def read_data(self):
        sql = f"select search_term, rank, bsr_orders, date_info from dwt_aba_st_analytics " \
              f"where site_name='{self.site_name}' and date_type='month' and date_info between '2022-07' and '2023-06';"
        self.df_st = self.spark.sql(sql).cache()
        self.df_st_dup = self.df_st.drop_duplicates(['search_term']).select('search_term')
        # print(self.df_st.count(), self.df_st_dup.count())  # 21536435 4703181
        self.df_st.show()
        sql = f"select market_type, search_words, search_words_ch from ods_theme_aba where site_name='{self.site_name}';"
        self.df_words = self.spark.sql(sql).cache()
        self.df_words = self.df_words.withColumn("search_words_lower", F.lower("search_words"))
        self.df_words.show()
        df_words = self.df_words.toPandas()
        # self.df_words = pd.read_excel(f"/mnt/data/img_data/aba_data_2023-07-25.xlsx", sheet_name='Sheet2')
        df_words['search_words_lower'] = df_words.search_words.apply(lambda x: x.lower())
        # tag color themes with an '@@' suffix so udf_words_pattern applies the 'and'-split matching
        df_words_1 = df_words.loc[df_words.market_type == '颜色'].copy()  # market_type '颜色' = color
        df_words_1['search_words_lower'] = df_words_1.search_words_lower.apply(lambda x: x + "@@")
        df_words_2 = df_words.loc[df_words.market_type != '颜色'].copy()
        words_list1 = list(set(df_words_1.search_words_lower))
        words_list2 = list(set(df_words_2.search_words_lower))
        words_list1.extend(words_list2)
        # words_list = list(set(df_words.search_words_lower))
        # pad each word with spaces so only whole words match, then serialize the
        # list to a string literal so it can be shipped to the UDF via F.lit()
        self.words_list_str = str([f" {words} " for words in words_list1])
        # print("self.words_list_str:", self.words_list_str)
        # # group by 'search_words_ch' and join the multiple 'search_words' values into one row
        # df_result = self.df_words.groupby(['market_type', 'search_words_ch'])['search_words'].apply(';'.join).reset_index()
        # self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        # self.df_words = self.spark.createDataFrame(self.df_words)
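    # Example of the serialized word list (hypothetical theme rows):
    #   "[' dark grey@@ ', ' red and blue@@ ', ' sofa ']"
    # i.e. a Python list literal in string form; handle_data() ships it to the
    # executors via F.lit() and udf_words_pattern() parses it back.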
    def handle_data(self):
        self.df_st_dup = self.df_st_dup.withColumn("search_term_lower", F.lower(self.df_st_dup["search_term"]))
        # pad both ends with a space so the word list only matches whole words
        self.df_st_dup = self.df_st_dup.withColumn(
            "search_term_lower", F.concat(F.lit(" "), "search_term_lower", F.lit(" ")))
        # self.df_st_dup.filter("search_term_lower like '% dark grey %'").show()
        # match each search term against the serialized theme-word list
        self.df_st_dup = self.df_st_dup.withColumn(
            "words_en_lower", self.u_words_pattern('search_term_lower', F.lit(self.words_list_str)))
        # split the ';'-joined matches into an array ...
        self.df_st_dup = self.df_st_dup.withColumn("words_en_lower", F.split(self.df_st_dup["words_en_lower"], ";"))
        # ... and explode the array into one row per matched word
        self.df_st_dup = self.df_st_dup.withColumn("search_words_lower", F.explode(self.df_st_dup["words_en_lower"]))
        # self.df_st_dup.show(30, truncate=False)
        self.df_st_dup = self.df_st_dup.join(
            self.df_words, on=['search_words_lower'], how='left'  # consider 'inner' to drop rows whose pattern match found no theme
        )
        self.df_st = self.df_st.join(
            self.df_st_dup, on='search_term', how='left'
        )
        self.df_st.show(30, truncate=False)
        self.df_st.filter("search_term_lower like '% dark grey %'").show()
        # one row per theme (search_words_ch), one column group per month via pivot
        self.df_save = self.df_st.groupBy(['search_words_ch']).pivot("date_info").agg(
            F.min("rank"),
            F.count("search_term"),
            F.sum("bsr_orders")
        )
        # self.df_save.show(30, truncate=False)
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        df_save = self.df_save.toPandas()
        df_save.to_csv("/root/theme.csv", index=False)
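    # Sketch of the expected output shape (column names are Spark's defaults for
    # a pivot with multiple unaliased aggregations, roughly '<month>_min(rank)',
    # '<month>_count(search_term)', '<month>_sum(bsr_orders)'):
    #   search_words_ch | 2022-07_min(rank) | ... | 2023-06_sum(bsr_orders) | site_name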


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site (e.g. 'us')
    handle_obj = DwtAbaAnalyticsTemp(site_name=site_name)
    handle_obj.run()
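
# Example invocation (hypothetical; the actual submit command depends on the
# cluster setup and on the Templates base class, whose run() presumably calls
# read_data() and handle_data()):
#   spark-submit dwt_aba_analytics_temp.py us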