import os
import sys
import re
import ast
import numpy as np
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so that utils can be imported
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType, StructType, StructField
class DwtThemeBsOrders(Templates):
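    """Aggregate BSR orders by theme: match theme keywords (ods_theme) against
    ASIN titles (dwt_flow_asin) and pivot the order totals by month."""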
def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
        self.db_save = 'dwt_theme_bs_orders'
self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_theme = self.spark.sql("select 1+1;")  # placeholder, populated in read_data()
        self.df_flow = self.spark.sql("select 1+1;")  # placeholder, populated in read_data()
        self.df_save = self.spark.sql("select 1+1;")  # placeholder, populated by the handle_data* methods
self.partitions_by = ['site_name']
self.partitions_num = 10
        # register the custom UDF for theme matching
self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())
@staticmethod
def udf_theme_pattern(title, theme_list_str):
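        """Return a comma-separated string of the distinct themes found in the title, or None.

        Illustrative example: title=" red christmas sweater ", theme_list_str="[' christmas ', ' halloween ']" -> "christmas"
        """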
        if not title:
            return None
        # themes in the serialized list are wrapped in spaces, so "theme in title" matches whole words only
        found_themes = [theme.strip() for theme in ast.literal_eval(theme_list_str) if theme in title]
        if found_themes:
            return ','.join(set(found_themes))
        return None
def read_data(self):
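        """Read the monthly ASIN flow data and the theme dictionary for the site into cached DataFrames."""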
# sql = f"select asin, title as asin_title, bsr_orders, dt as date_info from selection_off_line.dwt_asin_month where site='{self.site_name}' and " \
# f"dt in ('2022_7', '2022_8', '2022_9', '2022_10', '2022_11', '2022_12') " \
# f"union all " \
# f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
# f"date_type='month' and date_info in ('2023-01', '2023-02', '2023-03', '2023-04', '2023-05', '2023-06');"
sql = f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-12';"
# f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-01' limit 1000000;"
print("sql:", sql)
self.df_flow = self.spark.sql(sql).cache()
self.df_flow.show(10, truncate=False)
sql = f"select id as theme_id, theme_type_en, theme_en, theme_en_lower, theme_ch from ods_theme where site_name='{self.site_name}'"
print("sql:", sql)
self.df_theme = self.spark.sql(sql).cache()
self.df_theme.show(10, truncate=False)
def handle_data_new2(self):
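        """Regex variant: extract the first theme that matches each lowercased title,
        then pivot bsr_orders and ASIN counts by month."""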
        # collect the distinct lowercased themes to the driver
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))
        # build one regex pattern that matches any theme as a whole word (escape regex metacharacters)
        theme_pattern = '|'.join([f"\\b{re.escape(theme)}\\b" for theme in theme_list])
        print(f"theme_pattern: {len(theme_list), theme_pattern[:100]}")
        # lowercase the titles for case-insensitive matching
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(F.col("asin_title")))
        # use the regex to find the first matching theme in each title
        self.df_flow = self.df_flow.withColumn("matched_theme", F.regexp_extract(F.col("asin_title_lower"), theme_pattern, 0))
        # keep only the rows where a theme was found
        self.df_flow = self.df_flow.filter(F.col("matched_theme") != "")
        # join the matched theme back to the theme dictionary
        self.df_flow = self.df_flow.join(self.df_theme, self.df_flow['matched_theme'] == self.df_theme['theme_en_lower'], 'inner')
        # deduplicate per ASIN, month and theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme_ch'])
        # group by theme and pivot by month, aggregating order totals and ASIN counts
        # self.df_save = self.df_flow.groupBy("theme_en_lower", "date_info").agg(
        #     F.sum("bsr_orders").alias("total_bsr_orders"),
        #     F.count("asin").alias("total_asins")
        # )
self.df_save = self.df_flow.groupBy("theme_ch").pivot("date_info").agg(
F.sum("bsr_orders"), F.count("asin")
)
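        # NOTE: pivoting with two unaliased aggregations yields two columns per month,
        # named roughly like "2023-01_sum(bsr_orders)" and "2023-01_count(asin)"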
        # add the site name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        # cache the final result to speed up the show/export below
        self.df_save = self.df_save.cache()
        # show the final result
        self.df_save.show(50, truncate=False)
df = self.df_save.toPandas()
df.to_csv("/root/theme_new.csv", index=False)
def handle_data_new1(self):
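        """Flag-column variant: add one indicator column per theme, explode the
        matches into one row per (asin, theme), and aggregate per theme and month."""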
        # collect the distinct lowercased themes to the driver
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))
        # pad the titles with spaces and lowercase them so whole-word matching works at the boundaries
        self.df_flow = self.df_flow.withColumn("asin_title_lower",
                                               F.lower(F.concat(F.lit(" "), F.col("asin_title"), F.lit(" "))))
        # create one indicator column per theme flagging whether the padded title contains " theme "
for theme in theme_list:
self.df_flow = self.df_flow.withColumn(f"theme_{theme}",
F.when(F.col("asin_title_lower").contains(f" {theme} "),
1).otherwise(0))
        # collect each row's matched themes into an array and explode it into one row per match
        # (the original per-theme loop join produced a cross join and duplicate 'theme' columns)
        matched_themes = F.array(*[F.when(F.col(f"theme_{theme}") == 1, F.lit(theme)) for theme in theme_list])
        self.df_flow = self.df_flow.withColumn("theme", F.explode(matched_themes))
        self.df_flow = self.df_flow.drop(*[f"theme_{theme}" for theme in theme_list])
        # keep only the rows where a theme was actually found
        self.df_flow = self.df_flow.filter(self.df_flow['theme'].isNotNull())
        # join the remaining columns of the theme dictionary onto the flow DataFrame
self.df_flow = self.df_flow.join(self.df_theme, self.df_flow['theme'] == self.df_theme['theme_en_lower'],
'inner')
        # deduplicate per ASIN, month and theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme'])
        # group by theme and month and aggregate order totals and ASIN counts
self.df_save = self.df_flow.groupBy("theme", "date_info").agg(
F.sum("bsr_orders").alias("total_bsr_orders"),
F.count("asin").alias("total_asins")
)
        # add the site name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        # cache the final result
        self.df_save = self.df_save.cache()
        # show the final result
        self.df_save.show(50, truncate=False)
def handle_data(self):
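        """UDF variant: find every theme contained in each space-padded title,
        explode the comma-separated matches, and pivot by month."""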
pdf_theme = self.df_theme.toPandas()
theme_list = list(set(pdf_theme.theme_en_lower))
self.theme_list_str = str([f" {theme} " for theme in theme_list])
print("self.theme_list_str:", self.theme_list_str)
        # lowercase the titles
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(self.df_flow["asin_title"]))
        # filter out null titles
        self.df_flow = self.df_flow.filter("asin_title_lower is not null")
        # filter out the literal strings 'none', 'null' and 'nan'
        self.df_flow = self.df_flow.filter("asin_title_lower not in ('none', 'null', 'nan')")
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.concat(F.lit(" "), "asin_title_lower", F.lit(" ")))  # pad both ends with spaces so whole words can be matched
        self.df_flow = self.df_flow.withColumn("theme_en_lower", self.u_theme_pattern('asin_title_lower', F.lit(self.theme_list_str)))
        # split the comma-separated matches into an array
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.split(self.df_flow["theme_en_lower"], ","))
        # explode the array into one row per matched theme
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.explode(self.df_flow["theme_en_lower"]))
        self.df_flow = self.df_flow.join(
            self.df_theme, on=['theme_en_lower'], how='left'  # consider switching to inner to drop inaccurate matches
        )
# self.df_flow.show(50, truncate=False)
# self.df_flow = self.df_flow.filter("bsr_orders >0")
# self.df_theme = self.df_theme.drop_duplicates(['asin', 'theme_ch'])
self.df_flow = self.df_flow.drop_duplicates(['asin', 'date_info', 'theme_ch'])
self.df_save = self.df_flow
# self.df_save = self.df_flow.join(
# self.df_theme, on='asin', how='inner'
# )
# self.df_save.show(30, truncate=False)
# pivot_df1 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_counts").agg(
# F.expr("IFNULL(count(*), 0) AS value"))
self.df_save = self.df_save.groupBy("theme_ch").pivot("date_info").agg(
F.sum("bsr_orders"), F.count("asin")
)
# self.df_save.show(50, truncate=False)
self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
self.df_save = self.df_save.cache()
self.df_save.show(50, truncate=False)
df = self.df_save.toPandas()
df.to_csv("/root/theme_new_2023.csv", index=False)
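# Example invocation (script name assumed): spark-submit dwt_theme_bs_orders.py us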
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name
handle_obj = DwtThemeBsOrders(site_name=site_name)
handle_obj.run()