1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
class DwtMerchantwordsStDetailMerge(Templates):
    """Merge several batches of ``dwt_merchantwords_st_detail`` into one batch.

    For each keyword of one site, the row from the most recent source batch is
    kept and placed in ``df_save``, tagged with ``site_name`` and ``batch``.
    Writing ``df_save`` out is presumably done by ``Templates.run`` (not visible
    here) using ``db_save`` / ``partitions_by`` / ``partitions_num`` — TODO confirm.
    """

    def __init__(self, site_name='us', batch='2024-1', source_batches=('2023-1', '2024-1')):
        """Create the Spark session and clear the output partition directory.

        :param site_name: site to process; used as the source filter and as the
            output ``site_name`` value.
        :param batch: output ``batch`` value (default ``'2024-1'``).
        :param source_batches: batches of ``dwt_merchantwords_st_detail`` to merge
            (default ``('2023-1', '2024-1')``). A single string is treated as
            one batch.
        :raises ValueError: if ``source_batches`` is empty.
        """
        # Validate before any side effect (Spark session, HDFS delete).
        if isinstance(source_batches, str):
            # tuple('2024-1') would split the label into characters.
            source_batches = (source_batches,)
        source_batches = tuple(source_batches)
        if not source_batches:
            # An empty list would render as the invalid SQL "batch in ()".
            raise ValueError("source_batches must name at least one batch")

        super().__init__()
        self.site_name = site_name
        self.batch = batch
        self.source_batches = source_batches
        self.db_save = 'dwt_merchantwords_st_detail_merge'
        self.spark = self.create_spark_object(
            app_name=f"DwtMerchantwordsStDetailMerge: {self.site_name}, {self.batch}")
        # Presumably read by Templates when saving df_save — TODO confirm.
        self.partitions_num = 15
        self.partitions_by = ['site_name', 'batch']
        # Placeholder frames; overwritten by read_data / handle_data.
        self.df = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # NOTE(review): the output partition directory is deleted here, before any
        # data has been read; if a later step fails, the partition stays empty.
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def read_data(self):
        """Load this site's rows from every source batch into ``self.df``.

        The source ``batch`` column is exposed as ``last_batch`` so it does not
        collide with the output ``batch`` column added in ``handle_data``.
        """
        print("读取dwt_merchantwords_st_detail数据")
        # Renders as "'2023-1', '2024-1'" for the default source_batches.
        batches_sql = ", ".join(f"'{b}'" for b in self.source_batches)
        sql = f"""
        select
            keyword,
            volume,
            avg_3m,
            avg_12m,
            depth,
            results_count,
            sponsored_ads_count,
            page_1_reviews,
            appearance,
            last_seen,
            update_time,
            lang,
            batch as last_batch
        from dwt_merchantwords_st_detail
        where site_name = '{self.site_name}'
          and batch in ({batches_sql});
        """
        self.df = self.spark.sql(sqlQuery=sql)
        # Cached because the frame is reused by show() below and by handle_data.
        self.df = self.df.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df.show(10, truncate=True)

    def handle_data(self):
        """Keep one row per keyword (newest batch wins) and tag the output partition."""
        # Newest batch first. last_batch is compared as a string, which assumes
        # batch labels sort chronologically (true for 'YYYY-N' with single-digit N).
        # update_time breaks ties between rows of the same keyword and batch, so
        # row_number() no longer picks arbitrarily between them (rows that also
        # share update_time remain tied).
        window = Window.partitionBy('keyword').orderBy(
            F.desc_nulls_last('last_batch'),
            F.desc_nulls_last('update_time'),
        )
        self.df = (
            self.df.withColumn('u_rank', F.row_number().over(window))
            .filter('u_rank=1')
            .drop('u_rank')
        )
        self.df_save = (
            self.df.withColumn('site_name', F.lit(self.site_name))
            .withColumn('batch', F.lit(self.batch))
        )
if __name__ == '__main__':
    # Entry point: expects the site name (e.g. 'us') as the first CLI argument.
    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} <site_name>")
    site_name = sys.argv[1]
    handle_obj = DwtMerchantwordsStDetailMerge(site_name=site_name)
    handle_obj.run()