import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.dataframe import DataFrame
from yswg_utils.common_udf import udf_detect_phrase_reg


class DwtAbaLast365(object):
    """
    Rolls the last 12 monthly ABA search-term partitions (dwt_aba_st_analytics
    and friends) up into the rolling-year table `dwt_aba_last365`, one row per
    search term, with per-month pivoted columns plus yearly aggregates/labels.
    """

    def __init__(self, site_name, date_type, date_info):
        """
        :param site_name: marketplace site (e.g. 'us')
        :param date_type: output partition date_type; must be 'month' or 'year'
        :param date_info: anchor month, 'YYYY-MM'; the window covers this month
                          and the 11 preceding months
        """
        self.site_name = site_name
        self.date_info = date_info
        self.date_type = date_type
        # The monthly source partitions are always read with date_type='month',
        # regardless of the output partition's date_type.
        self.date_type_original = DateTypes.month.name
        assert date_type in [DateTypes.month.name, DateTypes.year.name], "date_type 输入有误!!"
        app_name = f"{self.__class__.__name__}:{self.site_name}:{self.date_type}:{self.date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_aba_last365"
        # Global DataFrame placeholders; populated by read_data()/handle_data().
        self.df_base = self.spark.sql("select 1+1;")
        self.df_orders = self.spark.sql("select 1+1;")
        self.df_st_sv_rank = self.spark.sql("select 1+1;")
        self.df_history = self.spark.sql("select 1+1;")
        self.df_last_year = self.spark.sql("select 1+1;")
        self.df_sv_change_rate = self.spark.sql("select 1+1;")
        self.df_label = self.spark.sql("select 1+1;")
        self.df_base_lastest = self.spark.sql("select 1+1;")
        self.df_change_rate_lastest = self.spark.sql("select 1+1;")
        # Trailing 12 months, newest first (includes date_info itself).
        self.last_12_month = [CommonUtil.get_month_offset(self.date_info, -i) for i in range(0, 12)]
        # The same calendar month one year earlier (year-over-year anchor).
        self.last_year_month = CommonUtil.get_month_offset(self.date_info, -12)

    def pivot_df(self, last_12_month: list, df_all: DataFrame, df_agg: DataFrame,
                 group_col: str, pivot_col: str, agg_col_arr: list):
        """
        Pivot per-month rows into per-month columns and join onto df_agg.

        The raw pivot yields columns named like:
            2024-07_st_num | 2024-06_st_num | ... | 2024-07_bsr_orders | ...
        which are renamed to `<col><calendar-month>` — e.g. '2024-07_st_num'
        becomes 'st_num7' (suffix is the month number, not a window offset).

        :param last_12_month: month values ('YYYY-MM') used as pivot keys
        :param df_all: long-format frame (one row per term per month)
        :param df_agg: per-term aggregate frame to attach the pivoted columns to
        :param group_col: grouping/join key column name
        :param pivot_col: column holding the month value
        :param agg_col_arr: metric columns to pivot
        :return: df_agg inner-joined with the pivoted columns on group_col
        """
        df_tmp = df_all.groupBy(group_col).pivot(pivot_col, last_12_month).agg(
            *list(map(lambda col: F.first(col).alias(col), agg_col_arr))
        ).cache()
        # Rename '<YYYY-MM>_<col>' -> '<col><month-number>'.
        for prefix in last_12_month:
            month = int(prefix.split('-')[-1])
            for col in agg_col_arr:
                df_tmp = df_tmp.withColumnRenamed(f"{prefix}_{col}", f"{col}{month}")
        return df_agg.join(df_tmp, group_col, "inner")

    def run(self):
        """Full pipeline: read sources, transform, persist to Hive."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """Load all source tables for the trailing-12-month window."""
        # ABA monthly base metrics for the whole window.
        sql1 = f"""
            select search_term, id, st_bsr_cate_1_id_new as category_id, bsr_orders, search_volume,
                   st_ao_avg, st_ao_val_rate, price_avg, weight_avg, volume_avg, rating_avg,
                   total_comments_avg, total_asin_num, aadd_proportion, sp_proportion, fbm_proportion,
                   cn_proportion, amzon_proportion, top3_seller_orders, top3_seller_bsr_orders,
                   top3_brand_orders, top3_brand_bsr_orders, page3_brand_num, page3_seller_num,
                   new_bsr_orders_proportion, new_asin_proportion, supply_demand, max_num,
                   most_proportion, gross_profit_fee_sea, gross_profit_fee_air,
                   st_bsr_cate_current_id_new as category_current_id, color_proportion, max_num_asin,
                   is_self_max_num_asin, multi_color_proportion, multi_size_proportion,
                   is_new_market_segment, market_cycle_type, brand_monopoly, seller_monopoly,
                   is_ascending_text, is_search_text, st_word_num, date_info, st_num, rank
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
        """
        self.df_base = self.spark.sql(sql1).repartition(80, "search_term", "date_info").cache()

        # Latest month's year-over-year / month-over-month rank change rates.
        sql2 = f"""
            select search_term,
                   rank_change_rate as rank_change_rate_lastest,
                   rank_rate_of_change as rank_rate_of_change_lastest
            from dwt_aba_last_change_rate
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info = '{self.date_info}';
        """
        self.df_change_rate_lastest = self.spark.sql(sql2).repartition(80, 'search_term').cache()

        # Estimated orders per search term. Before 2023-09 the pre-2022-10 part
        # of the window lives in the legacy dim_st_detail ('month_old') table.
        # NOTE: fixed a bug here — this previously read the bare global name
        # `date_info` (NameError when the module is imported) instead of
        # `self.date_info`.
        if self.date_info < '2023-09':
            old_list = list(filter(lambda it: it < '2022-10', self.last_12_month))
            new_list = list(filter(lambda it: it >= '2022-10', self.last_12_month))
            sql3 = f"""
                select search_term, st_search_sum as orders, date_info
                from dim_st_detail
                where site_name = '{self.site_name}'
                  and date_type = 'month_old'
                  and date_info in ({CommonUtil.list_to_insql(old_list)})
                union
                select search_term, orders as orders, date_info
                from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                  and date_type = 'month'
                  and date_info in ({CommonUtil.list_to_insql(new_list)})
            """
        else:
            sql3 = f"""
                select search_term, orders as orders, date_info
                from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                  and date_type = 'month'
                  and date_info in ({CommonUtil.list_to_insql(self.last_12_month)})
            """
        self.df_orders = self.spark.sql(sql3).repartition(80, "search_term", "date_info").cache()

        # Search-volume rank: current month vs. the same month last year.
        sql = f"""
            select search_term_id,
                   collect_set(rank)[0] as rank,
                   collect_set(last_rank)[0] as last_rank
            from (
                select search_term_id,
                       case date_info when '{self.date_info}' then sv_rank end as rank,
                       case date_info when '{self.last_year_month}' then sv_rank end as last_rank
                from dwt_st_sv_last365
                where site_name = '{self.site_name}'
                  and date_info in ('{self.date_info}', '{self.last_year_month}')
            ) tmp
            group by search_term_id;
        """
        self.df_st_sv_rank = self.spark.sql(sql).na.fill({"last_rank": 0}).cache()

        # Historical first-appearance detection: any term present in monthly
        # data up to one year ago is NOT historically new (flag pre-set to 0).
        sql = f"""
            select search_term,
                   0 as is_history_first_text,
                   min(date_info) as history_first_appear_month
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info <= '{self.last_year_month}'
            group by search_term;
        """
        self.df_history = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # New-word detection vs. last year's rolling-year snapshot.
        sql = f"""
            select search_term, 0 as is_first_text
            from dwt_aba_last365
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.last_year_month}';
        """
        self.df_last_year = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # Monthly search-volume YoY change rates, used to flag sustained
        # rise/decline across the window.
        sql = f"""
            select search_term, search_volume_change_rate
            from dwt_aba_last_change_rate
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)});
        """
        self.df_sv_change_rate = self.spark.sql(sql).repartition(80, 'search_term').cache()

        # Movie + brand label inputs.
        sql = f"""
            select search_term, asin_movie_type_count, total_asin_num, st_brand_label
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type_original}'
              and date_info in ({CommonUtil.list_to_insql(self.last_12_month)});
        """
        self.df_label = self.spark.sql(sql).repartition(80, 'search_term').fillna({
            'asin_movie_type_count': 0,
            'total_asin_num': 0
        }).cache()

    def handle_data(self):
        """Run all transformation stages in order."""
        # Latest-month snapshot columns
        self.handle_month_lastest()
        # Yearly aggregates + per-month pivot
        self.handle_agg()
        # Movie/brand, ascending, new-word, cycle and trend labels
        self.handle_label()
        # Language detection
        self.handle_calc_lang()
        # Final column selection before persisting
        self.handle_save()

    def handle_month_lastest(self):
        """Keep only the anchor month's rows and rename snapshot columns."""
        snapshot_cols = [
            "search_term", "rank", "color_proportion", "multi_color_proportion",
            "multi_size_proportion", "st_ao_avg", "st_ao_val_rate", "supply_demand",
            "total_asin_num", "new_asin_proportion", "bsr_orders", "new_bsr_orders_proportion",
            "price_avg", "weight_avg", "volume_avg", "page3_brand_num", "brand_monopoly",
            "page3_seller_num", "seller_monopoly", "aadd_proportion", "sp_proportion",
            "fbm_proportion", "cn_proportion", "amzon_proportion", "most_proportion",
            "max_num", "max_num_asin", "is_self_max_num_asin", "rating_avg",
            "total_comments_avg", "date_info"
        ]
        # '_lastest' suffix marks values taken from the anchor month only.
        rename_map = {
            'rank': 'rank_lastest',
            'multi_color_proportion': 'multi_color_avg_proportion',
            'multi_size_proportion': 'multi_size_avg_proportion',
            'new_asin_proportion': 'new_asin_num_avg_monopoly',
            'new_bsr_orders_proportion': 'new_asin_bsr_orders_avg_monopoly',
            'most_proportion': 'most_avg_proportion',
            'date_info': 'appear_month_lastest',
        }
        df = self.df_base.filter(f"date_info = '{self.date_info}'").select(*snapshot_cols)
        for old_name, new_name in rename_map.items():
            df = df.withColumnRenamed(old_name, new_name)
        self.df_base_lastest = df.repartition(80, 'search_term').cache()

    def handle_agg(self):
        """Compute yearly aggregates per term and pivot monthly metrics."""
        self.df_base = self.df_base.join(
            other=self.df_orders, on=["search_term", "date_info"], how="left"
        )
        self.df_orders.unpersist()
        df_agg = self.df_base.groupBy("id").agg(
            F.first("search_term").alias("search_term"),
            F.first("category_id").alias("category_id"),
            F.first("category_current_id").alias("category_current_id"),
            # Yearly means (fixed /12 denominator, missing months count as 0).
            F.expr("round(sum(top3_seller_orders)/12,4)").alias("top3_seller_orders"),
            F.expr("round(sum(top3_seller_bsr_orders)/12,4)").alias("top3_seller_bsr_orders"),
            F.expr("round(sum(top3_brand_orders)/12,4)").alias("top3_brand_orders"),
            F.expr("round(sum(top3_brand_bsr_orders)/12,4)").alias("top3_brand_bsr_orders"),
            F.expr("round(sum(gross_profit_fee_sea)/12,4)").alias("gross_profit_fee_sea"),
            F.expr("round(sum(gross_profit_fee_air)/12,4)").alias("gross_profit_fee_air"),
            F.sum(F.col("orders")).alias("orders"),
            F.sum(F.col("st_num")).alias("total_st_num"),
            F.max("st_word_num").alias("st_word_num"),
            # Month with the highest BSR orders (struct max keeps its date_info).
            F.max(F.struct("bsr_orders", "date_info")).alias("tmp_row_1"),
            # Month with the highest estimated orders.
            F.max(F.struct("orders", "date_info")).alias("tmp_row_2"),
            # First month the term appeared in this window.
            F.min("date_info").alias("first_appear_month"),
            # All calendar months (as numbers) the term appeared in, sorted.
            F.concat_ws(
                ",",
                F.sort_array(F.collect_set(F.split("date_info", "-")[1].cast(IntegerType())))
            ).alias("total_appear_month"),
            # New market segment only if ALL 12 months flag it (avg==1 -> 1,
            # anything less truncates to 0 on the int cast).
            F.avg("is_new_market_segment").cast(IntegerType()).alias("is_new_market_segment"),
            # Hot-search word: flagged in >80% of months (sum >= 10 -> 1 after
            # the /9.6 int cast).
            F.expr("sum(is_search_text) / 9.6").cast(IntegerType()).alias("is_search_text")
        )
        # Metrics pivoted into one column per calendar month.
        agg_col_arr = ['st_num', 'bsr_orders', 'orders', 'market_cycle_type', 'search_volume']
        self.df_base = self.pivot_df(
            self.last_12_month, self.df_base, df_agg, "id", "date_info", agg_col_arr
        ).repartition(80, "search_term").cache()

    def handle_label(self):
        """Derive movie/brand, ascending, new-word, cycle and trend labels."""
        # Movie label by share of movie-type ASINs:
        # 0: none; 1: 0 < share <= 20%; 2: 20% < share <= 50%; 3: share > 50%.
        # Brand label 1 is remapped to 4 so both fit one combined label field.
        self.df_label = self.df_label.groupBy('search_term').agg(
            F.round((F.sum("asin_movie_type_count") / F.sum("total_asin_num")) * 100, 2).alias('movie_prop'),
            F.max('st_brand_label').alias('st_brand_label')
        ).withColumn(
            'st_movie_label',
            F.when(
                (F.col("movie_prop") > 0) & (F.col("movie_prop") <= 20), F.lit(1)
            ).when(
                (F.col("movie_prop") > 20) & (F.col("movie_prop") <= 50), F.lit(2)
            ).when(
                (F.col("movie_prop") > 50), F.lit(3)
            ).otherwise(F.lit(0))
        ).withColumn(
            'st_brand_label',
            F.when(F.col('st_brand_label') == 1, F.lit(4)).otherwise(F.lit(0))
        ).withColumn(
            'st_movie_brand_label',
            F.sort_array(F.expr("array_distinct(array(st_movie_label, st_brand_label))"))
        ).withColumn(
            # Drop the 0 placeholder when a real label is also present.
            'st_movie_brand_label',
            F.when(
                F.array_contains(F.col('st_movie_brand_label'), 0)
                & (F.size(F.col('st_movie_brand_label')) > 1),
                F.expr("filter(st_movie_brand_label, x -> x != 0)")
            ).otherwise(F.col('st_movie_brand_label'))
        ).withColumn(
            'st_movie_brand_label', F.concat_ws(",", F.col('st_movie_brand_label'))
        ).select(
            'search_term', 'st_movie_brand_label'
        )
        self.df_base = self.df_base.join(self.df_label, on='search_term', how='left')

        # Ascending word: search-volume rank at least halved year over year.
        self.df_base = self.df_base.join(
            self.df_st_sv_rank,
            self.df_base['id'].eqNullSafe(self.df_st_sv_rank['search_term_id']),
            "left"
        ).withColumn(
            "is_ascending_text",
            F.expr("rank / last_rank <= 0.5").cast(IntegerType())
        )

        # New word: absent from last year's rolling-year snapshot -> 1.
        self.df_base = self.df_base.join(
            self.df_last_year, on='search_term', how='left'
        ).fillna({'is_first_text': 1})

        # Historically new word: absent from ALL monthly history up to a year
        # ago -> 1; first-appearance month falls back to this window's first.
        self.df_base = self.df_base.join(
            self.df_history, on='search_term', how='left'
        ).fillna({
            'is_history_first_text': 1
        }).withColumn(
            'history_first_appear_month',
            F.when(F.col('history_first_appear_month').isNotNull(), F.col('history_first_appear_month'))
            .otherwise(F.col('first_appear_month'))
        )

        # Market cycle type: prefer the most recent month's value, walking
        # backwards (wrapping around the calendar) until a non-null is found.
        num = int(self.date_info.split('-')[-1])
        fields_first_round = [F.col(f'market_cycle_type{i}') for i in range(num, 0, -1)]
        fields_second_round = [F.col(f'market_cycle_type{i}') for i in range(12, num, -1)]
        fields = fields_first_round + fields_second_round
        self.df_base = self.df_base.withColumn('market_cycle_type', F.coalesce(*fields))

        # Sustained rise/decline: share of months with positive/negative
        # search-volume change; >0.7 in either direction sets the flag.
        self.df_sv_change_rate = self.df_sv_change_rate.withColumn(
            'sv_rising_flag',
            F.when(F.col('search_volume_change_rate') > 0, 1).otherwise(0)
        ).withColumn(
            'sv_decline_flag',
            F.when(F.col('search_volume_change_rate') < 0, 1).otherwise(0)
        )
        self.df_sv_change_rate = self.df_sv_change_rate.groupBy('search_term').agg(
            F.round(F.sum('sv_rising_flag') / F.count('search_term'), 4).alias('sv_rising_rate'),
            F.round(F.sum('sv_decline_flag') / F.count('search_term'), 4).alias('sv_decline_rate')
        )
        self.df_base = self.df_base.join(
            self.df_sv_change_rate, on='search_term', how='left'
        ).withColumn(
            'sv_change_rate_flag',
            F.when(F.col('sv_rising_rate') > 0.7, 1).when(F.col('sv_decline_rate') > 0.7, 2).otherwise(0)
        ).cache()
        self.df_label.unpersist()
        self.df_st_sv_rank.unpersist()
        self.df_last_year.unpersist()
        self.df_history.unpersist()
        self.df_sv_change_rate.unpersist()

    def handle_calc_lang(self):
        """Detect each search term's language via the word-frequency dictionary."""
        sql = """
            select word, langs from big_data_selection.tmp_lang_word_frequency;
        """
        lang_word_list = self.spark.sql(sql).collect()
        # Collect to a driver-side map; the UDF closes over it.
        lang_word_map = {row['word']: row['langs'] for row in lang_word_list}
        self.df_base = self.df_base.withColumn(
            "lang",
            F.coalesce(
                udf_detect_phrase_reg(lang_word_map)(F.col("search_term")).getField("lang"),
                F.lit("other")
            )
        ).cache()

    def handle_save(self):
        """Join the latest-month snapshots and project the final column set."""
        self.df_base = self.df_base.join(
            other=self.df_base_lastest, on="search_term", how="left"
        ).join(
            other=self.df_change_rate_lastest, on="search_term", how="left"
        ).cache()
        self.df_base_lastest.unpersist()
        self.df_change_rate_lastest.unpersist()

        # Pivoted per-month columns; the numeric suffix is the calendar month
        # (e.g. 2024-07 -> st_num7, 2023-12 -> st_num12).
        month_cols = [
            F.col(f"{col}{i}").cast(IntegerType())
            for col in ('st_num', 'orders', 'bsr_orders', 'market_cycle_type', 'search_volume')
            for i in range(1, 13)
        ]
        self.df_base = self.df_base.select(
            F.col("id"),
            F.col("search_term"),
            F.col("category_id"),
            F.col("category_current_id"),
            F.col('rank').cast(IntegerType()),
            F.col("total_st_num"),
            *month_cols,
            F.col("st_ao_avg"),
            F.expr("round(st_ao_val_rate, 4)").alias("st_ao_val_rate"),
            F.col("price_avg"),
            F.col("weight_avg"),
            F.col("volume_avg"),
            F.col("rating_avg"),
            F.col("total_comments_avg"),
            F.col("multi_size_avg_proportion"),
            F.col("multi_color_avg_proportion"),
            F.col("brand_monopoly"),
            F.col("seller_monopoly"),
            F.col("most_avg_proportion"),
            F.col("supply_demand"),
            F.col("aadd_proportion"),
            F.col("sp_proportion"),
            F.col("fbm_proportion"),
            F.col("cn_proportion"),
            F.col("amzon_proportion"),
            F.col("top3_seller_orders"),
            F.col("top3_seller_bsr_orders"),
            F.col("top3_brand_orders"),
            F.col("top3_brand_bsr_orders"),
            F.col("page3_brand_num").cast(IntegerType()),
            F.col("page3_seller_num").cast(IntegerType()),
            F.col("new_asin_num_avg_monopoly"),
            F.col("new_asin_bsr_orders_avg_monopoly"),
            F.col("orders").cast(IntegerType()),
            F.col("bsr_orders"),
            F.col("gross_profit_fee_sea"),
            F.col("gross_profit_fee_air"),
            F.col("color_proportion"),
            F.col("total_asin_num"),
            F.col("max_num"),
            F.col("max_num_asin"),
            F.col("is_self_max_num_asin"),
            F.col("tmp_row_1").getField("date_info").alias("max_bsr_orders_month"),
            F.col("tmp_row_2").getField("date_info").alias("max_orders_month"),
            F.col("is_new_market_segment").alias("is_new_market_segment"),
            F.col("is_ascending_text").alias("is_ascending_text"),
            F.col("is_search_text").alias("is_search_text"),
            F.col("st_word_num").alias("st_word_num"),
            # cast-then-alias so the output name is guaranteed to stick
            F.current_date().cast(StringType()).alias("updated_time"),
            F.current_date().cast(StringType()).alias("created_time"),
            F.lit(None).alias("usr_mask_type"),
            F.lit(None).alias("usr_mask_progress"),
            F.col("lang"),
            # Historically new word (vs. all monthly history)
            F.col("is_history_first_text"),
            # Month the term first ever appeared
            F.col("history_first_appear_month"),
            # New word vs. last year's snapshot
            F.col("is_first_text"),
            # First month the term appeared within this window
            F.col("first_appear_month"),
            # Share of months with rising search volume
            F.col("sv_rising_rate"),
            # Share of months with declining search volume
            F.col("sv_decline_rate"),
            # Sustained trend flag: 1 rising, 2 declining, 0 neither
            F.col("sv_change_rate_flag"),
            # Combined movie/brand label
            F.col("st_movie_brand_label"),
            # All calendar months the term appeared in this window
            F.col("total_appear_month"),
            # Market cycle type (latest non-null month)
            F.col("market_cycle_type"),
            # Latest month's rank
            F.col("rank_lastest"),
            # Latest month's year-over-year change
            F.col("rank_change_rate_lastest"),
            # Latest month's month-over-month change
            F.col("rank_rate_of_change_lastest"),
            # Latest month the term appeared
            F.col("appear_month_lastest"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )

        # Quarterly BSR-order totals (null months count as 0).
        self.df_base = self.df_base.withColumn(
            "q1_bsr_orders",
            F.expr("coalesce(bsr_orders1,0) + coalesce(bsr_orders2,0) + coalesce(bsr_orders3,0)")
        ).withColumn(
            "q2_bsr_orders",
            F.expr("coalesce(bsr_orders4,0) + coalesce(bsr_orders5,0) + coalesce(bsr_orders6,0)")
        ).withColumn(
            "q3_bsr_orders",
            F.expr("coalesce(bsr_orders7,0) + coalesce(bsr_orders8,0) + coalesce(bsr_orders9,0)")
        ).withColumn(
            "q4_bsr_orders",
            F.expr("coalesce(bsr_orders10,0) + coalesce(bsr_orders11,0) + coalesce(bsr_orders12,0)")
        )
        # Quarterly estimated-order totals.
        self.df_base = self.df_base.withColumn(
            "q1_orders",
            F.expr("coalesce(orders1,0) + coalesce(orders2,0) + coalesce(orders3,0)")
        ).withColumn(
            "q2_orders",
            F.expr("coalesce(orders4,0) + coalesce(orders5,0) + coalesce(orders6,0)")
        ).withColumn(
            "q3_orders",
            F.expr("coalesce(orders7,0) + coalesce(orders8,0) + coalesce(orders9,0)")
        ).withColumn(
            "q4_orders",
            F.expr("coalesce(orders10,0) + coalesce(orders11,0) + coalesce(orders12,0)")
        )
        # Legacy alias kept for downstream compatibility.
        self.df_base = self.df_base.withColumn(
            "top_rank",
            F.col("rank")
        ).na.fill({
            "rank": 0,
            "top_rank": 0
        }).cache()

    def save_data(self):
        """Overwrite this run's Hive partition and append the result."""
        self.df_base = self.df_base.repartition(20)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        hdfs_path = CommonUtil.build_hdfs_path(
            self.hive_tb,
            partition_dict={
                "site_name": self.site_name,
                "date_type": self.date_type,
                "date_info": self.date_info,
            }
        )
        # Clear the partition's HDFS directory first so 'append' acts as an
        # idempotent overwrite for this partition.
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        self.df_base.write.saveAsTable(name=self.hive_tb, format='hive', mode='append',
                                       partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtAbaLast365(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()