Commit f966e87d by hejiangming

月搜索词利润率重新计算代码 用利润率表计算得出月搜索词利润率 重新写回 dwt_aba_st_analytics

parent 6d492287
"""
业务背景:
月搜索词报表(dwt_aba_st_analytics)中的利润率字段(gross_profit_fee_sea/gross_profit_fee_air)是月批次跑完后固化的。
但利润率的计算依赖Keepa包裹数据(长宽高重量),Keepa数据是分批采集、持续补充的,
导致月批次跑完后新补的Keepa数据对应的ASIN利润率无法体现在搜索词报表中,与流量选品等其他页面产生数据差异。
本任务从月批次中抽取利润率聚合逻辑,每日用最新的dim_asin_profit_rate_info重新计算搜索词利润率均值,
覆盖更新dwt_aba_st_analytics中的gross_profit_fee_sea和gross_profit_fee_air两个字段。
上游数据:
dwd_st_asin_measure — 搜索词和ASIN对应关系
dim_asin_detail — ASIN详情 ASIN价格
dim_asin_profit_rate_info — ASIN利润率(每日更新,提供ocean_profit/air_profit)
dwt_aba_st_analytics — 月搜索词报表(读取已有数据,替换利润率字段后覆盖写回)
下游数据:
dwt_aba_st_analytics — 覆盖更新gross_profit_fee_sea和gross_profit_fee_air字段
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
class DwtStProfitRateDaily(object):
    """Recompute monthly search-term profit rates from the latest ASIN data.

    The gross_profit_fee_sea / gross_profit_fee_air columns of the monthly
    search-term report (dwt_aba_st_analytics) are frozen when the monthly
    batch finishes, but per-ASIN profit rates keep improving as Keepa
    package data is backfilled. This job re-averages the latest
    dim_asin_profit_rate_info per search term and overwrites the two
    columns in place for one (site_name, date_type, date_info) partition.
    """

    def __init__(self, site_name, date_type, date_info):
        """
        :param site_name: site code, e.g. us/uk/de (only US has profit-rate data)
        :param date_type: partition type, e.g. month / month_week
        :param date_info: partition value, e.g. 2026-03
        """
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # self.hive_tb = "tmp_dwt_st_profit_rate_test"  # test target table
        self.hive_tb = "dwt_aba_st_analytics"
        self.spark = SparkUtil.get_spark_session(
            f"DwtStProfitRateDaily: {self.site_name}, {self.date_type}, {self.date_info}"
        )
        # Some upstream tables have no 'month_old' partitions; fall back to
        # 'month' there (kept consistent with dwt_aba_st_analytics).
        self.spe_date_type = 'month' if 'month_old' == self.date_type else self.date_type
        # Placeholder DataFrames, replaced in read_data()/handle_data().
        self.df_st_asin_measure = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_asin_profit_rate = self.spark.sql("select 1+1;")
        self.df_profit_avg = self.spark.sql("select 1+1;")
        self.df_existing = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def run(self):
        """Execute the read -> transform -> validate/write pipeline."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """Load and cache the four upstream DataFrames."""
        # 1. Search-term <-> ASIN relations: determines which ASINs feed each
        #    search term's profit-rate average. Covers every placement type
        #    (zr/sp/sb/ac/bs/er/tr) on the first three Amazon result pages.
        sql = f"""
            select search_term, asin
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_st_asin_measure = self.spark.sql(sqlQuery=sql).repartition(80, 'asin').cache()
        print("df_st_asin_measure:")
        self.df_st_asin_measure.show(10, truncate=True)
        # 2. Current ASIN price: asin_price is part of the join key to the
        #    profit-rate table (the same ASIN has different rates per price).
        sql = f"""
            select asin, asin_price
            from dim_asin_detail
            where site_name = '{self.site_name}'
            and date_type = '{self.spe_date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).repartition(80, 'asin').cache()
        print("df_asin_detail:")
        self.df_asin_detail.show(10, truncate=True)
        # 3. Latest per-ASIN sea/air profit rates at a given price.
        #    No date_info filter: the table keeps only the most recent day's
        #    partition (historical partitions are dropped).
        #    dropDuplicates keeps one row per asin+price, matching the
        #    original monthly-batch logic.
        sql = f"""
            select asin, price as asin_price, ocean_profit, air_profit
            from dim_asin_profit_rate_info
            where site_name = '{self.site_name}'
        """
        self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql) \
            .repartition(80, 'asin') \
            .dropDuplicates(['asin', 'asin_price']).cache()
        print("df_asin_profit_rate:")
        self.df_asin_profit_rate.show(10, truncate=True)
        # 4. Existing report rows whose profit-rate columns will be replaced.
        #    Always read from the production table: during tests the test
        #    table is empty, so the source rows must come from production.
        sql = f"""
            select * from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_existing = self.spark.sql(sqlQuery=sql).cache()
        print(f"dwt_aba_st_analytics ")
        self.df_existing.show(10, truncate=True)

    def handle_data(self):
        """Average profit rates per search term and splice them into the report."""
        # Three-way left join: search-term relations -> ASIN price -> profit
        # rate. Left joins because not every ASIN has a price and not every
        # asin+price has a profit rate; unmatched rows yield NULL rates,
        # which avg() ignores.
        df_joined = self.df_st_asin_measure.join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_profit_rate, on=['asin', 'asin_price'], how='left'
        )
        # Per-search-term mean; avg() skips NULLs, so only ASINs with a
        # known profit rate contribute to the average.
        self.df_profit_avg = df_joined.groupby('search_term').agg(
            F.round(F.avg("ocean_profit"), 4).alias("gross_profit_fee_sea_new"),
            F.round(F.avg("air_profit"), 4).alias("gross_profit_fee_air_new")
        ).repartition(80, 'search_term').cache()
        print("利润率均值计算结果:")
        self.df_profit_avg.show(10, truncate=True)
        # Replace the old profit-rate columns: drop them, left-join the new
        # averages, then rename back to the original column names.
        self.df_save = self.df_existing.drop(
            "gross_profit_fee_sea", "gross_profit_fee_air"
        ).join(
            self.df_profit_avg, on=['search_term'], how='left'
        ).withColumnRenamed(
            "gross_profit_fee_sea_new", "gross_profit_fee_sea"
        ).withColumnRenamed(
            "gross_profit_fee_air_new", "gross_profit_fee_air"
        )

    def save_data(self):
        """Validate the row count, then overwrite the target partition."""
        # Keep the column order identical to the production table schema.
        hive_columns = self.spark.sql("select * from dwt_aba_st_analytics limit 0").columns
        self.df_save = self.df_save.select(*hive_columns).repartition(10).cache()
        new_count = self.df_save.count()
        # Release the cached inputs only AFTER df_save is materialized above;
        # unpersisting them earlier (as a previous version did at the end of
        # handle_data) forces the count() to recompute every join uncached.
        self.df_st_asin_measure.unpersist()
        self.df_asin_detail.unpersist()
        self.df_asin_profit_rate.unpersist()
        self.df_existing.unpersist()
        self.df_profit_avg.unpersist()
        old_count = self.spark.sql(f"""
            select count(1) as cnt from dwt_aba_st_analytics
            where site_name='{self.site_name}'
            and date_type='{self.date_type}'
            and date_info='{self.date_info}'
        """).collect()[0]['cnt']
        print(f"正式表旧数据量: {old_count}")
        print(f"新数据量: {new_count}")
        # Guard: replacing a partition must never change its row count.
        if new_count != old_count:
            raise RuntimeError(
                f"数据量校验失败!新 {new_count} != 旧 {old_count},终止写入,请人工检查。"
            )
        print("数据量校验通过,开始写入...")
        # Delete the old partition files first, then append the fresh data
        # (during tests self.hive_tb points at the test table instead).
        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict)
        HdfsUtils.delete_hdfs_file(hdfs_path)
        partition_by = list(partition_dict.keys())
        self.df_save.write.saveAsTable(
            name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by
        )
        print(f"写入完成,目标表: {self.hive_tb},路径: {hdfs_path}")
        print("利润率更新完成!")
if __name__ == '__main__':
    # Usage: <script> <site_name> <date_type> <date_info>
    # Fail fast with a readable message instead of an IndexError.
    if len(sys.argv) < 4:
        print("Usage: <site_name> <date_type> <date_info>, e.g. us month 2026-03")
        sys.exit(1)
    site_name = sys.argv[1]  # arg 1: site code, e.g. us/uk/de
    date_type = sys.argv[2]  # arg 2: partition type, e.g. month/month_week
    date_info = sys.argv[3]  # arg 3: year-month, e.g. 2026-03
    handle_obj = DwtStProfitRateDaily(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
    # NOTE: for daily runs date_type must be 'month', because the monthly
    # search-term table is partitioned by month.
    # NOTE: only the US site has profit-rate data.
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment