"""
@Author : HuangJian
@Description : 店铺top20数据详情表
@SourceTable :
①ods_st_key
②dim_st_detail
@SinkTable : dwt_fb_top20_info
@CreateTime : 2022/07/24 14:55
@UpdateTime : 2022/07/24 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from pyspark.sql.types import IntegerType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from yswg_utils.common_udf import udf_new_asin_flag
class DwtFbTop20Info(object):
def __init__(self, site_name, date_type, date_info):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
        self.hive_tb = "dwt_fb_top20_asin_info"
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
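        # These three fields are also the Hive partition columns appended when writing in save_data()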
        # Build the HDFS path of the target table partition
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Create the SparkSession
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # Resolve the calculation date (YYYY-MM-DD) for the given date dimension
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
        # Initialize placeholder DataFrames
        self.df_fb_top20_asin_info = self.spark.sql("select 1+1;")
        self.df_seller_account = self.spark.sql("select 1+1;")
        # Register UDFs
        self.udf_new_asin_flag = self.spark.udf.register("udf_new_asin_flag", udf_new_asin_flag, IntegerType())
def read_data(self):
        # Read the base info of the crawled top-20 ASINs for each shop
sql = f"""
with base_table as(
select
seller_id,
asin,
title,
img_url,
price,
rating,
total_comments,
row_num
from ods_asin_detail_product
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
),
history_asin as(
select
asin,
asin_volume,
asin_weight,
asin_launch_time
from dim_cal_asin_history_detail
where site_name = '{self.site_name}'
)
select
base_table.seller_id,
base_table.asin,
base_table.title as asin_title,
base_table.img_url as asin_img_url,
base_table.price as asin_price,
base_table.rating as asin_rating,
base_table.total_comments as asin_total_comments,
base_table.row_num as fb_row_num,
history_asin.asin_volume,
history_asin.asin_weight,
history_asin.asin_launch_time,
udf_new_asin_flag(history_asin.asin_launch_time,'{self.cal_date}') as is_asin_new
from base_table
left join history_asin
on base_table.asin = history_asin.asin
"""
self.df_fb_top20_asin_info = self.spark.sql(sqlQuery=sql).cache()
print(sql)
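        # Deduplicate on (seller_id, asin) in case the crawl produced repeated rows for the same shop/ASIN pair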
self.df_fb_top20_asin_info = self.df_fb_top20_asin_info.drop_duplicates(['seller_id', 'asin'])
# print("self.df_fb_top20_asin_info", self.df_fb_top20_asin_info.show(10, truncate=False))
        # Read ods_seller_account_syn to obtain account_name
        print("reading ods_seller_account_syn")
sql = f"""
select
seller_id,
account_name,
id
from ods_seller_account_syn
where site_name='{self.site_name}'
"""
self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Keep only the latest record (largest id) per seller_id; a window is used because
        # orderBy followed by drop_duplicates does not guarantee which duplicate row is kept
        seller_window = Window.partitionBy('seller_id').orderBy(F.col('id').desc())
        self.df_seller_account = self.df_seller_account \
            .withColumn('dedup_rank', F.row_number().over(seller_window)) \
            .filter(F.col('dedup_rank') == 1) \
            .drop('dedup_rank', 'id')
print(sql)
    def save_data(self):
        # Join ods_seller_account_syn to bring account_name back onto the ASIN rows
df_save = self.df_fb_top20_asin_info.join(
self.df_seller_account, on='seller_id', how='inner'
)
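        # Note: the inner join drops ASIN rows whose seller_id has no match in ods_seller_account_syn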
df_save = df_save.select(
F.col('seller_id'),
F.col('account_name'),
F.col('asin'),
F.col('asin_title'),
F.col('asin_img_url'),
F.col('asin_price'),
F.col('asin_rating'),
F.col('asin_total_comments'),
F.col('fb_row_num'),
F.col('asin_volume'),
F.col('asin_weight'),
F.col('asin_launch_time'),
F.col('is_asin_new'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
)
# CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
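        # The partition directory was cleared above, so the mode='append' write below effectively
        # overwrites this partition; repartition(10) caps the number of output files per run.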
df_save = df_save.repartition(10)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")
def run(self):
        # Read source data
        self.read_data()
        # Assemble fields and write out
        self.save_data()
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
handle_obj = DwtFbTop20Info(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
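
# Example invocation (script name and argument values are illustrative assumptions):
#   spark-submit dwt_fb_top20_asin_info.py us month 2022-07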