Commit 8e098cba by chenyuanjie

待计算利润率asin筛选fix

parent cbab55e6
"""
author: CT
description: 导出待计算利润率的 ASIN
1) Hive dwt_flow_asin 月维度读取 date_info >= '2025-05' 的所有 ASIN:
asin / price / category_first_id / asin_crawl_date
2) Doris dwt.{site}_flow_asin_30day 读取所有相关 ASIN:
asin / price / category_first_id / asin_crawl_date
3) union 后按 (asin, price) 去重保留 asin_crawl_date 最新
4) LEFT JOIN 分类、INNER JOIN keepa 增量
keepa 表已整合为单分区快照,按 updated_time > last_date_info 筛增量
5) keepa 关联到的 ASIN 全部导出 PG {site}_asin_profit_rate_calc 重新计算利润率
执行示例: spark-submit export_need_profit_rate.py us 2026-05-15
description: 导出待计算利润率的 ASIN(修复漏洞: 不再仅依赖 keepa 近 1 天更新)
【数据流】
flow_asin 池: Hive dwt_flow_asin 最近 3 月 ∪ Doris dwt.{site}_flow_asin_30day
→ (asin, price) 去重保留 asin_crawl_date 最新
两路合并:
A) keepa 变动重算: flow_asin INNER JOIN keepa(updated_time >= date_info-1天)
B) 新组合(asin,price)首次计算: flow_asin LEFT ANTI 利润率已计算快照
INNER JOIN keepa(全量,无时间过滤)
A ∪ B → 按 (asin, price) 去重保留 asin_crawl_date 最新
→ LEFT JOIN 分类 → 派生 source_month / part_key → 写 PG {site}_asin_profit_rate_calc
【近 3 月窗口】
通过 MySQL workflow_everyday.MAX(report_date) WHERE date_type='month' AND page='流量选品'
向前推 3 个月(参考 export_keepa_asin_del.py)
【已计算判断】
hive dim_asin_profit_rate_info 全量(只保留最新分区,按 site_name 过滤即可),
按 (asin, price) LEFT ANTI 剔除
【keepa 来源】
hive dim_keepa_asin_info 只保留最新分区,按 site_name 过滤即可
执行示例: spark-submit export_need_profit_rate.py us 2026-06-15
"""
import os
import sys
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
sys.path.append(os.path.dirname(sys.path[0]))
......@@ -23,15 +33,13 @@ from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
START_MONTH = '2025-05'
class ExportNeedProfitRate(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info # 计算时间 yyyy-MM-dd
# keepa 增量过滤下限:date_info - 1 天(如 date_info=2026-05-13 → last_date_info=2026-05-12)
# keepa 增量过滤下限:date_info - 1 天
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
......@@ -41,8 +49,33 @@ class ExportNeedProfitRate(object):
df_export = self.build_export_df()
self.write_to_pg(df_export)
def build_export_df(self):
# 1. Hive dwt_flow_asin 月维度,date_info >= 2025-05 所有月份
# ---------------- 公共数据池: flow_asin 近 3 月 ----------------
def _get_recent_3_months(self):
"""通过 MySQL workflow_everyday 拿最大 report_date 向前推 3 月(参考 export_keepa_asin_del.py)"""
sql_max_month = (
f"select MAX(report_date) as date_info from workflow_everyday "
f"where site_name = '{self.site_name}' and date_type = 'month' and page = '流量选品'"
)
print(f"sql_max_month=\n{sql_max_month}")
mysql_con = DBUtil.get_connection_info('mysql', self.site_name)
max_date_info = SparkUtil.read_jdbc_query(
session=self.spark, url=mysql_con['url'],
pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql_max_month,
).collect()[0]['date_info']
print(f"workflow_everyday 最大 report_date: {max_date_info}")
base_dt = datetime.strptime(str(max_date_info), '%Y-%m')
months_3 = [(base_dt - relativedelta(months=i)).strftime('%Y-%m') for i in range(3)]
print(f"近 3 月窗口: {months_3}")
return months_3
def _build_flow_asin_pool(self):
"""flow_asin 池: Hive dwt_flow_asin 近 3 月 ∪ Doris {site}_flow_asin_30day
→ (asin, price) 去重保留 asin_crawl_date 最新"""
months_3 = self._get_recent_3_months()
months_in = ",".join([f"'{m}'" for m in months_3])
# 1. Hive dwt_flow_asin 近 3 月
sql_dwt = f"""
SELECT asin,
asin_price AS price,
......@@ -51,7 +84,7 @@ class ExportNeedProfitRate(object):
FROM dwt_flow_asin
WHERE site_name = '{self.site_name}'
AND date_type = 'month'
AND date_info >= '{START_MONTH}'
AND date_info IN ({months_in})
AND asin_price > 0
"""
print(f"sql_dwt =\n{sql_dwt}")
......@@ -59,8 +92,7 @@ class ExportNeedProfitRate(object):
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.to_timestamp(F.col('asin_crawl_date')))
# 2. Doris dwt.{site}_flow_asin_30day 所有相关 ASIN
# spark_import_with_connector 不支持 WHERE 下推,price > 0 过滤改到 Spark 端
# 2. Doris dwt.{site}_flow_asin_30day(保留: 月数据可能延迟,30day 兜底)
table_identifier = f"dwt.{self.site_name}_flow_asin_30day"
read_fields = "asin,price,category_first_id,asin_crawl_date"
df_doris = DorisHelper.spark_import_with_connector(self.spark, table_identifier, read_fields) \
......@@ -69,25 +101,29 @@ class ExportNeedProfitRate(object):
.withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('timestamp')) \
.select('asin', 'price', 'category_first_id', 'asin_crawl_date')
# 3. union + 按 (asin, price) 去重保留 asin_crawl_date 最新
# 3. union + (asin, price) 去重保留最新 asin_crawl_date
df_flow = df_dwt.unionByName(df_doris).repartition(40, 'asin', 'price')
window = Window.partitionBy('asin', 'price').orderBy(F.col('asin_crawl_date').desc_nulls_last())
df_flow = df_flow.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk') \
.cache()
print(f"flow_asin 池数据量(去重后): {df_flow.count():,}")
return df_flow
# 4. 分类名 LEFT JOIN
sql_cate = f"""
SELECT category_first_id, en_name AS category
FROM dim_bsr_category_tree
WHERE site_name = '{self.site_name}' AND nodes_num = 2
"""
df_cate = self.spark.sql(sqlQuery=sql_cate)
# ---------------- keepa 数据源 ----------------
# 5. keepa 当日增量 INNER JOIN
# keepa 表已整合为单分区快照,用 updated_time > last_date_info 筛"近一天更新"的增量
# weight 取数规则:优先 item_weight;item_weight <= 0 时用 package_weight 兜底;都不大于 0 → 过滤
def _read_keepa(self, only_recent_update):
"""从 Hive dim_keepa_asin_info 读 keepa
only_recent_update=True : 仅近 1 天更新(updated_time >= last_date_info), 路径 A 用
only_recent_update=False: 全量(keepa 历史就有的 asin), 路径 B 用
过滤规则一致: 包装长宽高 > 0, weight 取 item_weight 优先 / package_weight 兜底
"""
where_clause = (
f"WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}'"
if only_recent_update
else f"WHERE site_name = '{self.site_name}'"
)
sql_keepa = f"""
SELECT asin, package_length, package_width, package_height,
CASE WHEN item_weight > 0 THEN item_weight
......@@ -95,18 +131,65 @@ class ExportNeedProfitRate(object):
ELSE NULL
END AS weight
FROM dim_keepa_asin_info
WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}'
{where_clause}
"""
df_keepa = self.spark.sql(sqlQuery=sql_keepa) \
print(f"sql_keepa (only_recent_update={only_recent_update}) =\n{sql_keepa}")
return self.spark.sql(sqlQuery=sql_keepa) \
.filter((F.col('package_length') > 0) &
(F.col('package_width') > 0) &
(F.col('package_height') > 0) &
(F.col('weight') > 0)) \
.repartition(40, 'asin')
df_result = df_flow \
# ---------------- 已计算过利润率的快照(路径 B 用作 LEFT ANTI 基准) ----------------
def _read_profit_rate_latest_snapshot(self):
"""读 hive dim_asin_profit_rate_info(全站点过滤),按 (asin, price) 去重,作为"已计算过"基准"""
sql_profit = f"""
SELECT asin, price
FROM dim_asin_profit_rate_info
WHERE site_name = '{self.site_name}'
"""
print(f"sql_profit =\n{sql_profit}")
return self.spark.sql(sqlQuery=sql_profit) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.dropDuplicates(['asin', 'price']) \
.repartition(40, 'asin', 'price')
# ---------------- 主流程 ----------------
def build_export_df(self):
df_flow = self._build_flow_asin_pool()
# 路径 A: flow_asin INNER keepa(近 1 天更新)
df_keepa_recent = self._read_keepa(only_recent_update=True)
df_a = df_flow.join(df_keepa_recent, on='asin', how='inner')
# 路径 B: flow_asin LEFT ANTI 已算过 INNER keepa(全量)
df_profit_existing = self._read_profit_rate_latest_snapshot()
df_flow_new = df_flow.join(df_profit_existing, on=['asin', 'price'], how='left_anti')
df_keepa_all = self._read_keepa(only_recent_update=False)
df_b = df_flow_new.join(df_keepa_all, on='asin', how='inner')
# A ∪ B: (asin, price) 去重保留 asin_crawl_date 最新
df_union = df_a.unionByName(df_b).repartition(40, 'asin', 'price')
window = Window.partitionBy('asin', 'price').orderBy(F.col('asin_crawl_date').desc_nulls_last())
df_need = df_union.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk') \
.cache()
print(f"A ∪ B 合并去重后数据量: {df_need.count():,}")
# 分类名 LEFT JOIN
sql_cate = f"""
SELECT category_first_id, en_name AS category
FROM dim_bsr_category_tree
WHERE site_name = '{self.site_name}' AND nodes_num = 2
"""
df_cate = self.spark.sql(sqlQuery=sql_cate)
df_result = df_need \
.join(df_cate, on='category_first_id', how='left') \
.join(df_keepa, on='asin', how='inner') \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.withColumn('part_key', F.ntile(50).over(Window.orderBy(F.rand()))) \
.select(
......@@ -116,9 +199,10 @@ class ExportNeedProfitRate(object):
).cache()
count = df_result.count()
print(f"待计算利润率数据量{count:,}")
print(f"待计算利润率数据量: {count:,}")
df_result.show(10, truncate=False)
df_flow.unpersist()
df_need.unpersist()
return df_result
def write_to_pg(self, df_export):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment