Commit e8a5c1e8 by chenyuanjie

每日利润率清洗

parent 40fcc4ea
......@@ -20,6 +20,7 @@ class DimAsinProfitRateInfo(object):
self.df_asin_profit = self.spark.sql(f"select 1+1;")
self.df_asin_profit_history = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def run(self):
......@@ -36,9 +37,20 @@ class DimAsinProfitRateInfo(object):
self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
self.df_asin_profit_history = self.df_asin_profit.filter(f"date_info < '{self.date_info}'").cache()
# 读取keepa数据
sql = f"""
select asin, package_length, package_width, package_height, weight
from dim_keepa_asin_info where site_name = '{self.site_name}';
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
def handle_data(self):
# 因为keepa数据存在更新的情况,保留与keepa最新数据所对应的数据行
self.df_asin_profit = self.df_asin_profit.join(
self.df_keepa_asin, on=['asin', 'package_length', 'package_width', 'package_height', 'weight'], how='inner'
)
# 去重
window = Window.partitionBy(['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight']).orderBy(
window = Window.partitionBy(['asin', 'price']).orderBy(
self.df_asin_profit.updated_time.desc_nulls_last()
)
self.df_asin_profit = self.df_asin_profit.withColumn(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment