import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType

from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from yswg_utils.common_df import get_bsr_tree_full_name_df
"""
利润率相关配置表
"""
class DimProfitConfig(object):
def __init__(self, site_name):
app_name = f"{self.__class__.__name__}"
self.spark = SparkUtil.get_spark_session(app_name)
self.udf_parse_num_reg = F.udf(self.udf_parse_num, DoubleType())
self.udf_lower_category_name_reg = F.udf(self.udf_lower_category_name, StringType())
self.hive_tb = "dim_profit_config"
self.site_name = site_name
pass

    @staticmethod
    def udf_parse_num(num: str):
        """Parse a percentage string such as '3.5%' into a decimal ratio."""
        if num is None:
            return None
        return round(float(num.replace("%", "")) / 100, 4)

    @staticmethod
    def udf_lower_category_name(category_name: str):
        """Normalize a category name for joining: strip punctuation and spaces, lowercase."""
        if category_name is None:
            return None
        for ch in ('_', '&', '"', ',', "'", ' '):
            category_name = category_name.replace(ch, "")
        return category_name.lower()
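    # Illustrative examples (not executed):
    #   udf_parse_num("3.5%")                            -> 0.035
    #   udf_lower_category_name("Arts, Crafts & Sewing") -> "artscraftssewing"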

    def run(self):
        # Current BSR category tree: full name (spaces removed) plus ids and English name.
        name_df = get_bsr_tree_full_name_df(self.site_name, self.spark) \
            .select(
                F.expr("replace(full_name, ' ', '')").alias("full_name"),
                F.col("category_id"),
                F.col("en_name").alias("category_name"),
                F.col("category_first_id"),
            )
        # Legacy category descriptions, joined to the BSR tree for English names.
        sql_old = f"""
            with old_tb as (
                select replace(asin_category_desc, " ", "") as full_name,
                       node_id as category_id,
                       category_first_id
                from dim_category_desc_id
                where site_name = 'us' and asin_category_desc is not null
            ),
            name_tb as (
                select category_id, max(en_name) as en_name
                from dim_bsr_category_tree
                where site_name = '{self.site_name}'
                group by category_id
            )
            select old_tb.full_name,
                   old_tb.category_id,
                   name_tb.en_name as category_name,
                   old_tb.category_first_id
            from old_tb
            left join name_tb on old_tb.category_id = name_tb.category_id
        """
        old_name_df = self.spark.sql(sql_old)
        # Union old and new names, deduplicated on full_name.
        name_df = name_df.unionByName(old_name_df).drop_duplicates(['full_name'])
        name_df = name_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))
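        # name_df now carries the three join keys used below: full_name (cost table),
        # category_name (FBA and advertising tables), category_name_low (return rates).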
        # Per-category cost ratios from PostgreSQL (values are strings like '35%').
        sql = """
            select name as full_name, cost, avg_cost
            from us_profit_cost_new
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        profit_cost_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        # FBA fee configuration; the upstream column is spelled "categoy_name", so alias it.
        sql = """
            select categoy_name as category_name,
                   calc_type,
                   config_json as fba_config_json,
                   referral_fee_formula
            from us_profit_fba_config
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        fba_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        # FBA config joined to category ids via the exact category name.
        fba_config_df = fba_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("calc_type"),
            F.col("fba_config_json"),
            F.col("referral_fee_formula"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])
        # Advertising cost configuration.
        sql = """
            select category as category_name,
                   adv
            from us_profit_adv
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        adv_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        # Raw return-rate data, averaged per category for a fixed reference week.
        sql = """
            select category as category_name,
                   round(avg(return_ratio), 4) as return_ratio
            from us_aba_profit_category_insights
            where year_week = '2023-22'
            group by category
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        return_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        return_config_df = return_config_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))
        # Return rates join on the normalized (lowercased, punctuation-free) name.
        return_config_df = return_config_df.join(name_df, on=['category_name_low'], how='left').select(
            name_df["category_name"].alias("category_name"),
            F.col("return_ratio"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])
        # Advertising config joined to category ids.
        adv_config_df = adv_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("adv"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])
        # Cost ratios join on the space-stripped full name; '%' strings parsed to decimals.
        profit_cost_df = profit_cost_df.join(name_df, on=['full_name'], how='left').select(
            F.col("full_name"),
            name_df['category_name'].alias("category_name"),
            self.udf_parse_num_reg(F.col("cost")).alias("cost"),
            self.udf_parse_num_reg(F.col("avg_cost")).alias("avg_cost"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])
        # TODO: refunds
        # Full outer joins keep categories present in any of the four sources.
        df_save = profit_cost_df \
            .join(fba_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(adv_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(return_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .select(
                # category
                F.col("full_name"),
                F.coalesce(
                    profit_cost_df['category_name'],
                    fba_config_df['category_name'],
                    adv_config_df['category_name']
                ).alias("category_name"),
                F.col("category_id"),
                F.col("category_first_id"),
                # cost
                F.col("cost"),
                F.col("avg_cost"),
                # FBA calculation config type
                F.col("calc_type"),
                F.col("fba_config_json"),
                F.col("referral_fee_formula"),
                # advertising
                F.col("adv"),
                # return rate
                F.col("return_ratio"),
                F.lit(self.site_name).alias("site_name")
            )
        # Write one output file into the site partition.
        df_save = df_save.repartition(1)
        partition_dict = {
            "site_name": self.site_name,
        }
        # Drop or update the existing Hive partition, then save.
        CommonUtil.save_or_update_table(self.spark, self.hive_tb, partition_dict, df_save)


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimProfitConfig(site_name)
    obj.run()
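
# Example invocation (a sketch; the actual script path and spark-submit options
# depend on how this repository deploys jobs):
#   spark-submit dim_profit_config.py us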