1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""
Output field names targeted by this module:
asinVarNum bsrOrders bsrOrdersSale pageInventory asinBoughtMonth sellerJson
"""
from utils.templates import Templates
from yswg_utils.common_udf import udf_parse_seller_json
class AddFields(Templates):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000):
    """Store the job parameters, create the Spark session and register the seller UDF.

    Args:
        site_name: Site code; used as the ``site_name`` filter in SQL queries.
        date_type: Stored on the instance as ``self.date_type``.
        date_info: Stored on the instance as ``self.date_info``.
        consumer_type: Whether to consume real-time or historical data.
            The default spelling ``'lastest'`` is kept as-is because callers
            may compare against it.
        topic_name: Accepted but not stored by this constructor.
        batch_size: Accepted but not stored by this constructor.
    """
    super().__init__()
    self.site_name = site_name
    self.date_type = date_type
    self.date_info = date_info
    self.consumer_type = consumer_type  # consume real-time or historical data
    self.app_name = self.get_app_name()
    # The session has to exist before anything calls self.spark.sql(); the
    # placeholders below were previously built before this assignment.
    self.spark = self.create_spark_object(
        app_name=f"{self.app_name}")
    # Placeholder DataFrames; expected to be replaced with real data later.
    self.df_asin_bs = self.spark.sql("select 1+1;")
    self.df_save = self.spark.sql("select 1+1;")
    # NOTE(review): seller_schema is not imported next to udf_parse_seller_json;
    # confirm it is defined at module level before this class is instantiated.
    self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
def read_data(self):
    """Load the monthly category report into ``self.df_bs_report``.

    Selects category id, rank and orders from ``ods_one_category_report``
    for the current site and month, caches the result and prints a 10-row
    preview. For 2022 with a month earlier than March, the ``2022-12``
    partition is read instead of the ``{year}-{month}`` one.
    """
    # Source data for computing bsrOrders.
    print("1.2 读取ods_one_category_report表")
    if int(self.year) == 2022 and int(self.month) < 3:
        report_month = '2022-12'
    else:
        report_month = f"{self.year}-{self.month}"
    sql = (
        "select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report "
        f"where site_name='{self.site_name}' and date_type='month' and date_info='{report_month}';"
    )
    print("sql:", sql)
    report = self.spark.sql(sqlQuery=sql).cache()
    self.df_bs_report = report
    report.show(10, truncate=False)
# Handle the fulfillment method: derive the buy-box seller type from the seller JSON
def handle_asin_buy_box_seller_type(self, df):
    """Expand ``seller_json`` into ``buy_box_seller_type`` and ``account_name``.

    The ``seller_json`` column is run through the ``u_parse_seller_info``
    UDF, the two fields are lifted out of the parsed value into their own
    columns, and both the temporary parsed column and the original
    ``seller_json`` column are dropped.

    Args:
        df: DataFrame that contains a ``seller_json`` column.

    Returns:
        The DataFrame with the two new columns and without ``seller_json``.
    """
    parsed = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json))
    seller_info = parsed["seller_json_parsed"]
    enriched = parsed.withColumn("buy_box_seller_type", seller_info["buy_box_seller_type"])
    enriched = enriched.withColumn("account_name", seller_info["account_name"])
    return enriched.drop("seller_json_parsed", "seller_json")
def add_fields(self, df):
# Join the incoming rows with self.df_asin_bs, add the BSR sales column and
# rename columns to their camelCase output names.
# df: the DataFrame built from consumed Kafka data.
# NOTE(review): __init__ only assigns self.df_asin_bs a `select 1+1`
# placeholder, which has no node_id column; presumably another step replaces
# it before this join runs — confirm.
df_save = df.join(
self.df_asin_bs, on='node_id', how='left'
)
# asin_bsr_orders_sale is price multiplied by asin_bsr_orders.
df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders)
# Rename columns to the output field names.
df_save = df_save.withColumnRenamed("variat_num", "asinVarNum")
df_save = df_save.withColumnRenamed("asin_bsr_orders", "bsrOrders")
df_save = df_save.withColumnRenamed("asin_bsr_orders_sale", "bsrOrdersSale")
df_save = df_save.withColumnRenamed("page_inventory", "pageInventory")
df_save = df_save.withColumnRenamed("buy_sales", "asinBoughtMonth")
# Adds buy_box_seller_type and account_name, and drops seller_json.
df_save = self.handle_asin_buy_box_seller_type(df_save)
# NOTE(review): seller_json was dropped by the call above, so this rename has
# no column to act on (withColumnRenamed does nothing when the column is
# missing) — confirm whether sellerJson is meant to survive in the output.
df_save = df_save.withColumnRenamed("seller_json", "sellerJson")
df_save = df_save.withColumnRenamed("buy_box_seller_type", "buyBoxSellerType")