1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws
class DwtAmazonReport(Templates):
    """dwt-layer Amazon report builder.

    Joins the current month's per-ASIN metrics (dwd_amazon_report) onto the
    accumulated history from last month's dwt_amazon_report partition,
    producing comma-separated per-metric history lists per ASIN.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2021-10'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_amazon_report'  # target table / HDFS partition root
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=120)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; replaced in read_data() / handle_data().
        self.df_dwd_new = self.spark.sql("select 1+1;")
        self.df_dwd_old = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load this month's dwd-layer data and last month's dwt-layer history."""
        # Current month's data from the dwd layer.
        sql1 = f"""
        select
            asin,
            monthly_sales as new_monthly_sales,
            zr_count as new_zr_count,
            sp_count as new_sp_count,
            total_count as new_total_count,
            date_info as new_date_info_list
        from
            dwd_amazon_report
        where
            site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}';
        """
        print(sql1)
        self.df_dwd_new = self.spark.sql(sqlQuery=sql1).repartition(15, 'asin').cache()
        self.df_dwd_new.show(10, truncate=True)
        # Previous month's accumulated data from the dwt layer.
        date_info_pre = CommonUtil.get_month_offset(self.date_info, -1)
        sql2 = f"""
        select
            asin,
            monthly_sales as old_monthly_sales,
            zr_count as old_zr_count,
            sp_count as old_sp_count,
            total_count as old_total_count,
            date_info_list as old_date_info_list
        from
            dwt_amazon_report
        where
            site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{date_info_pre}';
        """
        print(sql2)
        self.df_dwd_old = self.spark.sql(sqlQuery=sql2).repartition(15, 'asin').cache()
        self.df_dwd_old.show(10, truncate=True)

    def handle_data(self):
        """Full-join new and old data, fill gaps with -1 placeholders, and
        append this month's values to each per-ASIN history list."""
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # Output column names produced after the join.
        join_columns = ['monthly_sales', 'zr_count', 'sp_count', 'total_count', 'date_info_list']
        # Count how many months the history covers, so that ASINs missing
        # from the old data get one "-1" placeholder per historical month.
        old_date_info_first = self.df_dwd_old.select('old_date_info_list').distinct().first()
        if old_date_info_first is None:
            old_date_info_list = None
            old_date_info_list_len = 0
        else:
            old_date_info_list = old_date_info_first[0]
            old_date_info_list_len = len(old_date_info_list.split(','))
        fillna_old = ('-1,' * old_date_info_list_len).rstrip(',')
        # ASINs absent from this month's data get a single "-1".
        fillna_new = '-1'
        # Full outer join on asin, then fill nulls on both sides.
        # NOTE(review): DataFrame.fillna with a string value only applies to
        # string-typed columns; if any metric column is numeric its nulls
        # would remain — confirm against the table schemas.
        self.df_joined = self.df_dwd_new.join(
            self.df_dwd_old, on='asin', how='full'
        )
        for col in join_columns:
            self.df_joined = self.df_joined.fillna({'old_' + col: fillna_old})
            self.df_joined = self.df_joined.fillna({'new_' + col: fillna_new})
        # The history list and the current month are the same for every row,
        # so overwrite both date_info_list columns with constants.
        self.df_joined = self.df_joined.withColumn(
            "old_date_info_list", F.lit(old_date_info_list)
        ).withColumn(
            "new_date_info_list", F.lit(self.date_info)
        )
        # Concatenate history and current values into the final columns.
        if old_date_info_first is None:
            # No history yet: this month's values stand alone.
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    self.df_joined['new_' + col]
                )
        else:
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    concat_ws(',', self.df_joined['old_' + col], self.df_joined['new_' + col])
                )
        # Keep only the output columns, then add the static and partition columns.
        selected_columns = ['asin'] + join_columns
        self.df_save = self.df_joined.select(selected_columns)
        self.df_save = self.df_save.withColumn(
            "weekly_sales", F.lit(None)
        ).withColumn(
            "weekly_views", F.lit(None)
        ).withColumn(
            "monthly_views", F.lit(None)
        ).withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        )
if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    # Only month-granularity data for us/uk/de from 2024-04 onward is computed
    # (lexicographic compare works because date_info is zero-padded YYYY-MM).
    if (site_name in ['us', 'uk', 'de']) and (date_type == 'month') and (date_info >= '2024-04'):
        handle_obj = DwtAmazonReport(site_name=site_name, date_type=date_type, date_info=date_info)
        handle_obj.run()
    else:
        print("暂不计算该维度数据!")
        # sys.exit() instead of quit(): quit() is a site-module helper meant
        # for interactive sessions and may be absent when run with -S.
        sys.exit()