Commit 5efdfc6e by chenyuanjie

利润率导出pg

parent 415179d3
...@@ -87,15 +87,21 @@ class ExportNeedProfitRate(object): ...@@ -87,15 +87,21 @@ class ExportNeedProfitRate(object):
# 5. keepa 当日增量 INNER JOIN # 5. keepa 当日增量 INNER JOIN
# keepa 表已整合为单分区快照,用 updated_time >= last_date_info 筛"近一天更新"的增量 # keepa 表已整合为单分区快照,用 updated_time >= last_date_info 筛"近一天更新"的增量
# weight 取数规则:优先 item_weight;item_weight <= 0 时用 package_weight 兜底;都不大于 0 → 过滤
sql_keepa = f""" sql_keepa = f"""
SELECT asin, package_length, package_width, package_height, item_weight AS weight SELECT asin, package_length, package_width, package_height,
CASE WHEN item_weight > 0 THEN item_weight
WHEN package_weight > 0 THEN package_weight
ELSE NULL
END AS weight
FROM dim_keepa_asin_info FROM dim_keepa_asin_info
WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}' WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}'
""" """
df_keepa = self.spark.sql(sqlQuery=sql_keepa) \ df_keepa = self.spark.sql(sqlQuery=sql_keepa) \
.filter((F.col('package_length') > 0) & .filter((F.col('package_length') > 0) &
(F.col('package_width') > 0) & (F.col('package_width') > 0) &
(F.col('package_height') > 0)) \ (F.col('package_height') > 0) &
(F.col('weight') > 0)) \
.repartition(40, 'asin') .repartition(40, 'asin')
df_result = df_flow \ df_result = df_flow \
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment