Commit 3d10c8d2 by chenyuanjie

ABA搜索词-更新利润率数据

parent ff1ff0b8
...@@ -48,10 +48,11 @@ class DwtAbaStAnalytics(Templates): ...@@ -48,10 +48,11 @@ class DwtAbaStAnalytics(Templates):
self.df_st_detail = self.spark.sql(f"select 1+1;") self.df_st_detail = self.spark.sql(f"select 1+1;")
self.df_st_key = self.spark.sql(f"select 1+1;") self.df_st_key = self.spark.sql(f"select 1+1;")
self.df_st_market = self.spark.sql(f"select 1+1;") self.df_st_market = self.spark.sql(f"select 1+1;")
self.df_st_volume_fba = self.spark.sql(f"select 1+1;") # self.df_st_volume_fba = self.spark.sql(f"select 1+1;")
self.df_st_brand = self.spark.sql(f"select 1+1;") self.df_st_brand = self.spark.sql(f"select 1+1;")
self.df_asin_label = self.spark.sql(f"select 1+1;") self.df_asin_label = self.spark.sql(f"select 1+1;")
self.df_is_hidden_cate = self.spark.sql(f"select 1+1;") self.df_is_hidden_cate = self.spark.sql(f"select 1+1;")
self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
# 自定义udf函数注册 # 自定义udf函数注册
self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType()) self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType())
...@@ -377,20 +378,20 @@ class DwtAbaStAnalytics(Templates): ...@@ -377,20 +378,20 @@ class DwtAbaStAnalytics(Templates):
self.df_st_market.show(10, truncate=True) self.df_st_market.show(10, truncate=True)
# 获取dwd_st_volume_fba 取gross_profit_fee_air 和 gross_profit_fee_sea # 获取dwd_st_volume_fba 取gross_profit_fee_air 和 gross_profit_fee_sea
sql = f""" # sql = f"""
select # select
search_term, # search_term,
gross_profit_fee_air, # gross_profit_fee_air,
gross_profit_fee_sea # gross_profit_fee_sea
from dwd_st_volume_fba # from dwd_st_volume_fba
where site_name = '{self.site_name}' # where site_name = '{self.site_name}'
and date_type = '{self.date_type}' # and date_type = '{self.date_type}'
and date_info = '{self.date_info}' # and date_info = '{self.date_info}'
""" # """
self.df_st_volume_fba = self.spark.sql(sqlQuery=sql) # self.df_st_volume_fba = self.spark.sql(sqlQuery=sql)
self.df_st_volume_fba = self.df_st_volume_fba.repartition(80, 'search_term').cache() # self.df_st_volume_fba = self.df_st_volume_fba.repartition(80, 'search_term').cache()
print("self.df_st_volume_fba:") # print("self.df_st_volume_fba:")
self.df_st_volume_fba.show(10, truncate=True) # self.df_st_volume_fba.show(10, truncate=True)
# 获取影视标签dim_asin_label 取 asin_label_type # 获取影视标签dim_asin_label 取 asin_label_type
sql = f""" sql = f"""
...@@ -459,6 +460,15 @@ class DwtAbaStAnalytics(Templates): ...@@ -459,6 +460,15 @@ class DwtAbaStAnalytics(Templates):
print("self.df_is_hidden_cate:") print("self.df_is_hidden_cate:")
self.df_is_hidden_cate.show(10, truncate=True) self.df_is_hidden_cate.show(10, truncate=True)
# asin利润率
sql = f"""
select asin, price as asin_price, ocean_profit, air_profit
from dim_asin_profit_rate_info where site_name = '{self.site_name}'
"""
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(80, 'asin').dropDuplicates(['asin', 'asin_price']).cache()
print("self.df_asin_profit_rate:")
self.df_asin_profit_rate.show(10, truncate=True)
def handle_data(self): def handle_data(self):
# 对基础计算表进行关联 # 对基础计算表进行关联
self.handle_base_join() self.handle_base_join()
...@@ -485,11 +495,14 @@ class DwtAbaStAnalytics(Templates): ...@@ -485,11 +495,14 @@ class DwtAbaStAnalytics(Templates):
self.df_asin_detail, on=['asin'], how='left' self.df_asin_detail, on=['asin'], how='left'
).join( ).join(
self.df_asin_label, on=['asin'], how='left' self.df_asin_label, on=['asin'], how='left'
).join(
self.df_asin_profit_rate, on=['asin', 'asin_price'], how='left'
).cache() ).cache()
self.df_st_asin_measure.unpersist() self.df_st_asin_measure.unpersist()
self.df_asin_measure.unpersist() self.df_asin_measure.unpersist()
self.df_asin_detail.unpersist() self.df_asin_detail.unpersist()
self.df_asin_label.unpersist() self.df_asin_label.unpersist()
self.df_asin_profit_rate.unpersist()
self.df_st_asin_cal = self.df_st_asin_join.join( self.df_st_asin_cal = self.df_st_asin_join.join(
self.df_seller_asin_country, on=['asin'], how='left' self.df_seller_asin_country, on=['asin'], how='left'
...@@ -556,7 +569,9 @@ class DwtAbaStAnalytics(Templates): ...@@ -556,7 +569,9 @@ class DwtAbaStAnalytics(Templates):
F.sum("asin_amazon_orders").alias("amazon_monthly_sales"), F.sum("asin_amazon_orders").alias("amazon_monthly_sales"),
F.avg("asin_title_len").alias("title_length_avg"), F.avg("asin_title_len").alias("title_length_avg"),
F.avg("asin_rating").alias("rating_avg"), F.avg("asin_rating").alias("rating_avg"),
F.avg("asin_total_comments").alias("total_comments_avg") F.avg("asin_total_comments").alias("total_comments_avg"),
F.avg("ocean_profit").alias("ocean_profit_avg"),
F.avg("air_profit").alias("air_profit_avg")
).repartition(80, 'search_term').cache() ).repartition(80, 'search_term').cache()
def handle_brand_seller_agg(self): def handle_brand_seller_agg(self):
...@@ -767,8 +782,6 @@ class DwtAbaStAnalytics(Templates): ...@@ -767,8 +782,6 @@ class DwtAbaStAnalytics(Templates):
).join( ).join(
self.df_st_market, on=['search_term'], how='left' self.df_st_market, on=['search_term'], how='left'
).join( ).join(
self.df_st_volume_fba, on=['search_term'], how='left'
).join(
self.df_st_brand, on=['search_term'], how='left' self.df_st_brand, on=['search_term'], how='left'
).join( ).join(
self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left' self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left'
...@@ -777,7 +790,6 @@ class DwtAbaStAnalytics(Templates): ...@@ -777,7 +790,6 @@ class DwtAbaStAnalytics(Templates):
self.df_st_key.unpersist() self.df_st_key.unpersist()
self.df_st_num_stats.unpersist() self.df_st_num_stats.unpersist()
self.df_st_market.unpersist() self.df_st_market.unpersist()
self.df_st_volume_fba.unpersist()
self.df_st_brand.unpersist() self.df_st_brand.unpersist()
self.df_is_hidden_cate.unpersist() self.df_is_hidden_cate.unpersist()
...@@ -862,8 +874,6 @@ class DwtAbaStAnalytics(Templates): ...@@ -862,8 +874,6 @@ class DwtAbaStAnalytics(Templates):
"supply_demand", "supply_demand",
"market_cycle_type", "market_cycle_type",
"color_proportion", "color_proportion",
"gross_profit_fee_air",
"gross_profit_fee_sea",
"multi_color_proportion", "multi_color_proportion",
"multi_size_proportion", "multi_size_proportion",
"st_4_20_ao_avg", "st_4_20_ao_avg",
...@@ -905,7 +915,9 @@ class DwtAbaStAnalytics(Templates): ...@@ -905,7 +915,9 @@ class DwtAbaStAnalytics(Templates):
"is_hidden_cate", "is_hidden_cate",
"st_dd50_proportion", "st_dd50_proportion",
"st_dd100_proportion", "st_dd100_proportion",
"st_dd200_proportion" "st_dd200_proportion",
F.round("ocean_profit_avg", 4).alias("gross_profit_fee_sea"),
F.round("air_profit_avg", 4).alias("gross_profit_fee_air")
) )
# 空值处理 # 空值处理
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment