Commit 9edcc990 by wangjing

no message

parent 9d0dad52
import os
import sys
current_script_dir = os.path.dirname(os.path.abspath(__file__)) # /opt/module/spark/demo/py_demo/dwt
utils_parent_dir = os.path.abspath(os.path.join(current_script_dir, "../")) # /opt/module/spark/demo/py_demo
# Add py_demo to the Python search path
sys.path.append(utils_parent_dir)
from functools import reduce
from utils.secure_db_client import get_remote_engine
from pyspark.sql import functions as F
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType, DoubleType, LongType
# Window functions for grouped sorting / deduplication
from pyspark.sql.window import Window
class DwtBsTop100(Templates):
def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.db_save = 'dwt_bs_top100'  # target table for storage
self.spark = self.create_spark_object(
app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")  # Spark session
self.df_save = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_bs_data = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_deduplicated = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_deduplicated2 = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_cate_bs_data = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_cate_tree = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_nodes_num_concat = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_nodes_num_concat2 = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.df_level_concat = self.spark.sql("select 1+1;")  # DataFrame placeholder
self.partitions_by = ['site_name', 'date_type', 'date_info']  # three partition columns
self.reset_partitions(partitions_num=1)  # number of partitions
def read_data(self):
# self.df_bs_data = self.spark.read.csv("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/dwt/asin_bsr_us_2025-11.csv",header=True,inferSchema=True).limit(1000000).cache()
sql = f"""
select asin, asin_type, bsr_orders, category_first_id, category_id, first_category_rank, current_category_rank,
asin_price, asin_rating, asin_buy_box_seller_type, asin_is_new, asin_total_comments, asin_launch_time, asin_launch_time_type,
asin_brand_name, is_brand_label, asin_bought_month as buy_data_bought_month
from dwt_flow_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print(f"1. 读取dwt_flow_asin数据: sql -- {sql}")
self.df_bs_data = self.spark.sql(sqlQuery=sql).cache()
# self.df_bs_data.show(10, truncate=False)
print("df_bs_data数量", self.df_bs_data.count())
import pandas as pd
if pd.__version__ >= "2.0.0" and not hasattr(pd.DataFrame, "iteritems"):
# Dynamically add an iteritems method to DataFrame, mapping it to the newer items()
pd.DataFrame.iteritems = pd.DataFrame.items
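# Note (assumption about the runtime, not enforced here): this shim is likely needed because the
# Spark version in use still calls pandas.DataFrame.iteritems() inside spark.createDataFrame(),
# while pandas 2.x removed iteritems() in favor of items(), so we alias it back.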
engine = get_remote_engine(site_name="us", db_type="mysql")
sql = f"SELECT nodes_num, category_id, category_parent_id, category_first_id, redirect_flag, redirect_first_id, en_name as cur_category_name from us_bs_category where delete_time is null;"
df_cate = engine.read_sql(sql)
self.df_cate_bs_data = self.spark.createDataFrame(df_cate)
self.df_cate_bs_data.cache()
# print(self.df_cate_bs_data.show())
print("df_cate_bs_data数量", self.df_cate_bs_data.count())
def clean_column(self, df, col_name):
"""统一处理 ID 列:去除 .0 后缀,处理空值"""
return df.withColumn(
col_name,
F.when(
F.col(col_name).isNull() |
(F.col(col_name) == "") |
(F.col(col_name) == "nan") |
(F.col(col_name) == "null"),
None
).otherwise(
F.regexp_replace(F.col(col_name).cast("string"), "\\.0$", "")
)
)
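# Example of the intended behavior (illustrative values, not from real data):
#   "12345.0" -> "12345", "nan" -> NULL, "" -> NULL, "null" -> NULL, "6789" -> "6789"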
def handle_bs_redirect_flag(self):
print("handle_bs_redirect_flag调用 重定向一级分类关系处理")
# 重定向一级分类关系处理
self.df_cate_bs_data = self.df_cate_bs_data.withColumn("category_first_id",
F.when(self.df_cate_bs_data.redirect_flag == 1,
self.df_cate_bs_data.redirect_first_id).otherwise(
self.df_cate_bs_data.category_first_id))
# Deduplicate on the specified columns
drop_col_list = ['category_id', 'category_first_id', 'category_parent_id', 'nodes_num', 'cur_category_name']
# self.df_cate_bs_data = self.df_cate_bs_data.drop_duplicates(drop_col_list)  # deduplicate on the specified columns
self.df_cate_bs_data = self.df_cate_bs_data.withColumn("_cate_order",
F.monotonically_increasing_id())  # add an ordering column to make deduplication deterministic
print("Last _cate_order values in df_cate_bs_data:")
print(self.df_cate_bs_data.select("_cate_order").tail(10))
w = Window.partitionBy(drop_col_list).orderBy(F.col("_cate_order").asc())
# Assign row numbers and keep only the first row per group
self.df_cate_bs_data = self.df_cate_bs_data.withColumn("rn", F.row_number().over(w)) \
.filter(F.col("rn") == 1) \
.drop("rn")
def handle_bs_hierarchy_to_row(self):
print("handle_bs_hierarchy_to_row调用 层级关系处理")
# print("=== df_cate_bs_data 列名列表 ===")
# print(self.df_cate_bs_data.columns)
# 对每个分类的节点进行 层级关系处理 把层级关系转为一行
nodes_num_list = self.df_cate_bs_data.select("nodes_num").distinct().collect()
nodes_num_list = [x.nodes_num for x in nodes_num_list]
nodes_num_list.sort()
print("nodes_num_list列表",nodes_num_list)
df_node_list = []
for num in nodes_num_list[1:]:  # nodes_num == 1 is the root node and needs no processing
num = int(num)
# print(num)
df_node = self.df_cate_bs_data.filter(F.col("nodes_num") == num).select("category_id", "category_parent_id",
"_cate_order")
df_node = df_node.withColumnRenamed("category_id", f"asin_bs_cate_{num - 1}_id").withColumnRenamed(
"category_parent_id", f"asin_bs_cate_{num - 2}_id")
df_node = df_node.withColumnRenamed("_cate_order", f"_cate_order_{num - 1}")
# Deduplicate with a window function
window_spec = Window.partitionBy(f"asin_bs_cate_{num - 1}_id", f"asin_bs_cate_{num - 2}_id").orderBy(
f"_cate_order_{num - 1}")
df_node = df_node.withColumn("_rn", F.row_number().over(window_spec))
df_node = df_node.filter(F.col("_rn") == 1).drop("_rn")
# num == 2 is the first iteration: treat it as the first-level category whose parent is the root node;
# for num == 3, asin_bs_cate_2_id is the second-level category, and so on
# print((df_node.count(), len(df_node.columns)))
if num == 2:
self.df_cate_tree = df_node  # use the first iteration as the base: here df_node holds asin_bs_cate_1_id and asin_bs_cate_0_id (first level and root)
else:
# Join on the parent-level column. num == 2 is the base; for num == 3 the columns are asin_bs_cate_2_id / asin_bs_cate_1_id.
# Joining on the current-level column would fail because the previous iteration does not have it yet;
# the parent column exists on both sides, so each iteration's nodes are appended onto the base tree.
self.df_cate_tree = self.df_cate_tree.join(df_node, how='left', on=[f"asin_bs_cate_{num - 2}_id"])
df_node_list.append(df_node)
# Drop all columns starting with _cate_order
# order_cols = [c for c in self.df_cate_tree.columns if c.startswith("_cate_order")]
# for col in order_cols:
# df_cate_tree = df_cate_tree.drop(col)
print("df_cate_tree row count:", self.df_cate_tree.count())
print("df_cate_tree columns:", self.df_cate_tree.columns)
# print(self.df_cate_tree.show(10, truncate=False))
print("handle_bs_hierarchy_to_row: hierarchy flattening finished")
self.df_cate_tree.cache()
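# Illustration of the flattened tree (hypothetical IDs, not real data): after processing nodes_num 2..4,
# a df_cate_tree row looks roughly like
#   asin_bs_cate_0_id | asin_bs_cate_1_id | asin_bs_cate_2_id | asin_bs_cate_3_id | _cate_order_* ...
# i.e. roughly one row per category path; deeper-level columns stay NULL when the path stops earlier,
# which is what the later level joins rely on.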
def handle_bse_column_rename(self):
print("handle_bse_column_rename调用 重命名列名")
rename_dict = {
'bsr_orders': 'asin_bsr_orders',
'category_id': 'asin_bs_cate_current_id',
'category_first_id': 'asin_bs_cate_1_id',
'category_parent_id': 'asin_bs_cate_parent_id',
'first_category_rank': 'asin_bs_cate_1_rank',
'buy_data_bought_month': 'asin_amazon_orders'
}
for old_col, new_col in rename_dict.items():
self.df_bs_data = self.df_bs_data.withColumnRenamed(old_col, new_col)
rename_dict2 = {
'category_id': 'asin_bs_cate_current_id',
'category_first_id': 'asin_bs_cate_1_id',
'category_parent_id': 'asin_bs_cate_parent_id'
}
# Rename in a loop; reassigning to the original variable updates the original DataFrame reference
for old, new in rename_dict2.items():
self.df_cate_bs_data = self.df_cate_bs_data.withColumnRenamed(old, new)
def handle_bs_categorical_data(self):
print("handle_bs_categorical_data 调用")
# 排除asin_type not in (1, 2), 1:内部asin,2:视频音乐图书等asin
print("排除asin_type")
self.df_bs_data = self.df_bs_data.filter(~(self.df_bs_data.asin_type.isin([1, 2])))
print("处理上架时间分类")
# 处理上架时间分类 上架时间类型(默认0,1:最近30天,2:1-3个月,3:3-6个月,4:6-12个月,5:1-2年,6:2-3年,7:3年以上)
launch_time_type_list = [x[0] for x in self.df_bs_data.select("asin_launch_time_type").distinct().collect()]
for rate in launch_time_type_list:
self.df_bs_data = self.df_bs_data.withColumn(f"launch_time_type{rate}_rate",
self.df_bs_data["asin_launch_time_type"] == rate)
print("处理卖家类型")
# 处理卖家类型 buybox卖家类型(1:Amazon,2:FBA,3FBM,4无BB卖家)
asin_buy_box_seller_type_list = [x[0] for x in
self.df_bs_data.select("asin_buy_box_seller_type").distinct().collect()]
for rate in asin_buy_box_seller_type_list:
self.df_bs_data = self.df_bs_data.withColumn(f"asin_buy_box_seller_type{rate}_rate",
self.df_bs_data["asin_buy_box_seller_type"] == rate)
def handle_bs_asin_deduplicated(self):
print("handle_bs_asin_deduplicated 调用")
# # 对每列 转换类型为 string 然后统一将空值 nan null 设置为none 把数据末尾的.0去除
for col_name in self.df_cate_bs_data.columns:
if col_name.startswith("_cate_order"):
continue
self.df_cate_bs_data = self.df_cate_bs_data.withColumn(col_name, F.col(col_name).cast("string"))
self.df_cate_bs_data = self.clean_column(self.df_cate_bs_data, col_name)
# Match each ASIN against the category data and keep its deepest (most fine-grained) category node;
# all later steps operate on this deduplicated per-ASIN DataFrame
self.df_bs_data = self.df_bs_data.withColumn("asin_bs_cate_current_id",
self.df_bs_data["asin_bs_cate_current_id"].cast("string"))
self.df_bs_data = self.df_bs_data.withColumn("asin_bs_cate_current_id",
F.split(self.df_bs_data["asin_bs_cate_current_id"], "\\.").getItem(
0))  # split on "." and take the first part
# Normalize nan/null to NULL and strip trailing .0 so the join keys match
self.df_bs_data = self.clean_column(self.df_bs_data, "asin_bs_cate_current_id")
self.df_bs_data = self.clean_column(self.df_bs_data, "asin_bs_cate_1_id")
self.df_deduplicated = self.df_bs_data.join(self.df_cate_bs_data,
on=['asin_bs_cate_1_id', 'asin_bs_cate_current_id'], how="left")
window_spec = Window.partitionBy("asin").orderBy(
F.col("nodes_num").desc_nulls_last(), # 降序排列 null在最后
F.col("_cate_order").asc_nulls_last()
# nodes_num 相等 按照_cate_order 升序 模拟 Pandas 的 drop_duplicates(keep='first')
)
# 生成行号并过滤
self.df_deduplicated = self.df_deduplicated.withColumn("rn", F.row_number().over(window_spec)) \
.filter(F.col("rn") == 1) \
.drop("rn")
self.df_deduplicated.cache()
print("df_deduplicated preview")
# self.df_deduplicated.show(10, truncate=False)
# print(self.df_deduplicated.columns)
# print(self.df_deduplicated.count())
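# Example of the dedup rule above (illustrative, not real data): if one ASIN joins to category rows
# with nodes_num 5 and nodes_num 3, the nodes_num = 5 row (the deeper node) is the one that survives.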
def handle_bs_add_column(self):
print("handle_bs_add_column 调用")
self.df_deduplicated = self.df_deduplicated.withColumn("asin_bsr_orders_new",
F.when(self.df_deduplicated.asin_is_new == 1,
self.df_deduplicated.asin_bsr_orders).otherwise(
F.lit(0)))
self.df_deduplicated = self.df_deduplicated.withColumn("asin_bsr_orders_brand",
F.when(self.df_deduplicated.is_brand_label == 1,
self.df_deduplicated.asin_bsr_orders).otherwise(
F.lit(0)))
self.df_deduplicated = self.df_deduplicated.withColumn("level", self.df_deduplicated.nodes_num - 1)
# Sales-amount related fields
self.df_deduplicated = self.df_deduplicated.withColumn("asin_bsr_orders_sales",
self.df_deduplicated.asin_bsr_orders * self.df_deduplicated.asin_price)
self.df_deduplicated = self.df_deduplicated.withColumn("asin_bsr_orders_sales_new",
self.df_deduplicated.asin_bsr_orders_new * self.df_deduplicated.asin_price)
self.df_deduplicated = self.df_deduplicated.withColumn("asin_bsr_orders_sales_brand",
self.df_deduplicated.asin_bsr_orders_brand * self.df_deduplicated.asin_price)
def handle_bs_nodes_num_concat(self):
print("handle_bs_nodes_num_concat 调用")
raw_list = self.df_deduplicated.filter(F.col("nodes_num").isNotNull()).agg(F.collect_set("nodes_num")).first()[0]
nodes_num_list = sorted([int(x) for x in raw_list])
print("nodes_num_list:", nodes_num_list)
cols_select_list = ['asin', 'asin_amazon_orders', 'asin_bsr_orders', 'asin_bsr_orders_new',
'asin_bsr_orders_brand',
'asin_bsr_orders_sales', 'asin_bsr_orders_sales_new', 'asin_bsr_orders_sales_brand',
'asin_bs_cate_1_rank', 'asin_price', 'asin_rating', 'asin_is_new', 'is_brand_label',
'asin_total_comments', 'asin_bs_cate_current_id', 'asin_bs_cate_1_id',
'launch_time_type0_rate', 'launch_time_type1_rate', 'launch_time_type2_rate',
'launch_time_type3_rate', 'launch_time_type4_rate',
'launch_time_type5_rate', 'launch_time_type6_rate', 'launch_time_type7_rate',
'asin_buy_box_seller_type0_rate', 'asin_buy_box_seller_type1_rate',
'asin_buy_box_seller_type2_rate', 'asin_buy_box_seller_type3_rate',
'asin_buy_box_seller_type4_rate']
df_node_num_list = []
for idx, num in enumerate(nodes_num_list):
num = int(num)
df_node_num = self.df_deduplicated.filter(self.df_deduplicated["nodes_num"] == num).select(
*cols_select_list)
# print((df_node_num.count(), len(df_node_num.columns)))
df_node_num = df_node_num.withColumnRenamed("asin_bs_cate_current_id", f"asin_bs_cate_{num - 1}_id")
cols_list = [f"asin_bs_cate_{i}_id" for i in
range(1, num)] # 生成当前分类的层级路径 如 num=4 生成 1-3的asin_bs_cate_x_id
# 添加循环顺序列
df_node_num = df_node_num.withColumn("_loop_order", F.lit(idx))
df_first_tree = self.df_cate_tree.select(*cols_list).dropDuplicates()
df_first_tree = df_first_tree.withColumn(
"asin_bs_cate_parent_id_join",
F.concat_ws("&&&&", *[df_first_tree[c].cast("string") for c in cols_list[:-1]])
)
df_node_num = df_node_num.join(df_first_tree, how="left",
on=["asin_bs_cate_1_id", f"asin_bs_cate_{num - 1}_id"])
# Left join on the first-level category id and the current category id
df_node_num_list.append(df_node_num)
self.df_nodes_num_concat = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), df_node_num_list)
window_spec = Window.partitionBy("asin").orderBy(F.col("_loop_order").asc()) # 按 asin 去重,保留第一条
# 生成行号并过滤
self.df_nodes_num_concat = self.df_nodes_num_concat.withColumn("rn", F.row_number().over(window_spec)) \
.filter(F.col("rn") == 1) \
.drop("rn", "_loop_order")
print("df_nodes_num_concat打印") # 这个后续没用到
print(self.df_nodes_num_concat.count())
print(self.df_nodes_num_concat.columns)
def handle_bs_nodes_num_concat2(self):
print("handle_bs_nodes_num_concat2 打印")
self.df_deduplicated2 = self.df_deduplicated.fillna({"nodes_num": -1})
self.df_deduplicated2.cache()
print("df_deduplicated2数量",self.df_deduplicated2.count())
raw_list = self.df_deduplicated2.filter(F.col("nodes_num").isNotNull()).agg(F.collect_set("nodes_num")).first()[0]
nodes_num_list2 = sorted([int(x) for x in raw_list])
print("df_node_num_list2打印", nodes_num_list2)
cols_select_list = ['asin', 'asin_amazon_orders', 'asin_bsr_orders', 'asin_bsr_orders_new',
'asin_bsr_orders_brand',
'asin_bsr_orders_sales', 'asin_bsr_orders_sales_new', 'asin_bsr_orders_sales_brand',
'asin_bs_cate_1_rank', 'asin_price', 'asin_rating', 'asin_is_new', 'is_brand_label',
'asin_total_comments', 'asin_bs_cate_current_id', 'asin_bs_cate_1_id',
'launch_time_type0_rate', 'launch_time_type1_rate', 'launch_time_type2_rate',
'launch_time_type3_rate', 'launch_time_type4_rate',
'launch_time_type5_rate', 'launch_time_type6_rate', 'launch_time_type7_rate',
'asin_buy_box_seller_type0_rate', 'asin_buy_box_seller_type1_rate',
'asin_buy_box_seller_type2_rate', 'asin_buy_box_seller_type3_rate',
'asin_buy_box_seller_type4_rate', 'nodes_num']
df_node_num_list2 = []
for idx,num in enumerate(nodes_num_list2) :
num = int(num)
df_node_num = self.df_deduplicated2.filter(self.df_deduplicated2.nodes_num == num).select(*cols_select_list)
df_node_num = df_node_num.withColumn("_loop_order", F.lit(idx))
# print(num,df_node_num.count(),len(df_node_num.columns))
if num > 1:
df_node_num = df_node_num.withColumnRenamed("asin_bs_cate_current_id", f"asin_bs_cate_{num - 1}_id")
cols_list = [f"asin_bs_cate_{i}_id" for i in
range(1, num)] # 生成当前分类的层级路径 如 num=4 生成 1-3的asin_bs_cate_x_id
# df_first_tree2 = self.df_cate_tree.select(*cols_list).dropDuplicates()
# 获取分类树中对应的列 _cate_order_1 _cate_order_2 这种
order_cols = [c for c in self.df_cate_tree.columns if c.startswith("_cate_order")]
# 合并两个列 保留存在 df_cate_tree里的
select_cols = [c for c in cols_list + order_cols if c in self.df_cate_tree.columns]
df_first_tree2 = self.df_cate_tree.select(*select_cols)
if order_cols:
available_order_cols = [c for c in order_cols if c in df_first_tree2.columns]
if available_order_cols:
window_spec = Window.partitionBy(*cols_list).orderBy(
*[F.col(c).asc_nulls_last() for c in available_order_cols]  # sort by each order column in turn
)
df_first_tree2 = df_first_tree2.withColumn("_rn", F.row_number().over(window_spec))
df_first_tree2 = df_first_tree2.filter(F.col("_rn") == 1).drop("_rn")
else:
df_first_tree2 = df_first_tree2.dropDuplicates(cols_list)
# Drop the _cate_order_x helper columns from df_first_tree2
for oc in order_cols:
if oc in df_first_tree2.columns:
df_first_tree2 = df_first_tree2.drop(oc)
df_first_tree2 = df_first_tree2.withColumn("asin_bs_cate_parent_id_join", F.concat_ws("&&&&", *[
df_first_tree2[c].cast("string") for c in cols_list[:-1]]))
df_node_num = df_node_num.join(df_first_tree2, how="left",
on=["asin_bs_cate_1_id", f"asin_bs_cate_{num - 1}_id"])
df_node_num_list2.append(df_node_num)
df_nodes_num_concat2 = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), df_node_num_list2)
window_spec = Window.partitionBy("asin").orderBy(F.col("_loop_order").asc())
# Assign row numbers and filter
self.df_nodes_num_concat2 = df_nodes_num_concat2.withColumn("rn", F.row_number().over(window_spec)) \
.filter(F.col("rn") == 1) \
.drop("rn")
self.df_nodes_num_concat2.cache()
# Normalize NULL/placeholder values in the asin_bs_cate_*_id columns
id_cols = [c for c in self.df_nodes_num_concat2.columns if 'asin_bs_cate_' in c and '_id' in c]
for id_col in id_cols:
self.df_nodes_num_concat2 = self.clean_column(self.df_nodes_num_concat2, id_col)
print("df_nodes_num_concat2打印")
# self.df_nodes_num_concat2.show(10,truncate=False)
print(self.df_nodes_num_concat2.count())
# print(self.df_nodes_num_concat2.columns)
def handle_bs_level(self):
print("handle_bs_level 调用")
import re
levels = []
for col in self.df_nodes_num_concat2.columns:
match = re.match(r'asin_bs_cate_(\d+)_id', col)
if match:
levels.append(int(match.group(1)))
levels = sorted(set(levels))  # the asin_bs_cate_ levels to iterate over
print("asin_bs_cate_ levels:", levels)
# Collect all rate columns
rate_cols = [c for c in self.df_nodes_num_concat2.columns if 'rate' in c]
df_level_list = []
# The October data had eight levels
# for level in range(1, 9):
for level in levels:
curr_col = f"asin_bs_cate_{level}_id"
parent_col = f"asin_bs_cate_{level - 1}_id"
filter_cond = F.col(curr_col).isNotNull()
if level > 1 and parent_col in self.df_nodes_num_concat2.columns:
filter_cond = filter_cond & F.col(parent_col).isNotNull()
df_level = self.df_nodes_num_concat2.filter(filter_cond)
if level == 1:
group_keys = [curr_col]
else:
group_keys = [curr_col, parent_col]
aggs = [
F.sum("asin_amazon_orders").alias("asin_amazon_orders"),
F.sum("asin_bsr_orders").alias("asin_bsr_orders"),
F.sum("asin_bsr_orders_new").alias("asin_bsr_orders_new"),
F.sum("asin_bsr_orders_brand").alias("asin_bsr_orders_brand"),
F.sum("asin_bsr_orders_sales").alias("asin_bsr_orders_sales"),
F.sum("asin_bsr_orders_sales_new").alias("asin_bsr_orders_sales_new"),
F.sum("asin_bsr_orders_sales_brand").alias("asin_bsr_orders_sales_brand"),
F.sum("asin_is_new").alias("asin_is_new"), # 已转int
F.sum("is_brand_label").alias("is_brand_label"), # 已转int
F.count("asin").alias("asin"),
F.avg("asin_price").alias("asin_price"),
F.avg("asin_rating").alias("asin_rating"),
F.avg("asin_bs_cate_1_rank").alias("asin_bs_cate_1_rank"),
F.avg("asin_total_comments").alias("asin_total_comments"),
]
# Add aggregations for the rate columns
for rc in rate_cols:
# aggs.append(F.sum(F.col(rc)).alias(rc))
aggs.append(F.sum(F.col(rc).cast("int")).alias(rc))
if level > 1 and "asin_bs_cate_parent_id_join" in self.df_nodes_num_concat2.columns:
aggs.append(F.min("asin_bs_cate_parent_id_join").alias("asin_bs_cate_parent_id_join"))
df_res = df_level.groupBy(group_keys).agg(*aggs)
df_res = df_res.withColumnRenamed(curr_col, "asin_bs_cate_current_id")
if level == 1:
df_res = df_res.withColumn("asin_bs_cate_parent_id", F.lit("0"))
df_res = df_res.withColumn("asin_bs_cate_parent_id_join", F.lit("0"))
else:
df_res = df_res.withColumnRenamed(parent_col, "asin_bs_cate_parent_id")
if "asin_bs_cate_parent_id_join" in df_res.columns:
df_res = df_res.withColumn(
"asin_bs_cate_parent_id_join",
F.concat(F.lit("0&&&&"), F.col("asin_bs_cate_parent_id_join"))
)
else:
df_res = df_res.withColumn("asin_bs_cate_parent_id_join", F.lit("0&&&&&nan"))
df_res = df_res.withColumn("level", F.lit(level))
# Print the row count for comparison
# print(f"Level {level} Count: {df_res.count()}")
df_level_list.append(df_res)
self.df_level_concat = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), df_level_list)
self.df_level_concat.cache()
# print("self.df_level_concat",self.df_level_concat.count())
# print(self.df_level_concat.columns)
# Attach category names
# Select the name columns of self.df_cate_bs_data and deduplicate on asin_bs_cate_current_id
print("Attaching category names")
df_cate_name = self.df_cate_bs_data.select(*['asin_bs_cate_current_id', 'cur_category_name','_cate_order'])
window_spec = Window.partitionBy('asin_bs_cate_current_id').orderBy(F.col("_cate_order").asc_nulls_last())
df_cate_name = df_cate_name.withColumn("_rn",F.row_number().over(window_spec)).filter(F.col("_rn") == 1).drop("_rn")
self.df_level_concat =self.df_level_concat.join(df_cate_name, on='asin_bs_cate_current_id',how="inner")
print("增加分类名称 join后的数据")
# self.df_level_concat.show(10,truncate=False )
# print(self.df_level_concat.count())
# print(self.df_level_concat.columns)
# print("df_cate_bs_data的_cate_order的值是")
# self.df_cate_bs_data.select("_cate_order").show()
def handle_bs_level_calculate_column(self):
print("handle_bs_level_calculate_column 调用 字段占比计算")
rename_columns = {
"asin_amazon_orders": "asin_amazon_orders_sum",
"asin_bsr_orders": "asin_bsr_orders_sum",
"asin_bsr_orders_new": "asin_bsr_orders_sum_new",
"asin_bsr_orders_brand": "asin_bsr_orders_sum_brand",
"asin_bsr_orders_sales": "asin_bsr_orders_sales_sum",
"asin_bsr_orders_sales_new": "asin_bsr_orders_sales_sum_new",
"asin_bsr_orders_sales_brand": "asin_bsr_orders_sales_sum_brand",
"asin_is_new": "asin_count_new",
"is_brand_label": "asin_count_brand",
"asin": "asin_count",
"asin_price": "asin_price_mean",
"asin_rating": "asin_rating_mean",
"asin_bs_cate_1_rank": "asin_bs_cate_1_rank_mean",
"asin_total_comments": "asin_total_comments_mean"
}
for old_col, new_col in rename_columns.items():
self.df_level_concat = self.df_level_concat.withColumnRenamed(old_col, new_col)
# New-product ratios
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_rate_new",
F.col("asin_count_new") / F.col("asin_count")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_rate_new",
F.col("asin_bsr_orders_sum_new") / F.col("asin_bsr_orders_sum")
)
# Brand ratios
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_rate_brand",
F.col("asin_count_brand") / F.col("asin_count")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_rate_brand",
F.col("asin_bsr_orders_sum_brand") / F.col("asin_bsr_orders_sum")
)
# Average sales amount
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean",
F.col("asin_bsr_orders_sales_sum") / F.col("asin_count")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean_new",
F.col("asin_bsr_orders_sales_sum_new") / F.col("asin_count_new")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean_brand",
F.col("asin_bsr_orders_sales_sum_brand") / F.col("asin_count_brand")
)
# Average monthly sales volume
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_mean",
F.col("asin_bsr_orders_sum") / F.col("asin_count")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_mean_new",
F.col("asin_bsr_orders_sum_new") / F.col("asin_count_new")
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_mean_brand",
F.col("asin_bsr_orders_sum_brand") / F.col("asin_count_brand")
)
# ========== 3. Rounding ==========
# round(..., 0): keep integers
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum", F.round("asin_bsr_orders_sum", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_new", F.round("asin_bsr_orders_sum_new", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_brand", F.round("asin_bsr_orders_sum_brand", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_new", F.round("asin_count_new", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_brand", F.round("asin_count_brand", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bs_cate_1_rank_mean", F.round("asin_bs_cate_1_rank_mean", 0)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_total_comments_mean", F.round("asin_total_comments_mean", 0)
)
# round(..., 2): keep 2 decimal places
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_sum", F.round("asin_bsr_orders_sales_sum", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean", F.round("asin_bsr_orders_sales_mean", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_sum_new", F.round("asin_bsr_orders_sales_sum_new", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean_new", F.round("asin_bsr_orders_sales_mean_new", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_sum_brand", F.round("asin_bsr_orders_sales_sum_brand", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sales_mean_brand", F.round("asin_bsr_orders_sales_mean_brand", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_price_mean", F.round("asin_price_mean", 2)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_rating_mean", F.round("asin_rating_mean", 2)
)
# round(..., 4): keep 4 decimal places
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_rate_new", F.round("asin_count_rate_new", 4)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_count_rate_brand", F.round("asin_count_rate_brand", 4)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_rate_new", F.round("asin_bsr_orders_sum_rate_new", 4)
)
self.df_level_concat = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_rate_brand", F.round("asin_bsr_orders_sum_rate_brand", 4)
)
def handle_bs_level_proportion(self):
# Superseded by handle_bs_level_proportion2
print("handle_bs_level_proportion called: compute per-level proportions")
row_list = self.df_level_concat.select("level").distinct().collect()
level_list = sorted( [row.level for row in row_list])
print("level_list",level_list)
self.df_level_concat.cache() # 先缓存输入数据,防止重复计算
df_level_rate_list = []
for level in level_list:
df_cur_lever = self.df_level_concat.filter(self.df_level_concat.level == level).select(*['asin_bs_cate_current_id', 'asin_bs_cate_parent_id',
'asin_bsr_orders_sum', 'level'])
# print(level, df_cur_lever.count(),len(df_cur_lever.columns))
df_level_parent = df_cur_lever.groupBy("asin_bs_cate_parent_id").agg(
F.sum("asin_bsr_orders_sum").alias("asin_bsr_orders_sum"))
df_level_parent = df_level_parent.withColumnRenamed("asin_bsr_orders_sum","asin_bsr_orders_sum_parent")
df_cur_lever = df_cur_lever.join(df_level_parent, on=["asin_bs_cate_parent_id"],how="inner")
df_cur_lever = df_cur_lever.withColumn("asin_bsr_orders_sum_rate",F.col("asin_bsr_orders_sum") / F.col("asin_bsr_orders_sum_parent"))
df_cur_lever = df_cur_lever.withColumn("asin_bsr_orders_sum_rate",F.round("asin_bsr_orders_sum_rate",4))
df_level_rate_list.append(df_cur_lever)
print("循环完毕")
df_level_rate_concat = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), df_level_rate_list)
print("df_level_rate_concat 打印")
# df_level_rate_concat.show(10,truncate=False)
# print(df_level_rate_concat.count())
# print(df_level_rate_concat.columns)
# Build self.df_save
print("Building the self.df_save output DataFrame")
self.df_save = self.df_level_concat.join(df_level_rate_concat, how="inner", on=['asin_bs_cate_current_id', 'asin_bs_cate_parent_id', 'asin_bsr_orders_sum', 'level'])
print("df_save的值为")
self.df_save.show(10,truncate=False)
print(self.df_save.count())
print(self.df_save.columns)
def handle_bs_level_proportion2(self):
print("handle_bs_level_proportion 调用 计算层级占比")
self.df_level_concat.cache()
# Define a window partitioned by the parent category id (asin_bs_cate_parent_id);
# this replaces the earlier groupBy("asin_bs_cate_parent_id")
window_spec = Window.partitionBy("asin_bs_cate_parent_id")
# Compute the parent node's total orders; this replaces the earlier
# agg(sum("asin_bsr_orders_sum")) + join. Every row gains a column
# 'asin_bsr_orders_sum_parent' equal to the sum of orders_sum over all rows
# sharing the same parent category.
self.df_save = self.df_level_concat.withColumn(
"asin_bsr_orders_sum_parent",
F.sum("asin_bsr_orders_sum").over(window_spec)
)
self.df_save.cache()
# Compute the proportion: own orders / parent total orders
self.df_save = self.df_save.withColumn(
"asin_bsr_orders_sum_rate",
F.when(F.col("asin_bsr_orders_sum_parent") != 0,
F.round(F.col("asin_bsr_orders_sum") / F.col("asin_bsr_orders_sum_parent"), 4)
).otherwise(0)
)
# Release the cache
self.df_level_concat.unpersist()
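# Worked illustration of the window logic above (assumed toy values, not real data): two child categories
# under parent A with asin_bsr_orders_sum 30 and 70 both get asin_bsr_orders_sum_parent = 100, so their
# asin_bsr_orders_sum_rate values come out as 0.3 and 0.7 respectively.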
def handle_bs_proportion(self):
print("handle_bs_proportion调用")
# 占比计算
# 定义需要处理的列名列表
rate_cols = [
"launch_time_type0_rate", "launch_time_type1_rate", "launch_time_type2_rate",
"launch_time_type3_rate", "launch_time_type4_rate", "launch_time_type5_rate",
"launch_time_type6_rate", "launch_time_type7_rate",
"asin_buy_box_seller_type0_rate", "asin_buy_box_seller_type1_rate",
"asin_buy_box_seller_type2_rate", "asin_buy_box_seller_type3_rate",
"asin_buy_box_seller_type4_rate"
]
# Update the columns in a loop
for col_name in rate_cols:
self.df_save = self.df_save.withColumn(col_name, F.col(col_name) / F.col("asin_count"))
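# Note on what these columns hold at this point (based on the earlier aggregation): each *_rate column is
# still a count of ASINs in that bucket (the boolean flags were summed as ints in handle_bs_level), so
# dividing by asin_count turns it into a proportion, e.g. a hypothetical 250 / 1000 -> 0.25.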
def handle_data(self):
self.handle_bs_redirect_flag()
self.handle_bs_hierarchy_to_row()
self.handle_bse_column_rename()
self.handle_bs_categorical_data()
self.handle_bs_asin_deduplicated()
self.handle_bs_add_column()
self.handle_bs_nodes_num_concat()
self.handle_bs_nodes_num_concat2()
self.handle_bs_level()
self.handle_bs_level_calculate_column()
# self.handle_bs_level_proportion()
self.handle_bs_level_proportion2()
self.handle_bs_proportion()
self.handle_bs_to_csv_ceil()  # Only needed when exporting to CSV: aligns values with the MySQL schema (MySQL constraints would otherwise clean/truncate the data)
# print("df_save 计算完成")
# self.df_save.show(10, truncate=False)
print(f" df_save总行数: {self.df_save.count()}")
# print(self.df_save.columns)
def handle_bs_to_csv_ceil(self):
df = self.df_save
# ========== Fill NULLs with 0 in all numeric fields first ==========
all_numeric_cols = [
# int columns
"asin_amazon_orders_sum", "asin_bsr_orders_sum", "asin_bsr_orders_sum_new",
"asin_bsr_orders_sum_brand", "asin_bsr_orders_sales_sum",
"asin_bsr_orders_sales_sum_new", "asin_bsr_orders_sales_sum_brand",
"asin_count_new", "asin_count_brand", "asin_count",
"asin_total_comments_mean", "level",
"asin_bsr_orders_mean_new", "asin_bsr_orders_sales_mean_new",
"asin_bsr_orders_sales_mean_brand", "asin_bsr_orders_mean_brand",
"asin_bs_cate_1_rank_mean",
# double columns
"asin_bsr_orders_mean", "asin_bsr_orders_sum_parent",
"asin_bsr_orders_sales_mean", "asin_price_mean", "asin_rating_mean",
"asin_bsr_orders_sum_rate", "asin_bsr_orders_sum_rate_new",
"asin_bsr_orders_sum_rate_brand", "asin_count_rate_new", "asin_count_rate_brand",
"launch_time_type0_rate", "launch_time_type1_rate", "launch_time_type2_rate",
"launch_time_type3_rate", "launch_time_type4_rate", "launch_time_type5_rate",
"launch_time_type6_rate", "launch_time_type7_rate",
"asin_buy_box_seller_type0_rate", "asin_buy_box_seller_type1_rate",
"asin_buy_box_seller_type2_rate", "asin_buy_box_seller_type3_rate",
"asin_buy_box_seller_type4_rate",
]
for c in all_numeric_cols:
if c in df.columns:
df = df.withColumn(c, F.coalesce(F.col(c), F.lit(0)))
# ========== 1. Pure int fields (output plain integers, e.g. "2480") ==========
# These fields are int in SQL and normally carry no NULL values
pure_int_cols = [
"asin_amazon_orders_sum",
"asin_bsr_orders_sum",
"asin_bsr_orders_sum_new",
"asin_bsr_orders_sum_brand",
"asin_bsr_orders_sales_sum",
"asin_bsr_orders_sales_sum_new",
"asin_bsr_orders_sales_sum_brand",
"asin_count_new",
"asin_count_brand",
"asin_count",
"asin_total_comments_mean",
"level",
]
for c in pure_int_cols:
if c in df.columns:
df = df.withColumn(c, F.round(F.col(c), 0).cast("long"))
# ========== 2. Int fields that carry a trailing .0 after pandas reads them (may contain NULLs) ==========
# These fields are output as "2480.0", matching how pandas renders them after reading from SQL
int_with_decimal_cols = [
"asin_bsr_orders_mean_new",
"asin_bsr_orders_sales_mean_new",
"asin_bsr_orders_sales_mean_brand",
"asin_bsr_orders_mean_brand",
"asin_bs_cate_1_rank_mean",
]
for c in int_with_decimal_cols:
if c in df.columns:
df = df.withColumn(c, F.round(F.col(c), 0).cast("double"))
# ========== 3. double(20,0) - doubles with integer precision ==========
double_int_cols = [
"asin_bsr_orders_mean",
"asin_bsr_orders_sum_parent",
]
for c in double_int_cols:
if c in df.columns:
df = df.withColumn(c, F.round(F.col(c), 0))
# ========== 4. double(10,2) fields ==========
double2_cols = [
"asin_price_mean",
"asin_rating_mean",
"asin_bsr_orders_sales_mean",
"asin_bsr_orders_sum_rate_new",
"asin_bsr_orders_sum_rate_brand",
]
for c in double2_cols:
if c in df.columns:
df = df.withColumn(c, F.round(F.col(c), 2))
# ========== 5. double(10,4) ratio fields ==========
double4_cols = [
"asin_bsr_orders_sum_rate",
"asin_count_rate_new",
"asin_count_rate_brand",
"launch_time_type0_rate",
"launch_time_type1_rate",
"launch_time_type2_rate",
"launch_time_type3_rate",
"launch_time_type4_rate",
"launch_time_type5_rate",
"launch_time_type6_rate",
"launch_time_type7_rate",
"asin_buy_box_seller_type0_rate",
"asin_buy_box_seller_type1_rate",
"asin_buy_box_seller_type2_rate",
"asin_buy_box_seller_type3_rate",
"asin_buy_box_seller_type4_rate",
]
for c in double4_cols:
if c in df.columns:
df = df.withColumn(c, F.round(F.col(c), 4))
self.df_save = df
if "_cate_order" in self.df_save.columns:
self.df_save = self.df_save.drop("_cate_order")
# self.df_save = self.df_save.drop("_cate_order")
def save_pandas_to_csv(self):
"""
将Spark DataFrame转为Pandas并导出CSV,格式与MySQL读取后的pandas格式一致
"""
import pandas as pd
import numpy as np
p_df = self.df_save.toPandas()
# ========== Field grouping (based on the SQL table schema and pandas read behavior) ==========
# Pure int fields: output plain integers such as "1234" (no trailing .0)
pure_int_cols = [
"asin_amazon_orders_sum",
"asin_bsr_orders_sum",
"asin_bsr_orders_sum_new",
"asin_bsr_orders_sum_brand",
"asin_bsr_orders_sales_sum",
"asin_bsr_orders_sales_sum_new",
"asin_bsr_orders_sales_sum_brand",
"asin_count_new",
"asin_count_brand",
"asin_count",
"asin_total_comments_mean",
"level",
]
# Int fields that carry .0 after pandas reads them: output as "1234.0"
int_with_decimal_cols = [
"asin_bsr_orders_mean_new",
"asin_bsr_orders_sales_mean_new",
"asin_bsr_orders_sales_mean_brand",
"asin_bsr_orders_mean_brand",
"asin_bs_cate_1_rank_mean",
]
# double(20,0) fields: output as "1234.0"
double_int_cols = [
"asin_bsr_orders_mean",
"asin_bsr_orders_sum_parent",
]
# double(10,2) fields: at most 2 decimal places, trailing zeros stripped but .0 kept for whole numbers
double2_cols = [
"asin_price_mean",
"asin_rating_mean",
"asin_bsr_orders_sales_mean",
"asin_bsr_orders_sum_rate_new",
"asin_bsr_orders_sum_rate_brand",
]
# double(10,4) fields: at most 4 decimal places
double4_cols = [
"asin_bsr_orders_sum_rate",
"asin_count_rate_new",
"asin_count_rate_brand",
"launch_time_type0_rate",
"launch_time_type1_rate",
"launch_time_type2_rate",
"launch_time_type3_rate",
"launch_time_type4_rate",
"launch_time_type5_rate",
"launch_time_type6_rate",
"launch_time_type7_rate",
"asin_buy_box_seller_type0_rate",
"asin_buy_box_seller_type1_rate",
"asin_buy_box_seller_type2_rate",
"asin_buy_box_seller_type3_rate",
"asin_buy_box_seller_type4_rate",
]
# String fields
str_cols = [
"asin_bs_cate_current_id",
"asin_bs_cate_parent_id",
"asin_bs_cate_parent_id_join",
"cur_category_name",
]
# ========== Formatting helpers ==========
def format_pure_int(v):
"""Pure int: empty string for NULLs, otherwise a plain integer string (no .0)."""
if pd.isna(v):
return ''
try:
return str(int(round(float(v))))
except Exception:
return ''
def format_int_with_decimal(v):
"""Int that needs a trailing .0, e.g. "1234.0" (matches pandas output after reading from SQL)."""
if pd.isna(v):
return ''
try:
return f"{round(float(v)):.1f}"
except Exception:
return ''
def format_double_int(v):
"""double(20,0): integer value rendered with a trailing .0, e.g. "1234.0"."""
if pd.isna(v):
return ''
try:
return f"{round(float(v)):.1f}"
except Exception:
return ''
def format_double2(v):
"""double(10,2): at most 2 decimal places; trailing zeros stripped, but whole numbers keep .0."""
if pd.isna(v):
return ''
try:
val = round(float(v), 2)
if val == int(val):
# Whole number: keep the .0 (e.g. 1.0, 0.0)
return f"{int(val)}.0"
else:
# Has decimals: strip trailing zeros
s = f"{val:.2f}".rstrip('0')
return s
except Exception:
return ''
def format_double4(v):
"""double(10,4): at most 4 decimal places, trailing zeros stripped."""
if pd.isna(v):
return ''
try:
val = round(float(v), 4)
s = f"{val:.4f}".rstrip('0').rstrip('.')
return s
except Exception:
return ''
def format_str(v):
"""String fields."""
if pd.isna(v):
return ''
return str(v)
# ========== Apply the formatting ==========
for c in pure_int_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_pure_int)
for c in int_with_decimal_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_int_with_decimal)
for c in double_int_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_double_int)
for c in double2_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_double2)
for c in double4_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_double4)
for c in str_cols:
if c in p_df.columns:
p_df[c] = p_df[c].apply(format_str)
# Fallback: treat any remaining, unclassified columns as strings
processed_cols = set(pure_int_cols + int_with_decimal_cols + double_int_cols +
double2_cols + double4_cols + str_cols)
for c in p_df.columns:
if c not in processed_cols:
p_df[c] = p_df[c].apply(format_str)
# Finally, save to CSV
p_df.to_csv(f"/home/hejiangming/bs_top100_pandas_{self.site_name}_{self.date_info}.csv", index=False, encoding='utf-8', quoting=1)
print("CSV saved via pandas")
# def save_spark_to_csv(self):
# (self.df_save.repartition(1)  # make sure only one CSV file is produced
# .write
# .mode("overwrite")
# .option("header", "true")
# .option("sep", ",") # 指定分隔符
# .csv(f"/home/hejiangming/bs_top100_spark_{self.site_name}_{self.date_info}.csv"))
# print("spark方法保存csv成功 ") # 没权限
def save_data(self):
print("save_data调用")
# self.df_save = self.df_save.toPandas()
# # quoting = 1 所有字段都加双引号 escapechar="\\" 转义字符 当字段里本身有 双引号 时,需要转义,否则 CSV 解析器会报错
# self.df_save.to_csv(f"/home/hejiangming/bs_top100_{self.site_name}_{self.date_info}.csv", index=False,encoding="utf-8-sig",quoting = 1,escapechar="\\")
self.save_pandas_to_csv()
print("success保存成功")
if __name__ == '__main__':
site_name = sys.argv[1]  # arg 1: site, e.g. us
date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter, e.g. month
date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1 or 2025-11
handle_obj = DwtBsTop100(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
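# Example invocation (a sketch; the script filename and spark-submit options are assumptions, not part of this commit):
#   spark-submit /opt/module/spark/demo/py_demo/dwt/dwt_bs_top100.py us month 2025-11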