Commit 2fc06234 by chenyuanjie

fix

parent f4da7619
...@@ -38,7 +38,7 @@ class DimAsinProfitRateInfo(object): ...@@ -38,7 +38,7 @@ class DimAsinProfitRateInfo(object):
def handle_data(self): def handle_data(self):
# 去重 # 去重
window = Window.partitionBy(['asin', 'price']).orderBy( window = Window.partitionBy(['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight']).orderBy(
self.df_asin_profit.updated_time.desc_nulls_last() self.df_asin_profit.updated_time.desc_nulls_last()
) )
self.df_asin_profit = self.df_asin_profit.withColumn( self.df_asin_profit = self.df_asin_profit.withColumn(
......
...@@ -63,13 +63,13 @@ class DwtFlowKeepaAsin(object): ...@@ -63,13 +63,13 @@ class DwtFlowKeepaAsin(object):
# 读取已经计算过利润率的asin # 读取已经计算过利润率的asin
sql = f""" sql = f"""
select asin, price from dim_asin_profit_rate_info where site_name = '{self.site_name}' and date_info = '{self.date_info}' select asin, price, package_length, package_width, package_height, weight from dim_asin_profit_rate_info where site_name = '{self.site_name}' and date_info = '{self.date_info}'
""" """
self.df_calc_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache() self.df_calc_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
# 读取已经导出过asin+price,避免重复计算 # 读取已经导出过asin+price,避免重复计算
sql = f""" sql = f"""
select asin, price from dwt_flow_keepa_asin where site_name = '{self.site_name}' select asin, price, package_length, package_width, package_height, weight from dwt_flow_keepa_asin where site_name = '{self.site_name}'
""" """
self.df_export_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache() self.df_export_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
...@@ -79,9 +79,9 @@ class DwtFlowKeepaAsin(object): ...@@ -79,9 +79,9 @@ class DwtFlowKeepaAsin(object):
).join( ).join(
self.df_keepa_asin, on='asin', how='inner' self.df_keepa_asin, on='asin', how='inner'
).join( ).join(
self.df_calc_asin, on=['asin', 'price'], how='left_anti' self.df_calc_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).join( ).join(
self.df_export_asin, on=['asin', 'price'], how='left_anti' self.df_export_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).cache() ).cache()
self.df_flow_asin.unpersist() self.df_flow_asin.unpersist()
self.df_category_id.unpersist() self.df_category_id.unpersist()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment