1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
author: wangrui
description: 根据dim_st_asin_info表得到新增的zr类型asin信息并导出到pg数据库存储
table_read_name: dim_st_asin_info
table_save_name: dwt_zr_asin_info
table_save_level: dwt
version: 2.0
created_date: 2023-04-23
updated_date: 2023-04-23
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import re
from pyspark.storagelevel import StorageLevel
class DwtZrAsinInfo(Templates):
    """Spark job that builds the ``dwt_zr_asin_info`` dataset for one site.

    For every ASIN found in the monthly rows of ``dwt_flow_asin`` the job
    collects the month-by-month history of several metrics into
    brace-delimited text lists, tags the ASIN with a prefix bucket
    (``asin_type``) and adds bookkeeping columns.

    The ``run()`` entry point and the save step are not defined in this class;
    presumably they are inherited from ``Templates`` and call ``read_data`` and
    ``handle_data`` -- confirm against ``utils.templates``.
    """

    def __init__(self, site_name: str = "us") -> None:
        """Create the Spark session and register the Python UDFs.

        Args:
            site_name: Site code used to filter ``dwt_flow_asin`` and written
                to the ``site_name`` output column.
        """
        super().__init__()
        self.site_name = site_name
        self.db_save = "dwt_zr_asin_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}")
        # Helpers inherited from Templates; their effects are not visible here.
        self.df_date = self.get_year_week_tuple()
        self.get_date_info_tuple()
        # Placeholder DataFrames; replaced by read_data() / get_asin_info_list().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_info = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'asin_type']
        # NOTE(review): presumably sets the output partition count -- confirm in Templates.
        self.reset_partitions(30)
        self.u_get_zr_asin_type = self.spark.udf.register(
            "u_get_zr_asin_type", self.udf_get_zr_asin_type, StringType())
        self.u_change_data_type = self.spark.udf.register(
            "u_change_data_type", self.udf_change_data_type, StringType())

    @staticmethod
    def udf_get_zr_asin_type(asin):
        """Return the prefix bucket label for an ASIN.

        The ASIN is upper-cased and tested against the patterns in order; the
        label of the first matching pattern is returned.

        Args:
            asin: ASIN value, or ``None`` for a SQL NULL.

        Returns:
            The bucket label (for example ``"B07A_B07Z"``), ``"OTHER"`` when no
            pattern matches, or ``None`` when ``asin`` is ``None``.
        """
        # Null ASINs are returned as NULL explicitly instead of relying on an
        # unbound local / implicit return.
        if asin is None:
            return None
        # Kept local to the function so the UDF shipped to executors is
        # self-contained.
        pattern_list = [
            ("^B0[1-6][A-Z0-9]*", "B01_B06"), ("^B07[0-9][A-Z0-9]*", "B070_B079"),
            ("^B07[A-Z][A-Z0-9]*", "B07A_B07Z"), ("^B08[0-9][A-Z0-9]*", "B080_B089"),
            ("^B08[A-Z][A-Z0-9]*", "B08A_B08Z"), ("^B09[0-9][A-Z0-9]*", "B090_B099"),
            ("^B09[A-N][A-Z0-9]*", "B09A_B09N"), ("^B09[O-Z][A-Z0-9]*", "B09O_B09Z"),
            ("^B0B[0-9][A-Z0-9]*", "B0B0_B0B9"), ("^B0B[A-G][A-Z0-9]*", "B0BA_B0BG"),
            ("^B0B[H-K][A-Z0-9]*", "B0BH_B0BK"), ("^B0B[L-N][A-Z0-9]*", "B0BL_B0BN"),
            ("^B0B[O-R][A-Z0-9]*", "B0BO_B0BR"), ("^B0B[S-V][A-Z0-9]*", "B0BS_B0BV"),
            ("^B0B[W-Z][A-Z0-9]*", "B0BW_B0BZ"),
        ]
        asin = str(asin).upper()
        for pattern, type_value in pattern_list:
            if re.match(pattern, asin):
                return type_value
        return 'OTHER'

    @staticmethod
    def udf_change_data_type(column):
        """Render a value as text with square brackets swapped for braces.

        Args:
            column: Value to render; in this job it is an array column value.

        Returns:
            ``str(column)`` with every ``[`` replaced by ``{`` and every ``]``
            by ``}``. A ``None`` input yields the string ``"None"``.
        """
        return str(column).replace("[", "{").replace("]", "}")

    def read_data(self) -> None:
        """Load the monthly ASIN metrics for ``self.site_name`` into ``self.df_asin_info``.

        Rows are de-duplicated on ``(asin, cur_time)``, repartitioned and
        persisted to disk; the first rows are printed.
        """
        print("1. 读取dwt_flow_asin表")
        sql = f"""select asin, asin_zr_counts, asin_sp_counts, asin_sb_counts, round(asin_ao_val,3) as asin_ao_val, asin_price,
        bsr_orders as asin_bsr_orders, round(sales, 2) as bsr_orders_sale, concat(date_info, '-15') as cur_time from dwt_flow_asin
        where site_name='{self.site_name}' and date_type='month'
        """
        print("sql:", sql)
        df_monthly = self.spark.sql(sqlQuery=sql)
        df_monthly = df_monthly.drop_duplicates(['asin', 'cur_time'])
        self.df_asin_info = df_monthly.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_info.show(10, truncate=False)

    def get_asin_info_list(self) -> None:
        """Aggregate the monthly rows into one row per ASIN with history lists.

        Missing metric values are filled with ``-1``. For each ASIN the monthly
        records are collected and sorted (``cur_time`` is the first struct
        field, so it is the leading sort key), then every field is extracted
        into its own list and rendered as brace-delimited text. The result,
        plus a NULL ``orders_list`` column, is stored in ``self.df_save``.
        """
        self.df_asin_info = self.df_asin_info.na.fill(
            {"asin_ao_val": -1.0, "asin_zr_counts": -1, "asin_sp_counts": -1, "asin_sb_counts": -1,
             "asin_bsr_orders": -1, "asin_price": -1.0, "bsr_orders_sale": -1})

        # (source field, output column) pairs, in struct order and output order.
        field_to_list = [
            ("cur_time", "times_list"),
            ("asin_ao_val", "ao_list"),
            ("asin_zr_counts", "zr_count_list"),
            ("asin_sp_counts", "sp_count_list"),
            ("asin_sb_counts", "sb_count_list"),
            ("asin_price", "price_list"),
            ("asin_bsr_orders", "bsr_orders_list"),
            ("bsr_orders_sale", "bsr_orders_sale_list"),
        ]
        history = F.sort_array(
            F.collect_list(F.struct(*[field for field, _ in field_to_list]))).alias("values")
        df_asin_info_agg = self.df_asin_info.groupby(['asin']).agg(history)

        # Build the text lists directly instead of materialising intermediate
        # array columns, and persist the final frame only once.
        list_columns = [
            self.u_change_data_type(F.expr(f"transform(values, x -> x.{field})")).alias(list_name)
            for field, list_name in field_to_list
        ]
        df_asin_info_agg = df_asin_info_agg.select("asin", *list_columns).persist(StorageLevel.DISK_ONLY)
        df_asin_info_agg.show(10, truncate=False)
        # NOTE(review): F.lit(None) yields an untyped NULL column; confirm the
        # save step handles it for the target table.
        self.df_save = df_asin_info_agg.withColumn("orders_list", F.lit(None))

    def handle_data_group(self) -> None:
        """Add ``asin_type``, ``created_time``, ``updated_time`` and ``site_name`` to ``self.df_save``."""
        # 'ss' is second-of-minute; the previous 'SS' is fraction-of-second in
        # Spark datetime patterns, so seconds were never written.
        now_text = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = (
            self.df_save
            .withColumn("asin_type", self.u_get_zr_asin_type(self.df_save.asin))
            .withColumn("created_time", now_text)
            .withColumn("updated_time", now_text)
            .withColumn("site_name", F.lit(self.site_name))
        )

    def handle_data(self) -> None:
        """Run the transformation steps: build the history lists, then add the grouping columns."""
        self.get_asin_info_list()
        self.handle_data_group()
if __name__ == '__main__':
    # Script entry point: the first command-line argument is the site code.
    job = DwtZrAsinInfo(site_name=sys.argv[1])
    job.run()