import os
import sys

# Put the project root on sys.path so that the utils package is importable
# when this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, col


class WordFrequency(object):

    def __init__(self):
        self.spark = SparkUtil.get_spark_session("us_aba_last365_word_frequency")

    def run(self):
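        # Step 1: load brand-labelled search terms for the US site over the
        # last 12 months (top 1,000,000 ranks only).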
sql1 = f"""
select search_term, date_info
from dwt_aba_st_analytics
where site_name = 'us'
and date_type = 'month'
and date_info in
('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
'2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
and rank <= 1000000
and st_brand_label = 1;
"""
df_st = self.spark.sql(sql1).cache()
print("df_st数量是:")
print(df_st.count())
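
        # Step 2: load the first matched brand for each search term over the
        # same 12-month window.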
sql2 = f"""
select search_term, first_match_brand as brand, date_info
from dws_st_brand_info
where site_name = 'us'
and date_type = 'month'
and date_info in
('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
'2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
and st_brand_label = 1;
"""
df_brand = self.spark.sql(sql2).cache()
print("df_brand数量是:")
print(df_brand.count())
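
        # Step 3: attach a brand to each search term; the left join keeps
        # every search-term row even when no brand was matched.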
        df_save = df_st.join(
            df_brand, on=['date_info', 'search_term'], how='left'
        ).drop('date_info')
        print("df_save count:")
        print(df_save.count())
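
        # Step 4: count rows per brand, most frequent first. count("*") is
        # used rather than count('brand') so that rows with no matched brand
        # (null after the left join) are still counted and the sanity check
        # below can balance.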
        df_save = df_save.groupby(['brand']).agg(
            count("*").alias('frequency')
        ).orderBy('frequency', ascending=False)
        df_save.show(20, False)
        df_save = df_save.withColumn("frequency", col("frequency").cast("int"))
        total_sum = df_save.select("frequency").groupBy().sum().collect()[0][0]
        if total_sum == df_st.count():
            print('validation passed')
        else:
            print('validation failed')
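
        # Step 5: export as '^'-delimited, headerless, uncompressed CSV.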
        output_path = "hdfs:///user/chenyuanjie/test1/"
        df_save.write.mode("overwrite") \
            .format("csv") \
            .option("delimiter", "^") \
            .option("lineSep", "\n") \
            .option("header", "false") \
            .option("compression", "none") \
            .save(output_path)


if __name__ == '__main__':
    obj = WordFrequency()
    obj.run()