""" @Author : HuangJian @Description : 店铺feedBack页面top20相关指标统计 @SourceTable : ①ods_seller_account_syn ②ods_seller_asin_account ③ods_seller_account_feedback_report ④ods_asin_detail_product ⑤dim_asin_history_info @SinkTable : ①dwd_seller_asin_account_agg ②dwd_seller_asin_account_detail @CreateTime : 2022/11/2 9:56 @UpdateTime : 2022/11/2 9:56 """ import datetime import traceback import os import sys from datetime import date, timedelta sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from pyspark.sql.functions import ceil from pyspark.sql.types import IntegerType from utils.templates import Templates # from ..utils.templates import Templates from pyspark.sql import functions as F from yswg_utils.common_udf import udf_get_package_quantity class DwdFeedBack(Templates): def __init__(self, site_name='us', date_type="week", date_info='2022-40'): super().__init__() self.db_save = "feedback_week" self.site_name = site_name self.date_type = date_type self.date_info = date_info self.spark = self.create_spark_object( app_name=f"{self.db_save}:{self.site_name}_{self.date_type}_{self.date_info}") self.df_date = self.get_year_week_tuple() self.month = self.get_month_by_week() self.week_date = self.get_calDay_by_dateInfo() self.month_old = int(self.month) self.ym = f"{self.year}_{self.month_old}" self.partitions_by = ['site_name', 'date_type', 'date_info'] # 自定义udf函数相关对象 self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType()) self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType()) self.u_judge_package_quantity = F.udf(udf_get_package_quantity,IntegerType()) # 初始化全局变量df--ods获取数据的原始df self.df_seller_acount_syn = self.spark.sql("select 1+1;") self.df_seller_asin_acount = self.spark.sql("select 1+1;") self.df_seller_account_feedback_report = self.spark.sql("select 1+1;") self.df_asin_detail_product = self.spark.sql("select 1+1;") self.df_asin_detail = self.spark.sql("select 1+1;") # 初始化全局变量df--dwd层转换输出的df self.df_seller = self.spark.sql("select 1+1;") self.df_seller_agg = self.spark.sql("select 1+1;") self.df_seller_detail = self.spark.sql("select 1+1;") self.df_asin_parent = self.spark.sql("select 1+1;") # 初始化全局变量df -- 中间过程使用 self.df_seller_top = self.spark.sql("select 1+1;") self.df_asin_counts = self.spark.sql("select 1+1;") self.df_asin_new_counts = self.spark.sql("select 1+1;") self.df_variat_ratio = self.spark.sql("select 1+1;") @staticmethod def udf_launch_time(launch_time, cal_day): # 针对launch_time字段进行计算与当前日期的间隔天数 if "-" in str(launch_time): # print(DwdFeedBack.week_date) asin_date_list = str(launch_time).split("-") try: asin_date = datetime.date(year=int(asin_date_list[0]), month=int(asin_date_list[1]), day=int(asin_date_list[2])) if not cal_day.strip(): week_date = '2022-11-02' else: week_date = cal_day cur_date_list = str(week_date).split("-") cur_date = datetime.date(year=int(cur_date_list[0]), month=int(cur_date_list[1]), day=int(cur_date_list[2])) days_diff = (cur_date - asin_date).days except Exception as e: print(e, traceback.format_exc()) print(launch_time, asin_date_list) days_diff = 999999 else: days_diff = 999999 return days_diff @staticmethod def udf_days_diff(days_diff): # 针对days_diff字段进行计算180天,判断是否为新品 if days_diff <= 180: return 1 elif days_diff == 999999: return None else: return 0 def get_month_by_week(self): if self.date_type == "week": df = self.df_date.loc[self.df_date.year_week == self.year_week] month = list(df.month)[0] if int(month) < 10: month = "0" + str(month) print("month:", month) return str(month) def get_calDay_by_dateInfo(self): if self.date_type in ['day', 'last30day']: return str(self.date_info) # 如果为 周、月则取该周、月的最后一日,作为新品计算基准日 if self.date_type in ['week', 'month']: self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;") df = self.df_date.toPandas() df_loc = df.loc[df[f'year_{self.date_type}'] == f"{self.date_info}"] self.date_info_tuple = tuple(df_loc.date) # week_date_info_tuple = tuple(df_loc.date) # last_index = len(week_date_info_tuple) # print("self.cal_day:", str(tuple(df_loc.date)[last_index - 1])) # # 判断长度,取最后一日 # return str(tuple(df_loc.date)[last_index - 1]) # 取周第一天、月的第一天 print("self.cal_day:", str(list(df_loc.date)[0])) return str(list(df_loc.date)[0]) # 1.获取原始数据 def read_data(self): # 获取ods_seller相关原始表 print("获取 ods_seller_account_syn") sql = f"select id as account_id, account_name, {self.week} as week from ods_seller_account_syn " \ f"where site_name='{self.site_name}' " self.df_seller_acount_syn = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_seller_acount_syn:", self.df_seller_acount_syn.show(10, truncate=False)) print("获取 ods_seller_asin_account") sql = f"select account_name, asin from ods_seller_asin_account " \ f"where site_name = '{self.site_name}' group by account_name, asin" self.df_seller_asin_acount = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_seller_asin_acount:", self.df_seller_asin_acount.show(10, truncate=False)) print("获取 ods_seller_account_feedback_report") sql = f"select account_id,num as asin_counts from ods_seller_account_feedback_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}'" self.df_seller_account_feedback_report = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_seller_account_feedback_report", self.df_seller_account_feedback_report.show(10, truncate=False)) # 获取ods_asin相关原始表 print("获取 ods_asin_detail_product") sql = f"select account_id, asin, price, rating, total_comments, {self.week} as week, row_num, created_at " \ f"from ods_asin_detail_product where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}'" self.df_asin_detail_product = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_asin_detail_product1:", self.df_asin_detail_product.show(10, truncate=False)) self.df_asin_detail_product = self.df_asin_detail_product.sort(['account_id', "row_num", "created_at"], ascending=[True, True, False]) self.df_asin_detail_product = self.df_asin_detail_product.dropDuplicates(['account_id', "row_num"]) #print("self.df_asin_detail_product2:", self.df_asin_detail_product.show(10, truncate=False)) print("获取 dim_asin_history_info") sql = f"select asin, asin_title,asin_launch_time as launch_time from dim_cal_asin_history_detail " \ f"where site_name='{self.site_name}'" self.df_asin_detail = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_asin_detail:", self.df_asin_detail.show(10, truncate=False)) print("获取 dim_asin_variation_info") sql = f"select asin,parent_asin from dim_asin_variation_info " \ f"where site_name='{self.site_name}'" \ f" and asin != parent_asin " self.df_asin_parent = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_asin_parent:", self.df_asin_parent.show(10, truncate=False)) def handle_data(self): self.handle_asin_top20_avg() self.handle_asin_count() self.handle_save_date() # 2.1处理top20产品的平均指标值-按account聚合统计 def handle_asin_top20_avg(self): print("处理asin_detail_product的top20指标逻辑") self.df_seller_top = self.df_asin_detail_product.filter("row_num<=20") self.df_seller_top = self.df_seller_top.groupby('account_id').agg( F.avg('price').alias('top_20_avg_price'), F.avg('rating').alias('top_20_avg_rating'), F.avg('total_comments').alias('top_20_avg_total_comments'), ) self.df_seller_top = self.df_seller_top.withColumn("top_20_avg_total_comments", ceil(self.df_seller_top.top_20_avg_total_comments)) # print("df_seller_top:", self.df_seller_top.show(10, truncate=False)) # 2.2 计算asin_count和asin_new_count逻辑 def handle_asin_count(self): print("处理df_seller_account相关数据逻辑") # 让 df_seller_acount_syn 与 df_seller_asin_acount 和 df_seller_account_feedback_report 关联得到具体明细 self.df_seller = self.df_seller_acount_syn. \ join(self.df_seller_asin_acount, on='account_name', how='inner'). \ join(self.df_asin_detail, on='asin', how='left') # 标记是否新品标签 self.df_seller = self.df_seller.withColumn("days_diff", self.u_launch_time( self.df_seller.launch_time, F.lit(self.week_date))) # 通过days_diff走自定义udf,生成is_asin_new字段(是否asin新品标记) self.df_seller = self.df_seller.withColumn("is_asin_new", self.u_days_diff( self.df_seller.days_diff)) # 做缓存 self.df_seller = self.df_seller.cache() # 计算店铺-asin的打包数量 self.df_seller = self.df_seller.withColumn('asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title'))) # 符合打包数量>=2的商品数标识 self.df_seller = self.df_seller.withColumn('is_pq_flag', F.when(F.col('asin_package_quantity') >= 2, F.lit(1))) # 计算asin_counts_exists与asin_new_counts指标 self.df_asin_counts = self.df_seller.groupby(['account_id', 'account_name']).agg( F.count('asin').alias('asin_counts_exists'), F.sum('is_asin_new').alias('asin_new_counts'), F.sum('is_pq_flag').alias('fb_package_quantity_num') ) # 关联report表,获取店铺的商品数量 self.df_asin_counts = self.df_asin_counts\ .join(self.df_seller_account_feedback_report, on='account_id', how='left') # 计算 counts_new_rate asin新品比率 self.df_asin_counts = self.df_asin_counts.withColumn('counts_new_rate', F.round(F.col('asin_new_counts')/F.col('asin_counts_exists'), 3)) # 计算店铺打包数量的比例 self.df_asin_counts = self.df_asin_counts.withColumn('fb_package_quantity_prop', F.round(F.col('fb_package_quantity_num') / F.col('asin_counts_exists'), 3)) # 计算店铺多数量占比 有变体asin数量/(有变体asin数量+单产品asin数量) 逻辑实现 self.df_variat_ratio = self.df_seller.join(self.df_asin_parent, on='asin', how='left') # 没有匹配上asin_parent的给他附上自己的asin值:1.可能是父asin最顶端; 2.可能是asin单产品 self.df_variat_ratio = self.df_variat_ratio.withColumn('parent_asin_new', F.when(F.col('parent_asin').isNull(), F.col('asin')) .otherwise(F.col('parent_asin'))) # 按照account_name、asin_parent分组统计数量 self.df_variat_ratio = self.df_variat_ratio.groupby(['account_id', 'account_name', 'parent_asin_new'])\ .agg( F.count('asin').alias('asin_son_count') ) # 打上多变体标签 self.df_variat_ratio = self.df_variat_ratio.withColumn('is_variat_flag', F.when(F.col('asin_son_count') > 1, F.lit(1))) # 按照account_name分组,得出分子分母 self.df_variat_ratio = self.df_variat_ratio.groupby(['account_id', 'account_name'])\ .agg( F.sum('is_variat_flag').alias('variat_num'), F.count('parent_asin_new').alias('total_asin_num')) self.df_variat_ratio = self.df_variat_ratio.na.fill({'variat_num': 0, 'total_asin_num': 0}) ## 计算店铺多数量占比 有变体asin数量/(有变体asin数量+单产品asin数量) self.df_variat_ratio = self.df_variat_ratio.withColumn('fb_variat_prop', F.round(F.col('variat_num')/F.col('total_asin_num'), 3)) self.df_variat_ratio = self.df_variat_ratio.drop('account_name') # 2.4 指标整合逻辑 def handle_save_date(self): # seller_detail self.df_seller = self.df_seller\ .select( F.col('account_id'), F.col('account_name'), F.col('asin'), F.col('launch_time'), F.col('days_diff'), F.col('is_asin_new'), # 遗留的无用字段 F.lit(0).alias('asin_counts'), F.lit(0).alias('asin_new_counts'), F.lit(0).alias('counts_new_rate'), F.lit(0).alias('asin_counts_exists'), # 分区字段补全 F.lit(self.week).cast('int').alias('week'), F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"), F.lit(self.date_info).alias("date_info") ) # 关联top20avg相关计算指标以及计算店铺多数量占比计算指标 self.df_seller_agg = self.df_asin_counts\ .join(self.df_seller_top, on='account_id', how='left')\ .join(self.df_variat_ratio, on='account_id', how='left') self.df_seller_agg = self.df_seller_agg.select( F.col('account_id'), F.col('account_name'), F.col('asin_new_counts'), F.col('asin_counts'), F.col('counts_new_rate'), F.col('top_20_avg_price'), F.col('top_20_avg_rating'), F.col('top_20_avg_total_comments'), F.col('asin_counts_exists'), F.col('variat_num').alias('fb_variat_num'), F.col('total_asin_num').alias('fb_asin_total'), F.col('fb_variat_prop'), F.col('fb_package_quantity_prop'), F.col('fb_package_quantity_num'), # 分区字段补全 F.lit(self.week).cast('int').alias('week'), F.lit(self.ym).alias('ym'), F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"), F.lit(self.date_info).alias("date_info") ) # 重写数据写入方法,对应两张目标表 def save_data(self): self.reset_partitions(partitions_num=1) self.save_data_common( df_save=self.df_seller_agg, db_save='dwd_seller_asin_account_agg', partitions_num=self.partitions_num, partitions_by=self.partitions_by ) self.reset_partitions(partitions_num=10) self.save_data_common( df_save=self.df_seller, db_save='dwd_seller_asin_account_detail', partitions_num=self.partitions_num, partitions_by=self.partitions_by ) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwdFeedBack(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()