"""
@Author : HuangJian
@SourceTable :
①ods_seller_account_syn
②ods_seller_asin_account
③ods_seller_account_feedback
@SinkTable :
①dim_seller_asin_info
@CreateTime : 2022/11/2 9:56
@UpdateTime : 2022/11/2 9:56
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so that utils can be imported
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F


class DwdSellerAsinInfo(Templates):
def __init__(self, site_name='us', date_type="week", date_info='2022-40'):
super().__init__()
self.db_save = "dim_seller_asin_info"
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.spark = self.create_spark_object(
app_name=f"{self.db_save}:{self.site_name}_{self.date_type}_{self.date_info}")
self.df_date = self.get_year_week_tuple()
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.partitions_num = 20
self.reset_partitions(partitions_num=self.partitions_num)
        # raw source DataFrames read from the ods layer (placeholders until read_data runs)
self.df_seller_account_syn = self.spark.sql("select 1+1;")
self.df_seller_asin_account = self.spark.sql("select 1+1;")
self.df_seller_account_feedback = self.spark.sql("select 1+1;")
        # output DataFrame produced by the transformation (sink: dim_seller_asin_info)
        self.df_save = self.spark.sql("select 1+1;")
    # 1. read the source data
def read_data(self):
        # read the ods_seller source tables
        print("reading ods_seller_account_syn")
        sql = f"select id as account_id, account_name from ods_seller_account_syn where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' group by id, account_name"
self.df_seller_account_syn = self.spark.sql(sqlQuery=sql)
print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False))
print("获取 ods_seller_asin_account")
sql = f"select account_name, asin from ods_seller_asin_account where site_name = '{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' group by account_name, asin"
self.df_seller_asin_account = self.spark.sql(sqlQuery=sql).cache()
print("self.df_seller_asin_account:", self.df_seller_asin_account.show(10, truncate=False))
print("获取 ods_seller_account_feedback")
# 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据
sql = f"select account_id, " \
f" country_name, " \
f" count_30_day as fb_count_30_day, " \
f" count_1_year as fb_count_1_year, " \
f" count_lifetime as fb_count_lifetime " \
f"from (select account_id,country_name,count_30_day, count_1_year, count_lifetime, " \
f" row_number() over (partition by account_id order by created_at desc) sort_flag " \
f" from ods_seller_account_feedback " \
f" where site_name = '{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' " \
f" ) t1 " \
f"where sort_flag = 1; "
self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql)
# print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False))
def handle_data(self):
self.handle_seller_asin_base()
def handle_seller_asin_base(self):
        # join ods_seller_asin_account with ods_seller_account_syn to obtain account_id
self.df_save = self.df_seller_asin_account.\
join(self.df_seller_account_syn, on='account_name', how='left')
# print("df_seller_asin_account join df_seller_account_syn: ", self.df_save.show(10, truncate=False))
        self.df_save = self.df_save.\
            join(self.df_seller_account_feedback, on='account_id', how='left')
# print("df_save join df_seller_account_feedback: ", self.df_save.show(10, truncate=False))
        # null handling
        # integer columns: fill nulls with 0 (the original comment marks these fields as int,
        # so integer literals are used consistently here)
        self.df_save = self.df_save. \
            na.fill({"fb_count_30_day": 0, "fb_count_1_year": 0, "fb_count_lifetime": 0})
        # string columns: fill nulls with the literal "null" placeholder
self.df_save = self.df_save. \
na.fill({"country_name": "null"})
        # populate reserved fields
self.df_save = self.df_save.withColumn("re_string_field1", F.lit("null"))
self.df_save = self.df_save.withColumn("re_string_field2", F.lit("null"))
self.df_save = self.df_save.withColumn("re_string_field3", F.lit("null"))
self.df_save = self.df_save.withColumn("re_int_field1", F.lit(0))
self.df_save = self.df_save.withColumn("re_int_field2", F.lit(0))
self.df_save = self.df_save.withColumn("re_int_field3", F.lit(0))
        # populate partition fields
self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
# print("self.df_save", self.df_save.show(10, truncate=False))
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
handle_obj = DwdSellerAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
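
# Usage sketch (the script filename and any spark-submit options are assumptions;
# adjust to the actual file name and cluster configuration):
#   spark-submit dim_seller_asin_info.py us week 2022-40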