Commit 616c9bd6 by chenyuanjie

fix

parent 66c45f86
...@@ -163,17 +163,20 @@ class EsAsinProfitRate(object): ...@@ -163,17 +163,20 @@ class EsAsinProfitRate(object):
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time' 'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
) )
# 从Doris获取asin_crawl_date(用于利润率主索引写入) # 从Doris获取asin_crawl_date和asin_price,用profit_key关联
df_crawl_date = DorisHelper.spark_import_with_flight( df_crawl_date = DorisHelper.spark_import_with_flight(
session=self.spark, session=self.spark,
table_identifier=f"selection.{self.site_name}_asin_latest_detail", table_identifier=f"selection.{self.site_name}_asin_latest_detail",
read_fields="asin,asin_crawl_date" read_fields="asin,asin_crawl_date,asin_price"
).withColumn( ).withColumn(
"asin_crawl_date", F.substring(F.col("asin_crawl_date"), 1, 10) "asin_crawl_date", F.substring(F.col("asin_crawl_date"), 1, 10)
).repartition(40, 'asin') ).withColumn(
"profit_key", F.concat_ws("_", F.col("asin"), F.round(F.col("asin_price"), 2))
).select('profit_key', 'asin_crawl_date') \
.repartition(40, 'profit_key')
self.df_asin_profit_rate = self.df_asin_profit_rate.join( self.df_asin_profit_rate = self.df_asin_profit_rate.join(
df_crawl_date, on='asin', how='left' df_crawl_date, on='profit_key', how='left'
).select( ).select(
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date' 'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date'
).cache() ).cache()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment