import os
import sys
from collections import Counter
import inflect
import re
from textblob import TextBlob
from textblob import Word
sys.path.append(os.path.dirname(sys.path[0]))
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, MapType, IntegerType, StructType, StructField, DoubleType
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_ele_mattch
import numpy as np
class DwtStThemeAgg(object):
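    """
    Aggregate theme / function-word labels for two- and three-word ABA search terms
    (grouped by pattern word) and write the result to the Hive table dwt_st_theme_agg,
    partitioned by site_name / date_type / date_info.
    """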
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = "dwt_st_theme_agg"
# self.hive_tb = "tmp_st_theme_agg"
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
        # Output HDFS path check
        # self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Create the SparkSession
app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
        # Register user-defined functions (UDFs)
self.u_theme_pattern = F.udf(udf_ele_mattch, StringType())
self.u_theme_contain_judge = F.udf(self.udf_theme_contain_judge, IntegerType())
self.u_judge_twin_words = F.udf(self.udf_judge_twin_words, IntegerType())
self.u_filter_sec_pattern_words = F.udf(self.udf_filter_sec_pattern_words, IntegerType())
# self.u_split_words = F.udf(self.udf_split_words, StringType())
        # Initialize global DataFrame placeholders
self.df_st_base = self.spark.sql(f"select 1+1;")
self.df_base_filter_date = self.spark.sql(f"select 1+1;")
self.df_pattern_words_base = self.spark.sql(f"select 1+1;")
self.df_sec_words = self.spark.sql(f"select 1+1;")
self.df_third_words = self.spark.sql(f"select 1+1;")
self.df_theme = self.spark.sql(f"select 1+1;")
self.df_st_theme = self.spark.sql(f"select 1+1;")
self.df_st_theme_base = self.spark.sql(f"select 1+1;")
self.df_st_theme_vertical = self.spark.sql(f"select 1+1;")
self.df_st_filter = self.spark.sql(f"select 1+1;")
self.df_pattern_st_agg = self.spark.sql(f"select 1+1;")
self.df_pattern_st_words = self.spark.sql(
f"select null as pattern_st,id as st_key,search_term,bsr_orders from dwt_aba_st_analytics limit 0;")
self.combine_df = self.spark.sql(
f"select id as st_key,search_term,bsr_orders,'' as pattern_st from dwt_aba_st_analytics limit 0;")
self.df_st_theme_agg = self.spark.sql(f"select 1+1;")
self.df_st_topic_base = self.spark.sql(f"select 1+1;")
self.df_st_match_topic_detail = self.spark.sql(f"select 1+1;")
self.df_st_match_topic_agg = self.spark.sql(f"select 1+1;")
self.df_match_brand = self.spark.sql(f"select 1+1;")
self.df_match_blacklist = self.spark.sql(f"select 1+1;")
        # Other variables
        self.brand_pattern = str()  # regex pattern for brand-word matching
        self.theme_list_str = str()  # regex pattern for theme words
self.st_word_list = []
@staticmethod
def udf_unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
@staticmethod
def udf_theme_contain_judge(pattern_word, pattern_list):
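        # Illustrative (hypothetical values): pattern_word 'cup' with pattern_list ['cup', 'cup holder']
        # is contained in two entries, so count > 1 and the UDF returns 0 (already matched elsewhere).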
count = sum(1 for word in pattern_list if pattern_word in word)
        # If pattern_word is contained in more than one entry, it has already been matched elsewhere
return 0 if count > 1 else 1
@staticmethod
def udf_inflect_word():
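        # Returns a UDF that maps a term to a comma-joined set of each of its words plus the inflect
        # plural of each word; this string is used as the key for deduplicating singular/plural variants.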
p = inflect.engine()
def udf_split_words(st_word):
words = st_word.split(" ")
word_list = []
for word in words:
word_list.append(word)
word_list.append(p.plural(word))
word_list.sort()
return ','.join(set(word_list))
return F.udf(udf_split_words, StringType())
@staticmethod
def udf_word_restoration():
def udf_restoration_words(st_word):
word_list = []
blob = TextBlob(st_word)
for word in blob.words:
word_list.append(word.lemmatize())
word_list.sort()
return ','.join(set(word_list))
return F.udf(udf_restoration_words, StringType())
@staticmethod
def udf_judge_twin_words(st_word):
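        # Illustrative (hypothetical values): "gun gun" -> 1 (all tokens identical), "gun case" -> 0.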
words = st_word.split(" ")
judge_flag = 0
        # First flag search terms whose tokens are all identical ("twin" words), e.g. "gun gun"; users do not want these
if len(set(words)) == 1:
judge_flag = 1
return judge_flag
@staticmethod
def udf_theme_regex(pattern):
def udf_theme_pattern(match_text):
ele_list = re.findall(pattern, match_text)
if ele_list:
return ','.join(set(ele_list))
else:
return None
return F.udf(udf_theme_pattern, StringType())
@staticmethod
def st_word_filter_condition(search_term):
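        # A minimal sketch of the output (hypothetical term): for search_term "dog bed" this builds roughly
        #   search_term rlike '\\b(dog|dogs)\\b' and search_term rlike '\\b(bed|beds)\\b'
        # i.e. a word-boundary match on every word of the pattern term (singular or plural), with repeated
        # words required to appear the corresponding number of times.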
base_keywords = search_term.split(" ")
        # Build sorted singular/plural pairs
singular_plural_list = []
for word in base_keywords:
sort_list = [word, CommonUtil.convert_singular_plural(word)]
sort_list.sort()
singular_plural_word = "|".join(sort_list)
singular_plural_list.append(singular_plural_word)
        # Count the frequency of each sorted singular/plural pair
condition_counts = Counter(singular_plural_list)
        # Build the rlike conditions for repeated and unique words
result_condition = ""
condition_list = []
        for word, count in condition_counts.items():
            word_condition = f"search_term rlike '\\\\b({word})\\\\b"
            word_condition += "".join([f".*\\\\b({word})\\\\b" for _ in range(count - 1)]) + "'"
            condition_list.append(word_condition)
if condition_list:
result_condition = (" and ").join(condition_list)
return result_condition
@staticmethod
def filter_blacklist_words(blacklist):
        # Blacklisted "contains" words for two/three-word terms (specical_match_type = '1')
pd_contain_list = blacklist.loc[blacklist.specical_match_type == '1']
contain_word_list = list(pd_contain_list.st_blacklist_word_lower)
contain_word_pattern = re.compile(
r'(?<!\+|\*|\-|\%|\.)\b({})\b'.format('|'.join([re.escape(x) for x in contain_word_list])),
flags=re.IGNORECASE)
        # Blacklisted exact search terms for two/three-word terms (specical_match_type = '2')
pd_st_blacklist = blacklist.loc[blacklist.specical_match_type == '2']
st_blacklist = list(pd_st_blacklist.st_blacklist_word_lower)
def udf_filter_blacklist(st_word):
            # contains-word matching
black_flag = 0
pattern_flag = re.search(contain_word_pattern, st_word)
if pattern_flag:
black_flag = 1
            # exact blacklist matching -- a one-line conditional expression cannot be used here; only check while the flag is still 0
if black_flag == 0:
if st_word in st_blacklist:
black_flag = 1
return black_flag
return F.udf(udf_filter_blacklist, IntegerType())
@staticmethod
def udf_filter_sec_pattern_words(st_word, pattern_list):
        # Flag specific special-case two-word terms so they can be filtered out later
filter_flag = 0
theme_list = ['combination', 'size']
if pattern_list:
if any(theme in str(pattern_list) for theme in theme_list):
                # The term matched the "combination" or "size" themes, so flag it
return 1
        # Special matching: two-word terms of the form "number + month/months" and "number+t + boys/girls"
date_pattern = re.compile(r"(\d+(?:\.\d+)?) +(month|months)\b", flags=re.IGNORECASE)
        numt_pattern = re.compile(r"((?:\d+)t)(?: +)(boys|girls|boy|girl)\b", flags=re.IGNORECASE)
for_pattern = re.compile(r"\bfor\b", flags=re.IGNORECASE)
if re.search(date_pattern, st_word):
return 1
if re.search(numt_pattern, st_word):
return 1
if re.search(for_pattern, st_word):
return 1
return filter_flag
def read_data(self):
print("======================查询sql如下======================")
# 获取搜索词基础数据
sql = f"""
select
id as st_key,
lower(search_term) search_term,
bsr_orders
from dwt_aba_st_analytics
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and st_bsr_cate_1_id_new is not null
and st_bsr_cate_1_id_new not in ("audible", "books","digital-text","dmusic","mobile-apps","movies-tv","music","software","videogames")
"""
print(sql)
self.df_st_base = self.spark.sql(sql).cache()
# self.df_st_base.show(10, truncate=False)
        # Fetch the raw filtered data for two-word and three-word search terms
sql = f"""
select search_term,st_word_num,rank,st_brand_label from (
select search_term,
regexp_replace(search_term,' ','') as search_term_without_space,
st_word_num,
rank,
st_movie_label,
st_brand_label
from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.date_info}'
and st_bsr_cate_1_id_new is not null
and st_bsr_cate_1_id_new not in
("audible", "books", "digital-text", "dmusic", "mobile-apps", "movies-tv", "music", "software",
"videogames")
and st_word_num <= 3
and st_word_num >= 2
and st_movie_label < 3
and st_brand_label <= 1
) t1
where search_term_without_space rlike '^[0-9a-zA-Z]*$'
"""
self.df_pattern_words_base = self.spark.sql(sql)
        # Pre-label "twin" (duplicated-token) words and directly filter out the ones that are not needed
self.df_pattern_words_base = self.df_pattern_words_base.withColumn('twin_words_flag',self.u_judge_twin_words(F.col('search_term')))
self.df_pattern_words_base = self.df_pattern_words_base.filter(" twin_words_flag == 0").cache()
sql = f"""
select
st_key,
search_term,
theme_ch,
theme_en,
theme_label_ch,
theme_label_en,
pattern_type,
theme_label_num_info,
theme_label_unit_info
from big_data_selection.dws_st_theme
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_st_theme_base = self.spark.sql(sql).cache()
        # Fetch theme words
sql = f"""
select
search_term,
concat_ws(",",collect_list(theme_label_en)) as pattern_list
from big_data_selection.dws_st_theme
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
group by st_key,search_term
"""
self.df_theme = self.spark.sql(sql).cache()
        # SQL to fetch the brand-word dictionary that must be retained for final brand-word matching
pg_sql = f"""
select lower(trim(character_name)) as st_brand_name_lower
from match_character_dict where match_type = '二三级词专用品牌词库'
"""
conn_info = DBUtil.get_connection_info("mysql", "us")
self.df_match_brand = SparkUtil.read_jdbc_query(
session=self.spark,
url=conn_info["url"],
pwd=conn_info["pwd"],
username=conn_info["username"],
query=pg_sql
)
pdf_match_brand = self.df_match_brand.toPandas()
match_brand = list(set(pdf_match_brand.st_brand_name_lower))
self.brand_pattern = re.compile(r'(?<!\+|\*|\-|\%|\.)\b({})\b'.format('|'.join([re.escape(x) for x in match_brand])),
flags=re.IGNORECASE)
        # SQL to fetch the blacklist dictionary for two/three-word term matching
pg_sql = f"""
select lower(trim(character_name)) as st_blacklist_word_lower,specical_match_type
from match_character_dict where match_type = '二三级词匹配黑名单'
"""
conn_info = DBUtil.get_connection_info("mysql", "us")
self.df_match_blacklist = SparkUtil.read_jdbc_query(
session=self.spark,
url=conn_info["url"],
pwd=conn_info["pwd"],
username=conn_info["username"],
query=pg_sql
)
def handle_data(self):
self.read_data()
self.handle_base_pattern_data()
self.handle_sec_st()
self.handle_third_st()
self.handle_st_filter_table()
self.handle_st_pattern_common_agg()
self.handle_st_pattern_special_agg()
self.save_data()
    # Shared logic for two-word and three-word terms
def handle_base_pattern_data(self):
        # Handles the logic whose conditions are identical for two-word and three-word terms
self.df_base_filter_date = self.df_pattern_words_base
self.df_base_filter_date = self.df_base_filter_date.withColumn('similar_word_list',
self.udf_inflect_word()(F.col('search_term')))
similar_words_window = Window.partitionBy(["similar_word_list"]).orderBy(
self.df_base_filter_date.rank.asc_nulls_last()
)
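        # Within each group of terms sharing the same singular/plural normalization key, keep only the
        # best-ranked term (row_num = 1 below).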
self.df_base_filter_date = self.df_base_filter_date.withColumn('row_num',
F.row_number().over(window=similar_words_window))
# CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_10_26_detail', 100 * 10000)
self.df_base_filter_date = self.df_base_filter_date.filter("row_num=1")
self.df_base_filter_date = self.df_base_filter_date.drop(*['similar_word_list', 'row_num'])
        # Second pass of similar-word filtering, using TextBlob lemmatization
self.df_base_filter_date = self.df_base_filter_date.withColumn('similar_word_list',
self.udf_word_restoration()(
F.col('search_term')))
similar_words_window = Window.partitionBy(["similar_word_list"]).orderBy(
self.df_base_filter_date.rank.asc_nulls_last()
)
self.df_base_filter_date = self.df_base_filter_date.withColumn('row_num',
F.row_number().over(window=similar_words_window))
# CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_10_26_detail', 100 * 10000)
self.df_base_filter_date = self.df_base_filter_date.filter("row_num=1").cache()
df_without_brand_words = self.df_base_filter_date.filter("st_brand_label = 0")
        # Handle the brand-word rows separately
df_brand_words = self.df_base_filter_date.filter("st_brand_label = 1")
df_brand_words = df_brand_words.withColumn("brand_match_detail",
self.udf_theme_regex(self.brand_pattern)(
F.col("search_term")))
df_brand_words = df_brand_words.filter("brand_match_detail is not null")
df_brand_words = df_brand_words.drop('brand_match_detail')
        # Merge the processed brand words back with the non-brand words
self.df_base_filter_date = df_without_brand_words.unionByName(df_brand_words)
        # Apply the contains-word and exact-match blacklist filters for two/three-word terms
pd_match_blacklist = self.df_match_blacklist.toPandas()
self.df_base_filter_date = self.df_base_filter_date.withColumn("st_blacklist_flag",
self.filter_blacklist_words(pd_match_blacklist)(
"search_term"))
        # Keep only the rows not flagged by the blacklist
self.df_base_filter_date = self.df_base_filter_date.filter("st_blacklist_flag != 1")
    # Handle two-word terms
def handle_sec_st(self):
self.df_sec_words = self.df_base_filter_date.filter("st_word_num = 2")
self.df_sec_words = self.df_sec_words.join(
self.df_theme, on=['search_term'], how='left'
)
self.df_sec_words = self.df_sec_words.withColumn("filter_flag",
self.u_filter_sec_pattern_words(F.col("search_term"),
F.col("pattern_list")))
        # Filter out the rows flagged as 1
self.df_sec_words = self.df_sec_words.filter("filter_flag != 1")
self.df_sec_words = self.df_sec_words.select("search_term").cache()
# CommonUtil.df_export_csv(self.spark, self.df_sec_words, 'export_sec_words_2023_11_30', 100 * 10000)
    # Handle three-word terms
def handle_third_st(self):
self.df_third_words = self.df_base_filter_date.filter("st_word_num = 3")
self.df_third_words = self.df_third_words.join(
self.df_theme, on=['search_term'], how='left'
)
        # Filter out three-word terms that matched any theme/function word
self.df_third_words = self.df_third_words.filter("pattern_list is null")
self.df_third_words = self.df_third_words.select("search_term").cache()
def handle_st_filter_table(self):
df_st_filter_base = self.df_st_base.select(
F.col('st_key'),
F.col('search_term'),
F.col('bsr_orders'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
).coalesce(1).cache()
        # Merge the two-word and three-word terms
pattern_words = self.df_sec_words.unionByName(self.df_third_words)
        # Convert the data to a pandas DataFrame
dict_df = pattern_words.toPandas()
        # Extract the pattern words into list[dict]
self.st_word_list = dict_df.to_dict(orient='records')
# self.st_word_list = dict_df["search_term"].values.tolist()
row_size = 40000
batch_size = 200
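        # row_size: number of pattern terms handled per outer batch; batch_size: number of per-term
        # filtered DataFrames unioned together before each append to tmp_pattern_st_info.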
        # Clear the HDFS path of the temp table before writing
        del_hdfs_path = CommonUtil.build_hdfs_path('tmp_pattern_st_info', partition_dict=self.partition_dict)
        print(f"Clearing HDFS directory: {del_hdfs_path}")
HdfsUtils.delete_file_in_folder(del_hdfs_path)
partition_by = ["site_name", "date_type", "date_info"]
word_batches = [self.st_word_list[i:i + row_size] for i in range(0, len(self.st_word_list), row_size)]
for word_batch in word_batches:
# for word_batch in word_batches[:1]:
            df_list = []  # list used to accumulate the per-term DataFrames
for row in word_batch:
# print(f"self.st_word_list.index(word):{self.st_word_list.index(word)}, word:{word}")
# 获取处理后的多级词
pattern_st = row["search_term"]
# 通过方法拆分,获取完全匹配的过滤条件
filter_condition = self.st_word_filter_condition(pattern_st)
filter_condition_expr = F.expr(filter_condition)
df_union_filter = df_st_filter_base.filter(filter_condition_expr)
df_union_filter = df_union_filter.withColumn("pattern_st", F.lit(pattern_st))
df_list.append(df_union_filter)
for i in range(0, len(df_list), batch_size):
print(f"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}")
                tmp_df = df_list[i:i + batch_size]
result_df = self.udf_unionAll(*tmp_df)
result_df = result_df.repartition(1)
result_df.write.saveAsTable(name='tmp_pattern_st_info', format='hive', mode='append', partitionBy=partition_by)
# print(f"test_df:{len(test_df)}")
sql = f"""
select
st_key,
search_term,
bsr_orders,
pattern_st
from big_data_selection.tmp_pattern_st_info
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_pattern_st_words = self.spark.sql(sql).cache()
self.df_pattern_st_words.show(20, truncate=False)
# print(f"combined_df:{combined_df.count()}")
self.df_pattern_st_words = self.df_pattern_st_words.cache()
# self.df_pattern_st_words.show(20, truncate=False)
# print("匹配后的表数据有:", self.df_pattern_st_words.count())
# 计算二级词下的总销量和匹配到的aba词个数
self.df_pattern_st_agg = self.df_pattern_st_words.groupBy(['pattern_st']).agg(
F.sum("bsr_orders").alias("pattern_bsr_orders_total"),
F.count("search_term").alias("pattern_st_count")
).cache()
def handle_st_pattern_common_agg(self):
        # # Temporarily added
# sql = f"""
# select
# st_key,
# search_term,
# bsr_orders,
# pattern_st
# from big_data_selection.tmp_pattern_st_info
# where site_name = '{self.site_name}'
# and date_type = '{self.date_type}'
# and date_info = '{self.date_info}'
# """
# self.df_pattern_st_words = self.spark.sql(sql).cache()
# self.df_pattern_st_words.show(20, truncate=False)
# # print(f"combined_df:{combined_df.count()}")
# self.df_pattern_st_words = self.df_pattern_st_words.cache()
# self.df_pattern_st_words.show(20, truncate=False)
# print("匹配后的表数据有:", self.df_pattern_st_words.count())
# 计算二级词下的总销量和匹配到的aba词个数
self.df_pattern_st_agg = self.df_pattern_st_words.groupBy(['pattern_st']).agg(
F.sum("bsr_orders").alias("pattern_bsr_orders_total"),
F.count("search_term").alias("pattern_st_count")
).cache()
        # Join pattern-word match detail with theme/function-word label detail; the pattern_type = 0 case
df_common_st_theme = self.df_st_theme_base.filter("pattern_type = 0")
self.df_st_theme_agg = self.df_pattern_st_words.join(
df_common_st_theme, on=['st_key', 'search_term'], how='left'
)
        # Search terms that matched no theme word need to be filtered out
self.df_st_theme_agg = self.df_st_theme_agg.filter("theme_en is not null")
        # Group and accumulate by the Chinese label (business requires counting by the Chinese meaning)
self.df_st_theme_agg = self.df_st_theme_agg.groupBy(['pattern_st', 'theme_label_ch', 'theme_en', 'theme_ch']).agg(
F.count("st_key").alias("theme_label_counts"),
F.sum("bsr_orders").alias("theme_label_bsr_orders"),
F.collect_set("theme_label_en").alias("theme_label_en_list")
)
        # Join the English labels into a single string
self.df_st_theme_agg = self.df_st_theme_agg.withColumn('label_en_str',
F.concat_ws("/", F.col('theme_label_en_list')))
        # Attach each pattern_st's total bsr_orders and st count
self.df_st_theme_agg = self.df_st_theme_agg.join(
self.df_pattern_st_agg, on=['pattern_st'], how='left'
)
        # Compute the ratios
self.df_st_theme_agg = self.df_st_theme_agg.withColumn('pattern_bsr_orders_rate',
F.when(F.col('pattern_bsr_orders_total') > 0,
F.round(F.col('theme_label_bsr_orders') / F.col(
'pattern_bsr_orders_total'), 4))
.otherwise(F.lit(0.0)))
self.df_st_theme_agg = self.df_st_theme_agg.withColumn('pattern_num_rate',
F.when(F.col('pattern_st_count') > 0,
F.round(F.col('theme_label_counts') / F.col(
'pattern_st_count'), 4))
.otherwise(F.lit(0.0)))
self.df_st_theme_agg.show(10, truncate=False)
self.df_st_theme_agg = self.df_st_theme_agg.select(
F.col('pattern_st'),
F.col('pattern_bsr_orders_total'),
F.col('pattern_st_count'),
F.col('theme_ch'),
F.col('theme_en'),
F.col('theme_label_ch'),
F.col('label_en_str').alias('theme_label_en'),
F.col('theme_label_bsr_orders'),
F.col('theme_label_counts'),
F.col('pattern_bsr_orders_rate'),
F.col('pattern_num_rate')
)
pass
def handle_st_pattern_special_agg(self):
        # Join pattern-word match detail with theme/function-word label detail; the pattern_type = 1 case
df_special_st_theme = self.df_st_theme_base.filter("pattern_type = 1")
self.df_st_match_topic_detail = self.df_pattern_st_words.join(
df_special_st_theme, on=['st_key', 'search_term'], how='left'
)
self.df_st_match_topic_detail = self.df_st_match_topic_detail.filter("theme_label_en is not null")
df_st_match_agg = self.df_st_match_topic_detail.groupby(
['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info', 'theme_label_unit_info']).agg(
F.count('bsr_orders').alias("same_info_count"),
F.sum('bsr_orders').alias("same_info_bsr_orders")
)
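        # Branch 1: labels with no numeric info -- aggregate by unit info only.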
df_st_match_no_num_agg = df_st_match_agg.filter("theme_label_num_info is null")
df_st_match_no_num_info = df_st_match_no_num_agg.groupby(
['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_unit_info']).agg(
F.sum('same_info_count').alias("st_label_num"),
F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
F.col('theme_label_unit_info').alias("label_info")
)
df_st_match_no_num_info = df_st_match_no_num_info.drop("theme_label_unit_info")
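        # Branch 2: labels whose unit info is a placeholder ('x' or 'by') -- aggregate by numeric info only.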
df_st_match_no_unit_agg = df_st_match_agg.filter("theme_label_unit_info in ('x', 'by')")
df_st_match_no_unit_info = df_st_match_no_unit_agg.groupby(
['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info']).agg(
F.sum('same_info_count').alias("st_label_num"),
F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
F.col("theme_label_num_info").alias("label_info")
)
df_st_match_no_unit_info = df_st_match_no_unit_info.drop("theme_label_num_info")
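        # Branch 3: labels with both numeric info and a real unit -- combine them into "num unit" strings.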
df_st_match_complete_agg = df_st_match_agg.filter(
(F.col("theme_label_num_info").isNotNull()) & (F.col("theme_label_unit_info").isNotNull()) & (F.col("theme_label_unit_info") != 'x') & (
F.col("theme_label_unit_info") != 'by'))
df_st_match_complete_agg = df_st_match_complete_agg.withColumn("complete_info",
F.concat_ws(' ', F.col("theme_label_num_info"),
F.col("theme_label_unit_info")))
df_st_match_complete_info = df_st_match_complete_agg.groupby(
['pattern_st', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_num_info']).agg(
F.sum('same_info_count').alias("st_label_num"),
F.sum('same_info_bsr_orders').alias("st_label_bsr_orders"),
F.concat_ws("/", F.collect_set(F.col("complete_info"))).alias("label_info")
)
df_st_match_complete_info = df_st_match_complete_info.drop("theme_label_num_info")
self.df_st_match_topic_agg = df_st_match_no_num_info.unionByName(df_st_match_no_unit_info).unionByName(
df_st_match_complete_info)
self.df_st_match_topic_agg = self.df_st_match_topic_agg.join(
self.df_pattern_st_agg, on=['pattern_st'], how='left'
)
self.df_st_match_topic_agg = self.df_st_match_topic_agg.withColumn("pattern_bsr_orders_rate",
F.when(F.col("pattern_bsr_orders_total") > 0,
F.round((F.col(
"st_label_bsr_orders") / F.col(
"pattern_bsr_orders_total")),
4)).otherwise(F.lit(0.0)))
self.df_st_match_topic_agg = self.df_st_match_topic_agg.withColumn("pattern_num_rate",
F.when(F.col("pattern_st_count") > 0,
F.round(
(F.col("st_label_num") / F.col(
"pattern_st_count")),
4)).otherwise(F.lit(0.0)))
self.df_st_match_topic_agg = self.df_st_match_topic_agg.select(
F.col('pattern_st'),
F.col('pattern_bsr_orders_total'),
F.col('pattern_st_count'),
F.col('theme_ch'),
F.col('theme_en'),
F.col('theme_label_ch'),
F.col('label_info').alias('theme_label_en'),
F.col('st_label_bsr_orders').alias('theme_label_bsr_orders'),
F.col('st_label_num').alias('theme_label_counts'),
F.col('pattern_bsr_orders_rate'),
F.col('pattern_num_rate')
)
def save_data(self):
hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
print(f"清除hdfs目录中:{hdfs_path_asin_info}")
HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
self.df_st_theme_agg = self.df_st_theme_agg.unionByName(self.df_st_match_topic_agg)
        # Added logic: if the two/three-word term itself also has matched labels, the corresponding stats need to be filtered out
df_agg_filter = self.df_st_theme_base.select(
F.col('search_term'),
F.col('theme_label_en').alias('theme_label_en_join'),
F.lit(1).alias('join_flag')
)
self.df_st_theme_agg = self.df_st_theme_agg.join(
df_agg_filter, on=(self.df_st_theme_agg.pattern_st == df_agg_filter.search_term) & (self.df_st_theme_agg.theme_label_en == df_agg_filter.theme_label_en_join), how='left'
)
        # If join_flag is 1, the result matched the pattern word's own labels and therefore must be filtered out
self.df_st_theme_agg = self.df_st_theme_agg.filter(F.col('join_flag').isNull())
self.df_st_theme_agg = self.df_st_theme_agg.select(
F.col('pattern_st'),
F.col('pattern_bsr_orders_total'),
F.col('pattern_st_count'),
F.col('theme_ch'),
F.col('theme_en'),
F.col('theme_label_ch'),
F.col('theme_label_en'),
F.col('theme_label_bsr_orders'),
F.col('theme_label_counts'),
F.col('pattern_bsr_orders_rate'),
F.col('pattern_num_rate'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
)
self.df_st_theme_agg = self.df_st_theme_agg.repartition(20)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
self.df_st_theme_agg.write.saveAsTable(name=self.hive_tb, format='hive', mode='append',
partitionBy=partition_by)
print("success")
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
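    # Example invocation (hypothetical script name and argument values):
    #   spark-submit dwt_st_theme_agg.py us month 2023-11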
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
obj = DwtStThemeAgg(site_name=site_name, date_type=date_type, date_info=date_info)
obj.handle_data()