Commit 957e48f8 by hejiangming

店铺新增字段计算 导出增加判断 空分区不执行

parent dac2671d
......@@ -21,10 +21,12 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql.window import Window
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_get_package_quantity
from yswg_utils.common_udf import udf_new_asin_flag
from yswg_utils.common_udf import udf_parse_amazon_orders # 功能1:解析爬虫"3K+ bought in past month"为整数月销
from utils.db_util import DBUtil
......@@ -48,8 +50,8 @@ class DwtFbBaseReport(object):
self.spark = SparkUtil.get_spark_session(app_name)
# 获取不同维度日期下的计算日期YYYY-MM-DD
self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
self.last_month = CommonUtil.get_month_offset(date_info, -1)
self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info) # cal_date:当前周期对应的具体日期(如月度 2023-08 → 2023-08-31),用于判断是否新品
self.last_month = CommonUtil.get_month_offset(date_info, -1) # last_month:上个月(如 2023-08 → 2023-07),用于计算 Feedback 环比变化率
# 全局df初始化
self.df_fb_feedback = self.spark.sql(f"select 1+1;")
......@@ -61,11 +63,18 @@ class DwtFbBaseReport(object):
self.df_fb_asin_detail = self.spark.sql(f"select 1+1;")
self.df_self_seller_id = self.spark.sql(f"select 1+1;")
self.df_seller_account = self.spark.sql(f"select 1+1;")
# 功能1-6:流量选品数据 — account_id + 月销 + 履约方式 + 大类排名 + 新品标记 + NSR/BSR上榜日期 + 价格
self.df_flow_asin = self.spark.sql(f"select 1+1;")
self.df_home_asin = self.spark.sql(f"select 1+1;")
# 功能6:利润率 — ASIN+价格维度的海运/空运利润率
self.df_profit_rate = self.spark.sql(f"select 1+1;")
# 初始化UDF函数
self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType())
self.u_get_business_val = F.udf(self.get_business_val, StringType())
self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType()) # 判断是否新品
self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType()) # 从标题提取打包数量
self.u_get_business_val = F.udf(self.get_business_val, StringType()) # 解析 seller_address 字段
# 功能1:解析爬虫 buy_data 字段(如 "3K+ bought in past month" → 3000),用于首页销量求和
self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType())
# 解析seller_address字段,获取卖家公司数据
@staticmethod
......@@ -78,7 +87,7 @@ class DwtFbBaseReport(object):
if p.startswith(key):
# Business Address: 拼接后续所有内容
if key in ("Business Address", "Geschäftsadresse"):
return " ".join(parts[i + 1:]).strip()
return " ".join(parts[i + 1:]).strip() # 地址取后续所有内容拼接:地址可能有多段(街道、城市、邮编),用 |-| 分隔,不能只取一段
# 其他key: 只取下一个
elif i + 1 < len(parts):
return parts[i + 1].strip()
......@@ -100,7 +109,9 @@ class DwtFbBaseReport(object):
round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate,
round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate,
round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate,
seller_rating
seller_rating,
cur_fd.feedback_histogram,
cur_fd.metadata_json
from
(
select
......@@ -112,6 +123,8 @@ class DwtFbBaseReport(object):
country_name as fb_country_name,
seller_address,
seller_rating,
feedback_histogram,
metadata_json,
date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date
from ods_seller_account_feedback
where site_name = '{self.site_name}'
......@@ -134,12 +147,18 @@ class DwtFbBaseReport(object):
) last_fd
on cur_fd.seller_id = last_fd.seller_id
"""
# 子查询 cur_fd — 读本月数据 length(seller_id) > 2 过滤无效卖家ID
# 子查询 last_fd — date_info = '{self.last_month}' 读上月数据 过滤无效卖家ID
# cur_fd LEFT JOIN last_fd ON cur_fd.seller_id = last_fd.seller_id 本月新出现的卖家,上月没有记录 → last_fd 那边是 null 用 LEFT JOIN 保留这些新卖家,变化率字段为 null(表示无法计算)
# 外层 SELECT — 计算三个环比变化率 公式:(本月值 - 上月值)/ 上月值 = 环比变化率,保留4位小数
self.df_fb_feedback = self.spark.sql(sqlQuery=sql)
self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache()
# 解析seller_rating: 格式为"30天|-|90天|-|1年|-|历史",切分后转double,-1改为0
# 解析seller_rating: 格式为"30天|-|90天|-|1年|-|历史",切分后转double,-1改为0。-1.0 代表该时间段无评分数据,统一替换成 0.0,避免负值干扰后续计算
# 注意:cache 必须放在所有 withColumn 之后,否则 cache 的是不含 rating_*_num 4 个新列的版本,
# 后续每次访问都要重算 withColumn(虽然是窄变换开销不大,但能避免就避免)
_rating_split = F.split(F.col("seller_rating"), r"\|-\|")
self.df_fb_feedback = self.df_fb_feedback \
.drop_duplicates(['seller_id']) \
.withColumn("rating_30_day_num",
F.when(_rating_split.getItem(0).cast(DoubleType()) == -1.0, F.lit(0.0))
.otherwise(_rating_split.getItem(0).cast(DoubleType()))) \
......@@ -151,7 +170,44 @@ class DwtFbBaseReport(object):
.otherwise(_rating_split.getItem(2).cast(DoubleType()))) \
.withColumn("rating_lifetime_num",
F.when(_rating_split.getItem(3).cast(DoubleType()) == -1.0, F.lit(0.0))
.otherwise(_rating_split.getItem(3).cast(DoubleType())))
.otherwise(_rating_split.getItem(3).cast(DoubleType()))) \
.withColumn("fb_star_5_pct",
F.when(F.col("feedback_histogram").isNull(), F.lit(-1.0))
.otherwise(F.coalesce(
F.round(F.get_json_object(F.col("feedback_histogram"), "$.365d.star5").cast(DoubleType()) / 100, 4),
F.lit(-1.0)
))) \
.withColumn("fb_star_4_pct",
F.when(F.col("feedback_histogram").isNull(), F.lit(-1.0))
.otherwise(F.coalesce(
F.round(F.get_json_object(F.col("feedback_histogram"), "$.365d.star4").cast(DoubleType()) / 100, 4),
F.lit(-1.0)
))) \
.withColumn("fb_star_3_pct",
F.when(F.col("feedback_histogram").isNull(), F.lit(-1.0))
.otherwise(F.coalesce(
F.round(F.get_json_object(F.col("feedback_histogram"), "$.365d.star3").cast(DoubleType()) / 100, 4),
F.lit(-1.0)
))) \
.withColumn("fb_star_2_pct",
F.when(F.col("feedback_histogram").isNull(), F.lit(-1.0))
.otherwise(F.coalesce(
F.round(F.get_json_object(F.col("feedback_histogram"), "$.365d.star2").cast(DoubleType()) / 100, 4),
F.lit(-1.0)
))) \
.withColumn("fb_star_1_pct",
F.when(F.col("feedback_histogram").isNull(), F.lit(-1.0))
.otherwise(F.coalesce(
F.round(F.get_json_object(F.col("feedback_histogram"), "$.365d.star1").cast(DoubleType()) / 100, 4),
F.lit(-1.0)
))) \
.withColumn("fb_web_asin_num",
# metadata_json.totalResultCount 更准确(实时爬取的真实总数),有值优先用;无值回退原字段 num
F.when(
F.get_json_object(F.col("metadata_json"), "$.totalResultCount").isNotNull(),
F.get_json_object(F.col("metadata_json"), "$.totalResultCount").cast(IntegerType())
).otherwise(F.col("fb_web_asin_num"))) \
.cache() # 在所有 withColumn 之后再 cache,缓存的是含 4 个 rating_*_num + 5 个 fb_star_*_pct 的最终版本
print(sql)
......@@ -161,6 +217,7 @@ class DwtFbBaseReport(object):
select seller_id, asin from ods_seller_asin_account
where site_name='{self.site_name}' and date_format(created_at,'yyyy-MM-dd') <= '{self.cal_date}'
"""
# 取该站点下、创建时间不超过计算日期的账号-ASIN 映射。为什么要过滤 created_at? 避免把未来才录入的数据算进来,保证数据口径与当期一致
self.df_fb_asin = self.spark.sql(sqlQuery=sql)
self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin'])
print(sql)
......@@ -181,11 +238,14 @@ class DwtFbBaseReport(object):
and date_info = '{self.date_info}'
and row_num <= 20
"""
# 只取每个卖家排名前20的 ASIN(row_num <= 20),后续用于计算该卖家 Top20 的均价、均分、均评论数。
self.df_top20_asin = self.spark.sql(sqlQuery=sql)
self.df_top20_asin = self.df_top20_asin.drop_duplicates(['seller_id', 'asin'])
print(sql)
# 获取dim_cal_asin_history提取launch_time用于计算是否新品
# 功能2新增:补 asin_buy_box_seller_type(履约方式,3=FBM)和 asin_rank(大类排名),用于 FBM 占比计算
# 这两个字段已在 dim_cal_asin_history_detail 里,原 SQL 没取出来,本次补上
print("获取 dim_cal_asin_history")
sql = f"""
select
......@@ -197,10 +257,13 @@ class DwtFbBaseReport(object):
asin_total_comments,
asin_weight,
asin_volume,
asin_launch_time
asin_launch_time,
asin_buy_box_seller_type,
asin_rank
from dim_cal_asin_history_detail
where site_name = '{self.site_name}'
"""
# (ASIN 详情历史维表) 取 ASIN 的上架时间(asin_launch_time)等详情,用于后续判断是否新品(上架时间距计算日期 ≤180 天则为新品)
self.df_asin_history = self.spark.sql(sqlQuery=sql)
print(sql)
......@@ -209,6 +272,7 @@ class DwtFbBaseReport(object):
sql = f"""
select asin, parent_asin from dim_asin_variation_info where site_name='{self.site_name}' and asin != parent_asin
"""
# 只取 asin != parent_asin 的记录,即有父 ASIN 的子变体。单品(自己就是父)不在这张表里,后续会用 asin 本身填充 parent_asin。
self.df_asin_parent = self.spark.sql(sqlQuery=sql)
print(sql)
......@@ -218,10 +282,13 @@ class DwtFbBaseReport(object):
select seller_id, account_name, id from ods_seller_account_syn where site_name='{self.site_name}'
"""
self.df_seller_account = self.spark.sql(sqlQuery=sql)
# 进行去重
self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc())
self.df_seller_account = self.df_seller_account.drop_duplicates(['seller_id'])
self.df_seller_account = self.df_seller_account.drop('id')
# 进行去重 同一个 seller_id 可能有多条账号记录,先按 id 倒序,再去重,保留最新的那条 account_name。去重后把 id 字段删掉,不需要带入后续计算。
# orderBy+drop_duplicates 在分布式环境下不可靠(shuffle 会打乱顺序),改用 Window row_number 保证取 id 最大的那行
w_seller = Window.partitionBy('seller_id').orderBy(F.col('id').desc())
self.df_seller_account = self.df_seller_account \
.withColumn('_rn', F.row_number().over(w_seller)) \
.filter(F.col('_rn') == 1) \
.drop('_rn', 'id')
print(sql)
# 获取mysql:selection.accounts ,用于排除公司内部店铺
......@@ -229,6 +296,7 @@ class DwtFbBaseReport(object):
sql = f"""
select seller_id, 1 as is_self_fb from (select distinct seller_id from selection.accounts) t1
"""
# 这张表不在 Hive 里,直接通过 JDBC 读 MySQL。只要在这张表里出现的 seller_id,后续就标记 is_self_fb=1(公司自己的店铺),供业务端过滤用,避免把内部数据混入外部市场分析
conn_info = DBUtil.get_connection_info("mysql", "us")
self.df_self_seller_id = SparkUtil.read_jdbc_query(
session=self.spark,
......@@ -238,19 +306,97 @@ class DwtFbBaseReport(object):
query=sql
)
# =========================================================
# 功能1-6:从 dwt_flow_asin 读取所有计算所需字段
# =========================================================
# account_id :卖家ID,handle_fb_cal_agg 中 withColumnRenamed 为 seller_id
# asin_buy_box_seller_type:履约方式(功能2 FBM 判断,3=FBM)
# first_category_rank:大类排名(功能2 FBM 有效判断;注意 dwt_flow_asin 中 asin_rank=lit(None) 是 NULL,不能用)
# asin_is_new :是否新品(功能5/6 拆分新老品用)
# nsr_last_seen_at :NSR 最近上榜日期(功能3 按月度区间判断是否当月上榜)
# bsr_last_seen_at :BSR 最近上榜日期(功能4 同上)
# asin_price :ASIN 价格(功能6 利润率 join key,同一 ASIN 不同价格对应不同利润率)
# !!!调度依赖:dwt_flow_asin 必须在 dwt_fb_base_report 之前完成,否则会读到空分区
print("获取 dwt_flow_asin(功能1-6)")
sql = f"""
select account_id, asin, asin_bought_month,
asin_buy_box_seller_type,
first_category_rank,
asin_is_new,
nsr_last_seen_at,
bsr_last_seen_at,
asin_price
from dwt_flow_asin
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and account_id is not null
"""
self.df_flow_asin = self.spark.sql(sqlQuery=sql)
# dwt_flow_asin 已按 asin 去重,drop_duplicates 防御性保证,避免下游 join 放大
# cache:handle_fb_cal_agg 中 df_flow_agg + df_profit_cal 两处使用,避免重复扫磁盘
self.df_flow_asin = self.df_flow_asin.drop_duplicates(['asin']).cache()
print(sql)
# 首页所有 ASIN 的原始 buy_data 字符串(如 "3K+ bought in past month")
# 不加 row_num 过滤:这张表本身就是首页全量数据(top20 那次读取是另一份切片)
# 带 id 字段是为了去重时按最新爬取保留:同一 (seller_id, asin) 可能爬虫重跑多次,id 最大 = 最新
print("获取 ods_asin_detail_product buy_data")
sql = f"""
select seller_id, asin, buy_data, id
from ods_asin_detail_product
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_home_asin = self.spark.sql(sqlQuery=sql)
# 同一 (seller_id, asin) 按 id 倒序保留最新一条,再 drop id(不再需要)
# 同 df_seller_account 原因,改用 Window row_number 确保保留 id 最大(最新爬取)的那行
w_home = Window.partitionBy('seller_id', 'asin').orderBy(F.col('id').desc())
self.df_home_asin = self.df_home_asin \
.withColumn('_rn', F.row_number().over(w_home)) \
.filter(F.col('_rn') == 1) \
.drop('_rn', 'id')
print(sql)
# 功能3/4 NSR/BSR 不再单独读取榜单历史表
# 改为在 handle_fb_cal_agg() 中直接对 df_flow_asin 的 nsr_last_seen_at/bsr_last_seen_at 做月度区间过滤
# =========================================================
# 功能6:利润率数据
# =========================================================
# 数据来源:dim_asin_profit_rate_info 提供 ASIN+价格维度的海运/空运利润率
# 表的设计:每天跑完后会删除上一天分区,理论上只保留最新一天数据
# 字段对齐:把 price 别名为 asin_price,与 df_fb_asin_detail 中的 asin_price 列对齐,便于后续 join
# join 条件用 ['asin', 'asin_price']:同一 ASIN 不同价格对应不同利润率(price-dependent),单 asin 维度 join 会错配
# !!!dropDuplicates(['asin','asin_price']):防御性去重,与 dwt/dwt_st_profit_rate_day.py 的处理一致
# 原因:表本身按 (asin, price) 去重保留最新 updated_time,但 SQL 没过滤 date_info,依赖"上一天分区
# 已删除"这个副作用。万一上一天分区删除失败/调度重叠,跨分区会出现同 (asin, asin_price) 重复行,
# 后续 avg(ocean_profit)/avg(air_profit) 会被污染(同一行重复计 N 次)
print("获取 dim_asin_profit_rate_info 利润率数据")
sql = f"""
select asin, price as asin_price, ocean_profit, air_profit
from dim_asin_profit_rate_info
where site_name = '{self.site_name}'
"""
self.df_profit_rate = self.spark.sql(sqlQuery=sql) \
.dropDuplicates(['asin', 'asin_price'])
print(sql)
def handle_fb_top_20(self):
# 基于每个卖家排名前20的 ASIN,计算这20个 ASIN 的均价、均评分、均评论数、新品数量,最终每个卖家汇总成一行
print("处理asin_detail_product的top20指标")
self.df_top20_asin = self.df_top20_asin. \
join(self.df_asin_history.select('asin', 'asin_launch_time'), on='asin', how='left')
# 传入上架时间和当前计算日期。内部逻辑是:上架时间距 cal_date ≤ 180 天则为新品,返回 1;否则返回 0
self.df_top20_asin = self.df_top20_asin.withColumn("is_asin_new",
self.udf_new_asin_flag(F.col('asin_launch_time'),
F.lit(self.cal_date)))
self.df_top20_asin = self.df_top20_asin.groupby('seller_id').agg(
F.lit(self.cal_date))) #判断是否新品
self.df_top20_asin = self.df_top20_asin.groupby('seller_id').agg( # 按 seller_id 分组,对 Top20 ASIN 做聚合:
F.avg('price').alias('top_20_avg_price'),
F.avg('rating').alias('top_20_avg_rating'),
F.avg('total_comments').alias('top_20_avg_total_comments'),
F.sum('is_asin_new').alias('top_20_new_asin_num')
F.avg('total_comments').alias('top_20_avg_total_comments'), # 均价、均评分、均评论数:直接 avg
F.sum('is_asin_new').alias('top_20_new_asin_num') # 新品数量:is_asin_new 是 0/1 标记,直接 sum 就等于新品个数
)
def handle_fb_cal_agg(self):
......@@ -264,12 +410,13 @@ class DwtFbBaseReport(object):
).join(
self.df_asin_parent, on='asin', how='left'
)
# 三步 join 建立卖家 → ASIN → ASIN详情的明细宽表
# 计算是否新品
df_fb_join = df_fb_join.withColumn("is_asin_new",
self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date)))
# 计算店铺-asin的打包数量
# 计算店铺-asin的打包数量 # 从标题提取打包数量(如"Pack of 3" → 3)
df_fb_join = df_fb_join.withColumn('asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title')))
# 打包数量标记 打包数量>=2的商品数标识
......@@ -280,37 +427,24 @@ class DwtFbBaseReport(object):
.otherwise(F.col('parent_asin')))
self.df_fb_asin_detail = df_fb_join.cache()
fb_counts_agg = self.df_fb_asin_detail.groupby(['seller_id']).agg(
F.count('asin').alias('fb_asin_total'),
F.sum('is_asin_new').alias('fb_new_asin_num'),
F.sum('is_pq_flag').alias('fb_pq_num')
)
# 计算新品比率
fb_counts_agg = fb_counts_agg.withColumn('fb_new_asin_rate',
F.round(F.col('fb_new_asin_num') / F.col('fb_asin_total'), 4))
# 计算打包数量比率
fb_counts_agg = fb_counts_agg.withColumn('fb_pq_rate',
F.round(F.col('fb_pq_num') / F.col('fb_asin_total'), 4))
fb_counts_agg = fb_counts_agg.select('seller_id', 'fb_new_asin_num', 'fb_pq_num',
'fb_asin_total', 'fb_new_asin_rate', 'fb_pq_rate')
# 注:原 fb_counts_agg(fb_asin_total / fb_new_asin_num / fb_pq_num + 新品率/打包率)已合并到
# 下方 df_seller_main_agg 主聚合中,不再单独计算。
# 计算店铺多数量占比 有变体asin数量/(有变体asin数量+单产品asin数量) 逻辑实现
# 计算多变体比率
df_variant_radio = self.df_fb_asin_detail.groupby(['seller_id', 'parent_asin']).agg(
F.count('asin').alias('asin_son_count')
F.count('asin').alias('asin_son_count') # 按 seller_id + parent_asin 聚合,数每个父品下有几个子变体
)
# 打上多变体标签 如果asin_son_count > 1则说明该店铺该asin存在多变体
df_variant_radio = df_variant_radio.withColumn('is_variant_flag',
F.when(F.col('asin_son_count') > 1, F.lit(1)))
# asin_son_count > 1 说明这个父品下有多个子变体,标记为 1。单品(son_count=1)为 null
# 按照seller_id再次聚合,得出多变体计算分子分母
df_variant_radio = df_variant_radio.groupby(['seller_id']).agg(
F.sum('is_variant_flag').alias('fb_more_variant_num'),
F.count('parent_asin').alias('fb_variant_asin_total')
F.sum('is_variant_flag').alias('fb_more_variant_num'), # 多变体父品数
F.count('parent_asin').alias('fb_variant_asin_total') # 父品总数(含单品)
)
# 得出多变体比率
......@@ -323,13 +457,132 @@ class DwtFbBaseReport(object):
'fb_variant_asin_total',
'fb_variant_rate')
# =========================================================
# 老指标聚合(来源:ods_seller_asin_account,保持不变)
# =========================================================
# df_fb_asin_detail 基于 ods_seller_asin_account,只算老指标,不涉及功能1-6
df_seller_main_agg = self.df_fb_asin_detail.groupby('seller_id').agg(
F.count('asin').alias('fb_asin_total'), # 该卖家总 ASIN 数(来自 ods_seller_asin_account)
F.sum('is_asin_new').alias('fb_new_asin_num'), # 新品数(0/1 求和)
F.sum('is_pq_flag').alias('fb_pq_num'), # 打包商品数
)
# 老指标的衍生比率:放在 groupBy 之外算,避免 agg 里嵌套 round/除法表达式
df_seller_main_agg = df_seller_main_agg \
.withColumn('fb_new_asin_rate',
F.round(F.col('fb_new_asin_num') / F.col('fb_asin_total'), 4)) \
.withColumn('fb_pq_rate',
F.round(F.col('fb_pq_num') / F.col('fb_asin_total'), 4))
# =========================================================
# 功能1-5:从流量选品 dwt_flow_asin 计算(ASIN 来源:account_id)
# =========================================================
# 功能3/4:nsr_last_seen_at/bsr_last_seen_at 落在 [月初, 下月月初) 内则视为当月上榜
# 未上榜的 ASIN is_nsr/is_bs = null,sum 自动忽略;整组未命中时 sum=null,coalesce→0
month_start = f"{self.date_info}-01"
next_month_start = f"{CommonUtil.get_month_offset(self.date_info, 1)}-01"
df_flow_with_flags = self.df_flow_asin \
.withColumn('is_nsr', F.when(
(F.col('nsr_last_seen_at') >= month_start) &
(F.col('nsr_last_seen_at') < next_month_start),
F.lit(1)
)) \
.withColumn('is_bs', F.when(
(F.col('bsr_last_seen_at') >= month_start) &
(F.col('bsr_last_seen_at') < next_month_start),
F.lit(1)
))
df_flow_agg = df_flow_with_flags.groupby('account_id').agg(
# fb_flow_asin_total:流量选品 ASIN 总数,作为功能2/3/4 占比的分母
F.count('asin').alias('fb_flow_asin_total'),
# 功能2 FBM
# FBM:asin_buy_box_seller_type = 3
# FBM 有效:FBM 且 first_category_rank <= 500000
# (dwt_flow_asin 中 asin_rank = lit(None) 是 NULL,大类排名必须用 first_category_rank)
F.sum(
F.when(F.col('asin_buy_box_seller_type') == 3, 1).otherwise(0)
).alias('fb_fbm_asin_num'),
F.sum(
F.when(
(F.col('asin_buy_box_seller_type') == 3) & (F.col('first_category_rank') <= 500000),
1
).otherwise(0)
).alias('fb_fbm_valid_asin_num'),
# 功能3 NSR / 功能4 BS
F.coalesce(F.sum('is_nsr'), F.lit(0)).alias('fb_nsr_asin_num'),
F.coalesce(F.sum('is_bs'), F.lit(0)).alias('fb_bs_asin_num'),
# 功能1 总销量:流量选品中该卖家所有 ASIN 月销之和
# 该卖家不在 dwt_flow_asin 时 left join 后无行,save_data 转 -1
F.sum('asin_bought_month').alias('fb_shop_total_sales_raw'),
# 功能5 新品销量分子:新品 ASIN 月销之和(asin_is_new=1,月销为 null 按 0 计入)
F.sum(
F.when(F.col('asin_is_new') == 1,
F.coalesce(F.col('asin_bought_month'), F.lit(0)))
.otherwise(F.lit(0))
).alias('fb_new_asin_sales_raw'),
# 功能5 新品数量占比分子:asin_is_new=1 的 ASIN 计数(来自 dwt_flow_asin)
F.sum(F.when(F.col('asin_is_new') == 1, F.lit(1)).otherwise(F.lit(0))).alias('fb_new_asin_count_raw'),
).withColumnRenamed('account_id', 'seller_id')
# =========================================================
# 功能1:首页销量 + 缺数据标记(保留独立:来源是 df_home_asin 不是 df_fb_asin_detail)
# =========================================================
# 解析 buy_data("3K+ bought in past month" → 3000)后按 seller_id 求和
# F.sum 自动忽略 null(等价于按 0 计入),符合需求"空值按 0 处理,计入求和"
# max(when isNull then 1 else 0):该卖家是否存在 buy_data 解析为 null 的 ASIN
# null_flag=1:前端在首页销量后追加"含无数据产品"标注,避免用户误以为销量低
df_home_sales = self.df_home_asin.withColumn(
'asin_bought_month', self.u_parse_amazon_orders(F.col('buy_data'))
).groupby('seller_id').agg(
F.sum('asin_bought_month').alias('fb_shop_home_sales_raw'),
F.max(
F.when(F.col('asin_bought_month').isNull(), F.lit(1)).otherwise(F.lit(0))
).alias('fb_shop_home_null_flag_raw')
)
# =========================================================
# 功能6:新品/老品平均利润率(来源:dwt_flow_asin)
# =========================================================
# 以 df_flow_asin 为基底(而非 df_fb_asin_detail),确保 ASIN 来自流量选品
# asin_price 来自 dwt_flow_asin,与 dim_asin_profit_rate_info.price 对齐,作为 join key
# 利润率为 null 的 ASIN:F.avg() 自动忽略 null,符合需求"利润率为空的 ASIN 完全排除"
# 有效样本=0 时 avg() 返回 null → save_data 转 -1000(不能用 -1,因为 -1=-100% 是真实亏损值)
df_profit_cal = self.df_flow_asin.join(
self.df_profit_rate, on=['asin', 'asin_price'], how='left'
).filter(
F.col('asin_bought_month') >= 50
)
# 新品利润率均值(asin_is_new=1,dwt_flow_asin 中的字段名)
df_new_profit = df_profit_cal.filter(F.col('asin_is_new') == 1) \
.groupby('account_id').agg(
F.round(F.avg('ocean_profit'), 4).alias('fb_new_ocean_profit_raw'),
F.round(F.avg('air_profit'), 4).alias('fb_new_air_profit_raw')
).withColumnRenamed('account_id', 'seller_id')
# 老品利润率均值(asin_is_new=0)
df_old_profit = df_profit_cal.filter(F.col('asin_is_new') == 0) \
.groupby('account_id').agg(
F.round(F.avg('ocean_profit'), 4).alias('fb_old_ocean_profit_raw'),
F.round(F.avg('air_profit'), 4).alias('fb_old_air_profit_raw')
).withColumnRenamed('account_id', 'seller_id')
# 合并计算结果
self.df_fb_agg = self.df_fb_feedback.join(
fb_counts_agg, on='seller_id', how='left'
df_seller_main_agg, on='seller_id', how='left' # 老指标(ods_seller_asin_account)
).join(
df_variant_radio, on='seller_id', how='left'
df_variant_radio, on='seller_id', how='left'
).join(
self.df_top20_asin, on='seller_id', how='left'
).join(
df_flow_agg, on='seller_id', how='left' # 功能1总销量 + 功能2 FBM + 功能3 NSR + 功能4 BS + 功能5 新品销量
).join(
df_home_sales, on='seller_id', how='left' # 功能1:首页销量
).join(
df_new_profit, on='seller_id', how='left' # 功能6:新品利润率
).join(
df_old_profit, on='seller_id', how='left' # 功能6:老品利润率
)
# 关联公司店铺df,并标记是否公司内部店铺
......@@ -370,7 +623,6 @@ class DwtFbBaseReport(object):
F.round('top_20_avg_price', 4).alias('top_20_avg_price'),
F.round('top_20_avg_rating', 4).alias('top_20_avg_rating'),
F.ceil('top_20_avg_total_comments').alias('top_20_avg_total_comments'),
F.col('top_20_new_asin_num'),
F.col('count_30_day_num'),
F.col('count_1_year_num'),
F.col('count_lifetime_num'),
......@@ -386,6 +638,9 @@ class DwtFbBaseReport(object):
F.col('fb_pq_rate'),
F.col('fb_pq_num'),
F.col('fb_crawl_date'),
F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('created_time'),
F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'),
F.col('top_20_new_asin_num'),
F.when(F.col('fb_country_name').isNull(), F.lit(0))
.when(F.col('fb_country_name') == self.site_name.upper(), F.lit(1))
.when(F.col('fb_country_name') == 'CN', F.lit(2))
......@@ -419,8 +674,6 @@ class DwtFbBaseReport(object):
.when(F.col('fb_new_asin_rate') <= 0.2, F.lit(4))
.when(F.col('fb_new_asin_rate') <= 0.5, F.lit(5))
.otherwise(F.lit(6)).alias('fb_new_asin_rate_type'),
F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('created_time'),
F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'),
F.lit(None).alias('usr_mask_type'),
F.lit(None).alias('usr_mask_progress'),
F.col('business_name'),
......@@ -429,20 +682,111 @@ class DwtFbBaseReport(object):
F.col('rating_90_day_num'),
F.col('rating_1_year_num'),
F.col('rating_lifetime_num'),
# ============== 功能1:销量展示 ==============
# 总销量:流量选品无该店铺数据时 sum=null → -1(前端展示"—")
# 有数据但销量为 0 → 保留 0(真实零销量,与"—"区分)
F.when(F.col('fb_shop_total_sales_raw').isNull(), F.lit(-1))
.otherwise(F.col('fb_shop_total_sales_raw'))
.alias('fb_shop_total_sales'),
# 首页销量:首页无产品 / 全部 buy_data 为 null 时 sum=null → 0
# 0 在业务上既是"无产品"也是"真实0销量"(语义等同),不需要用 -1 区分
F.coalesce(F.col('fb_shop_home_sales_raw'), F.lit(0)).alias('fb_shop_home_sales'),
# 首页缺数据标记:首页有 ASIN 的 buy_data 解析为 null → flag=1
# 卖家不在首页表(无产品)时 flag_raw=null → coalesce 为 0(无产品不需要标注)
F.coalesce(F.col('fb_shop_home_null_flag_raw'), F.lit(0)).alias('fb_shop_home_null_flag'),
# ============== 功能2:FBM 占比 ==============
# 卖家不在 dwt_flow_asin → left join 后整行 null → coalesce 为 -1
F.coalesce(F.col('fb_fbm_asin_num'), F.lit(-1)).alias('fb_fbm_asin_num'),
F.coalesce(F.col('fb_fbm_valid_asin_num'), F.lit(-1)).alias('fb_fbm_valid_asin_num'),
# 功能2 FBM 占比 = FBM 数 / 流量选品 ASIN 总数(分子分母均来自 dwt_flow_asin,口径一致)
# fb_flow_asin_total=null 或 0(卖家不在流量选品)→ -1("—")
F.when(
F.col('fb_flow_asin_total').isNull() | (F.col('fb_flow_asin_total') == 0), F.lit(-1.0)
).otherwise(
F.round(F.col('fb_fbm_asin_num') / F.col('fb_flow_asin_total'), 4)
).alias('fb_fbm_rate'),
# FBM 有效占比 = 有效 FBM 数 / FBM 数(分母用 FBM 数,不变)
F.when(
F.col('fb_fbm_asin_num').isNull() | (F.col('fb_fbm_asin_num') == 0), F.lit(-1.0)
).otherwise(
F.round(F.col('fb_fbm_valid_asin_num') / F.col('fb_fbm_asin_num'), 4)
).alias('fb_fbm_valid_rate'),
# ============== 功能3:Hot New 榜单 ==============
F.coalesce(F.col('fb_nsr_asin_num'), F.lit(-1)).alias('fb_nsr_asin_num'),
# NSR 占比 = NSR ASIN 数 / 流量选品 ASIN 总数
F.when(
F.col('fb_flow_asin_total').isNull() | (F.col('fb_flow_asin_total') == 0), F.lit(-1.0)
).otherwise(
F.round(F.col('fb_nsr_asin_num') / F.col('fb_flow_asin_total'), 4)
).alias('fb_nsr_rate'),
# ============== 功能4:Best Seller 榜单 ==============
F.coalesce(F.col('fb_bs_asin_num'), F.lit(-1)).alias('fb_bs_asin_num'),
# BS 占比 = BS ASIN 数 / 流量选品 ASIN 总数
F.when(
F.col('fb_flow_asin_total').isNull() | (F.col('fb_flow_asin_total') == 0), F.lit(-1.0)
).otherwise(
F.round(F.col('fb_bs_asin_num') / F.col('fb_flow_asin_total'), 4)
).alias('fb_bs_rate'),
# ============== 功能5:新品销量占比 ==============
# 分子:fb_new_asin_sales_raw(新品月销之和),无新品/月销全 null 时 = 0
# 分母:fb_shop_total_sales_raw(店铺总月销),null 或 0 时无法计算 → -1
# 分母 > 0 且分子 = 0 → 0.0(真实 0%)
F.when(
F.col('fb_shop_total_sales_raw').isNull() | (F.col('fb_shop_total_sales_raw') == 0),
F.lit(-1.0)
).otherwise(
F.round(F.col('fb_new_asin_sales_raw') / F.col('fb_shop_total_sales_raw'), 4)
).alias('fb_new_asin_sales_rate'),
# ============== 功能6:新品/老品平均利润率 ==============
# 占位值用 -1000(不是 -1):因为 -1 = -100% 是真实亏损值,必须用 -1000 区分"无样本"
# avg() 返回 null 的两种场景:
# ①该 seller 无符合条件 ASIN(filter 月销>=50 后无行)
# ②有 ASIN 但利润率全 null(avg 全忽略后无样本)
F.when(F.col('fb_new_ocean_profit_raw').isNull(), F.lit(-1000))
.otherwise(F.col('fb_new_ocean_profit_raw'))
.alias('fb_new_ocean_profit_rate'),
F.when(F.col('fb_new_air_profit_raw').isNull(), F.lit(-1000))
.otherwise(F.col('fb_new_air_profit_raw'))
.alias('fb_new_air_profit_rate'),
F.when(F.col('fb_old_ocean_profit_raw').isNull(), F.lit(-1000))
.otherwise(F.col('fb_old_ocean_profit_raw'))
.alias('fb_old_ocean_profit_rate'),
F.when(F.col('fb_old_air_profit_raw').isNull(), F.lit(-1000))
.otherwise(F.col('fb_old_air_profit_raw'))
.alias('fb_old_air_profit_rate'),
# ============== 功能5:新品数量占比(流量选品口径)==============
# 老字段 fb_new_asin_rate 来自 ods_seller_asin_account,保持不动
# 新字段 fb_flow_new_asin_rate 来自 dwt_flow_asin,分子分母口径一致
# fb_flow_asin_total=null → 不在流量选品 → -1("—");无新品 → 0.0
F.when(F.col('fb_flow_asin_total').isNull() | (F.col('fb_flow_asin_total') == 0), F.lit(-1.0))
.otherwise(F.round(F.col('fb_new_asin_count_raw') / F.col('fb_flow_asin_total'), 4))
.alias('fb_flow_new_asin_rate'),
# ============== 功能7:店铺综合评分(各星级占比)==============
# feedback_histogram JSON 中 365d 各星级值为百分数(如 75.0),上游已 ÷100 转为 0~1 小数并保留 4 位
# feedback_histogram 为 null 或 key 不存在时,用 -1 占位(Java 端转 null 返前端)
F.col('fb_star_5_pct'),
F.col('fb_star_4_pct'),
F.col('fb_star_3_pct'),
F.col('fb_star_2_pct'),
F.col('fb_star_1_pct'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
)
# CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
df_save = df_save.repartition(1)
df_save = df_save.repartition(3)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
df_save.unpersist()
print("success")
# 输出数据集-asin_info
......@@ -479,7 +823,7 @@ class DwtFbBaseReport(object):
print(f"清除hdfs目录中:{hdfs_path_asin_info}")
HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
df_save_asin = df_save_asin.repartition(50)
df_save_asin = df_save_asin.repartition(20)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{hive_tb_name},分区为{partition_by}", )
df_save_asin.write.saveAsTable(name=hive_tb_name, format='hive', mode='append', partitionBy=partition_by)
......@@ -490,7 +834,12 @@ class DwtFbBaseReport(object):
self.handle_fb_top_20()
self.handle_fb_cal_agg()
self.save_data_report()
# report 写完后 df_fb_feedback 和 df_flow_asin 不再需要,立即释放
self.df_fb_feedback.unpersist()
self.df_flow_asin.unpersist()
self.save_data_asin_info()
# asin_info 写完后 df_fb_asin_detail 不再需要,立即释放
self.df_fb_asin_detail.unpersist()
if __name__ == '__main__':
......
......@@ -6,7 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
......@@ -36,6 +36,15 @@ if __name__ == '__main__':
# 获取数据库连接
engine = DBUtil.get_db_engine(db_type, site_name)
# 导出前校验 Hive 分区是否有数据,避免空分区触发交换导致 PG 数据被清空
hive_partition_path = f"/home/big_data_selection/dwt/dwt_aba_st_analytics/site_name={site_name}/date_type={date_type}/date_info={date_info}"
hive_files = HdfsUtils.read_list(hive_partition_path)
if not hive_files:
print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出,请先检查 DWT 计算任务是否正常写入!")
engine.dispose()
sys.exit(1)
print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出")
# 保证幂等性,先删除原始表同周期的数据
sql = f"""
drop table if exists {export_tb};
......@@ -92,7 +101,36 @@ if __name__ == '__main__':
"rating_30_day_num",
"rating_90_day_num",
"rating_1_year_num",
"rating_lifetime_num"
"rating_lifetime_num",
# 功能1:销量展示
"fb_shop_total_sales",
"fb_shop_home_sales",
"fb_shop_home_null_flag",
# 功能2:FBM 占比
"fb_fbm_asin_num",
"fb_fbm_valid_asin_num",
"fb_fbm_rate",
"fb_fbm_valid_rate",
# 功能3:Hot New 榜单
"fb_nsr_asin_num",
"fb_nsr_rate",
# 功能4:Best Seller 榜单
"fb_bs_asin_num",
"fb_bs_rate",
# 功能5:新品销量占比 / 新品数量占比(流量选品口径)
"fb_new_asin_sales_rate",
# 功能6:新品/老品平均利润率
"fb_new_ocean_profit_rate",
"fb_new_air_profit_rate",
"fb_old_ocean_profit_rate",
"fb_old_air_profit_rate",
"fb_flow_new_asin_rate",
# 功能7:店铺综合评分(各星级占比)
"fb_star_5_pct",
"fb_star_4_pct",
"fb_star_3_pct",
"fb_star_2_pct",
"fb_star_1_pct"
],
partition_dict={
"site_name": site_name,
......@@ -117,8 +155,8 @@ if __name__ == '__main__':
cp_index_flag=False,
)
update_workflow_sql = f"""
update selection.workflow_progress set `status`='导出pg集群完成', status_val=6, over_date=CURRENT_TIME, is_end='是' where page='店铺Feedback'
update_workflow_sql = f"""
update selection.workflow_progress set `status`='导出pg集群完成', status_val=6, over_date=CURRENT_TIME, is_end='是' where page='店铺Feedback'
and `date_info`='{date_info}' and date_type='{date_type}' and site_name='{site_name}'
"""
CommonUtil.modify_export_workflow_status(update_workflow_sql, site_name, date_type, date_info)
......
......@@ -33,9 +33,10 @@ if __name__ == '__main__':
num,
created_at,
updated_at,
regexp_replace(seller_address, E'[\\r\\n\\t]+', ' ', 'g') as seller_address,
regexp_replace(seller_rating, E'[\\r\\n\\t]+', ' ', 'g') as seller_rating,
regexp_replace(feedback_histogram, E'[\\r\\n\\t]+', ' ', 'g') as feedback_histogram
regexp_replace(seller_address, E'[\\r\\n\\t]+', ' ', 'g') as seller_address,
regexp_replace(seller_rating, E'[\\r\\n\\t]+', ' ', 'g') as seller_rating,
regexp_replace(feedback_histogram, E'[\\r\\n\\t]+', ' ', 'g') as feedback_histogram,
regexp_replace(metadata_json, E'[\\r\\n\\t]+', ' ', 'g') as metadata_json
from {import_table}
where 1=1
and \$CONDITIONS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment