1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
class DwtMerchantwordsMerge(Templates):
    """Merge job for merchantwords measure data.

    Reads every batch of ``dwd_merchantwords_measure_v2`` for one site,
    keeps the newest row per keyword, derives a few merged columns, and
    prepares the result for the ``dwt_merchantwords_merge`` partition
    (site_name / batch).
    """

    def __init__(self, site_name='us', batch='2024-00-00'):
        super().__init__()
        self.site_name = site_name
        self.batch = batch
        self.db_save = 'dwt_merchantwords_merge'
        self.spark = self.create_spark_object(app_name=f"DwtMerchantwordsMerge: {self.site_name}, {self.batch}")
        self.partitions_num = 120
        self.partitions_by = ['site_name', 'batch']
        # Placeholder DataFrames; the real ones are built in read_data()
        # and handle_data().
        self.df_partition = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # Remove any previous output for this site/batch so the write
        # starts from a clean partition directory.
        target_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
        print(f"清除hdfs目录中.....{target_path}")
        HdfsUtils.delete_hdfs_file(target_path)

    def read_data(self):
        """Load all measure batches for this site into ``self.df_partition``."""
        print("读取dwd_merchantwords_measure_v2的分区数据")
        query = f"""
            select
                keyword,
                lang,
                st_ao_val,
                st_zr_flow_proportion,
                volume,
                avg_3m,
                avg_12m,
                asin_total_num,
                asin_num,
                self_asin_num,
                self_asin_proportion,
                st_sp_counts,
                st_zr_counts,
                st_monthly_sales,
                listing_sales_avg,
                reviews_avg,
                rating_avg,
                price_avg,
                depth,
                results_count,
                sponsored_ads_count,
                page_1_reviews,
                appearance,
                last_seen,
                update_time,
                last_batch,
                package_quantity,
                batch as measure_batch
            from dwd_merchantwords_measure_v2
            where site_name = '{self.site_name}';
        """
        df = self.spark.sql(sqlQuery=query)
        # Cache across the dedup + derivation passes in handle_data().
        self.df_partition = df.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_partition.show(10, truncate=True)

    def handle_data(self):
        """Deduplicate per keyword and derive the merged output columns."""
        # For each keyword keep only the row from the most recent
        # measure batch (nulls sort last, so a real batch always wins).
        latest_first = Window.partitionBy('keyword').orderBy(
            F.desc_nulls_last('measure_batch')
        )
        ranked = self.df_partition.withColumn("u_rank", F.row_number().over(window=latest_first))
        deduped = ranked.filter('u_rank=1').drop('u_rank')
        # Drop keywords with no matched ASINs.
        self.df_partition = deduped.filter('asin_num>0')
        # NOTE(review): volume is zeroed when last_batch == '2023-1' —
        # presumably that batch's volumes are unreliable; confirm upstream.
        result = self.df_partition.withColumn(
            'volume',
            F.when(F.col('last_batch') == '2023-1', 0).otherwise(F.col('volume'))
        )
        # asin_total_num should never be smaller than the reported
        # results_count, so take the larger of the two.
        result = result.withColumn('asin_total_num', F.greatest('asin_total_num', 'results_count'))
        # Stamp the partition columns for the write.
        result = result.withColumn('site_name', F.lit(self.site_name))
        self.df_save = result.withColumn('batch', F.lit(self.batch))
if __name__ == '__main__':
    # CLI entry point: <site_name> <batch>
    cli_site = sys.argv[1]
    cli_batch = sys.argv[2]
    DwtMerchantwordsMerge(site_name=cli_site, batch=cli_batch).run()