import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from yswg_utils.common_udf import udf_detect_phrase_reg


class DwtAbaStAnalytics(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-40"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_aba_st_analytics"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # Output dataframe and partition initialization
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)
        # List initialization
        self.sp_symbols = []
        # Global dataframe placeholders
        self.df_st_measure = self.spark.sql("select 1+1;")
        self.df_asin_measure = self.spark.sql("select 1+1;")
        self.df_st_asin_measure = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_seller_asin_info = self.spark.sql("select 1+1;")
        self.df_st_asin_cal = self.spark.sql("select 1+1;")
        self.df_st_asin_join = self.spark.sql("select 1+1;")
        self.df_seller_asin_country = self.spark.sql("select 1+1;")
        self.df_st_brand_cal = self.spark.sql("select 1+1;")
        self.df_top3_st_brand_cal = self.spark.sql("select 1+1;")
        self.df_st_seller_cal = self.spark.sql("select 1+1;")
        self.df_top3_st_seller_cal = self.spark.sql("select 1+1;")
        self.df_st_num_stats = self.spark.sql("select 1+1;")
        self.df_st_detail = self.spark.sql("select 1+1;")
        self.df_st_key = self.spark.sql("select 1+1;")
        self.df_st_market = self.spark.sql("select 1+1;")
        self.df_st_volume_fba = self.spark.sql("select 1+1;")
        self.df_st_brand = self.spark.sql("select 1+1;")
        self.df_asin_label = self.spark.sql("select 1+1;")
        self.df_is_hidden_cate = self.spark.sql("select 1+1;")
        # Register custom UDFs
        self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType())
        self.u_judge_color = self.spark.udf.register('u_judge_color', self.udf_judge_color, IntegerType())
        self.u_judge_title_color = self.spark.udf.register(
            'u_judge_title_color', self.udf_judge_title_color, IntegerType()
        )
        self.u_judge_multi_size = self.spark.udf.register(
            'u_judge_multi_size', self.udf_judge_multi_size, IntegerType()
        )

    # Count how many words an ABA search term splits into
    def st_word_count(self, sp_symbols):
        def udf_st_word_count(name):
            # Baseline list of special characters -- migrated to database maintenance (done)
            # sp_symbols = ['?', '!', '-', '%', '&', '|']
            split_list = name.split(" ")
            # Special characters that appear in the split list
            sp_list = list(set(split_list).intersection(set(sp_symbols)))
            # Drop those special characters from the word list
            word_list = list(filter(lambda x: x not in sp_list, split_list))
            word_count = len(word_list)
            # Any number of special characters counts as 1 in total
            symbol_count = 1 if len(sp_list) > 0 else 0
            return word_count + symbol_count

        return F.udf(udf_st_word_count, IntegerType())
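    # Worked example (illustrative values): with sp_symbols = ['&'], the term
    # "phone & case" splits into ['phone', '&', 'case']; two words remain after
    # the symbol is removed, and the symbol's presence adds 1, so st_word_num = 3.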
    @staticmethod
    def udf_contains(sub, text):
        if text is None:
            return None
        if str(sub).lower() in str(text).lower():
            return 1
        else:
            return 0

    # @staticmethod
    # def udf_get_volume(volume):
    #     # print("get_volume", volume)
    #     volume = str(volume)
    #     if volume == "null":
    #         return None
    #     else:
    #         pattern = r"\d+\.?\d*"
    #         volumeList = re.findall(pattern, volume)
    #         if len(volumeList):
    #             volumeList = list(map(float, volumeList))
    #             result = reduce((lambda x, y: x * y), volumeList)
    #             return result
    #         else:
    #             return None

    @staticmethod
    def udf_judge_color(color):
        if color is None:
            return None
        color = str(color).lower()
        color_len = len(color)
        if color not in ['null', 'none'] and color_len > 1:
            return 1
        else:
            return 0

    @staticmethod
    def udf_judge_title_color(asin_title):
        if asin_title is None:
            return None
        title = str(asin_title).lower()
        modeTypes = ['colorful', 'assorted color', 'multi color']
        for color in modeTypes:
            if color in title:
                return 1
        return 0

    @staticmethod
    def udf_judge_multi_size(size, style):
        size = str(size).lower()
        style = str(style).lower()
        # When the variant table has both size and style, count by size;
        # if there is no size, fall back to checking style
        if size not in ['none', 'null']:
            return 1
        else:
            if style not in ['none', 'null']:
                return 1
            return 0

    def read_data(self):
        # Some partitions do not cover month_old; remap it to month, everything else unchanged
        spe_date_type = 'month' if 'month_old' == self.date_type else self.date_type
        # Read ods_st_key, the unique primary key per search term
        sql = f"""
            select search_term, cast(st_key as int) as id
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.df_st_key = self.spark.sql(sqlQuery=sql)
        self.df_st_key = self.df_st_key.repartition(80, 'search_term').cache()
        print("self.df_st_key:")
        self.df_st_key.show(10, truncate=True)
        # Read the dwd_st_measure fact table
        sql = f"""
            select search_term,
                   st_zr_orders,
                   st_bsr_orders,
                   st_ao_val as st_ao_avg,
                   st_ao_val_rate,
                   st_zr_page1_title_appear_rate as page1_title_proportion,
                   null as st_4_20_ao_avg,
                   null as st_4_20_ao_rate,
                   st_volume_avg as volume_avg,
                   st_weight_avg as weight_avg,
                   st_price_avg as price_avg,
                   st_zr_page123_title_appear_rate,
                   st_sp_page123_title_appear_rate,
                   st_zr_flow_proportion,
                   st_ao_val_matrix,
                   st_flow_proportion_matrix,
                   st_zr_counts,
                   st_sp_counts,
                   st_self_asin_counts,
                   st_self_asin_proportion
            from dwd_st_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_st_measure = self.spark.sql(sqlQuery=sql)
        self.df_st_measure = self.df_st_measure.repartition(80, 'search_term').cache()
        print("self.df_st_measure:")
        self.df_st_measure.show(10, truncate=True)
        # Read the dwd_st_asin_measure fact table
        sql = f"""
            select search_term, asin
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_st_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_st_asin_measure = self.df_st_asin_measure.repartition(80, 'asin').cache()
        print("self.df_st_asin_measure:")
        self.df_st_asin_measure.show(10, truncate=True)
        # Read the dwd_asin_measure fact table
        sql = f"""
            select asin, asin_bsr_orders, asin_zr_orders, asin_amazon_orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_asin_measure = self.df_asin_measure.repartition(80, 'asin').cache()
        print("self.df_asin_measure:")
        self.df_asin_measure.show(10, truncate=True)
        # Read the dim_asin_detail table
        sql = f"""
            select asin, asin_title, asin_title_len, asin_category_desc, asin_rank, asin_color,
                   asin_size, asin_style, asin_price, asin_rating, asin_total_comments, asin_material,
                   asin_brand_name, bsr_cate_1_id, asin_buy_box_seller_type, asin_is_amazon,
                   asin_is_fba, asin_is_fbm, asin_is_other, asin_is_sale, asin_launch_time,
                   asin_is_new, asin_img_num, asin_img_type, asin_is_picture, asin_is_video, asin_is_aadd
            from dim_asin_detail
            where site_name = '{self.site_name}'
              and date_type = '{spe_date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        self.df_asin_detail = self.df_asin_detail.repartition(80, 'asin').cache()
        print("self.df_asin_detail:")
        self.df_asin_detail.show(10, truncate=True)
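        # Note: every source dataframe read in this method is repartitioned on
        # its join key (search_term or asin) and cached, so the joins in
        # handle_base_join() and handle_st_cal() operate on co-partitioned,
        # reusable data instead of re-reading the source tables.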
        # Only fetch asin and country_name; country_name is aggregated per asin
        sql = f"""
            select asin, concat_ws(",", collect_list(cast(fd_country_name as string))) as country_name
            from dim_fd_asin_info
            where site_name = '{self.site_name}'
            group by asin;
        """
        self.df_seller_asin_country = self.spark.sql(sqlQuery=sql)
        self.df_seller_asin_country = self.df_seller_asin_country.repartition(80, 'asin').cache()
        print("self.df_seller_asin_country:")
        self.df_seller_asin_country.show(10, truncate=True)
        # Read the dim_fd_asin_info table
        sql = f"""
            select asin, fd_unique as account_id, fd_country_name as country_name
            from dim_fd_asin_info
            where site_name = '{self.site_name}'
        """
        self.df_seller_asin_info = self.spark.sql(sqlQuery=sql)
        self.df_seller_asin_info = self.df_seller_asin_info.drop_duplicates(['asin']).repartition(80, 'asin').cache()
        print("self.df_seller_asin_info:")
        self.df_seller_asin_info.show(10, truncate=True)
        # Read dim_st_detail: click/conversion-share info for asin1-asin3
        sql = f"""
            select search_term,
                   st_rank as rank,
                   st_asin1 as asin1,
                   st_asin2 as asin2,
                   st_asin3 as asin3,
                   st_click_share1 as click_share1,
                   st_click_share2 as click_share2,
                   st_click_share3 as click_share3,
                   st_click_share_sum as total_click_share,
                   st_is_new_market_segment as is_new_market_segment,
                   st_conversion_share1 as conversion_share1,
                   st_conversion_share2 as conversion_share2,
                   st_conversion_share3 as conversion_share3,
                   st_conversion_share_sum as total_conversion_share,
                   st_quantity_being_sold as quantity_being_sold,
                   cast(st_bsr_cate_1_id as int) as category_id,
                   st_search_num as search_volume,
                   st_is_first_text as is_first_text,
                   st_is_ascending_text as is_ascending_text,
                   st_is_search_text as is_search_text,
                   cast(st_bsr_cate_current_id as int) as category_current_id,
                   st_appear_history_counts as st_num,
                   cast((st_quantity_being_sold / st_search_num) as decimal(10, 3)) as supply_demand,
                   st_brand1, st_category1, st_brand2, st_category2, st_brand3, st_category3,
                   st_bsr_cate_1_id_new,
                   st_bsr_cate_current_id_new,
                   if(st_appear_history_counts >= 4 and (st_click_share_sum > st_conversion_share_sum), 1, 0) as is_high_return_text,
                   date_format(st_updated_time, 'yyyy-MM-dd HH:mm:ss') as st_crawl_date,
                   st_competition_level
            from dim_st_detail
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_st_detail = self.spark.sql(sqlQuery=sql)
        self.df_st_detail = self.df_st_detail.repartition(80, 'search_term').cache()
        print("self.df_st_detail:")
        self.df_st_detail.show(10, truncate=True)
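        # Worked example (illustrative values): a term with st_quantity_being_sold
        # = 5000 and st_search_num = 20000 gets supply_demand = 0.250; a term seen
        # in 4 or more periods whose total click share exceeds its total conversion
        # share is flagged is_high_return_text = 1.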
        # Read dws_st_num_stats: take max_num and most_proportion
        sql = f"""
            select search_term,
                   cast(max_num as double) as max_num,
                   most_proportion,
                   max_num_asin,
                   is_self_max_num_asin
            from dws_st_num_stats
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and max_num_asin is not null
        """
        self.df_st_num_stats = self.spark.sql(sqlQuery=sql)
        self.df_st_num_stats = self.df_st_num_stats.repartition(80, 'search_term').cache()
        print("self.df_st_num_stats:")
        self.df_st_num_stats.show(10, truncate=True)
        # Read dwt_st_market: take market_cycle_type
        sql = f"""
            select search_term, cast(market_cycle_type as int) as market_cycle_type
            from dwt_st_market
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_st_market = self.spark.sql(sqlQuery=sql)
        self.df_st_market = self.df_st_market.repartition(80, 'search_term').cache()
        print("self.df_st_market:")
        self.df_st_market.show(10, truncate=True)
        # Read dwd_st_volume_fba: take gross_profit_fee_air and gross_profit_fee_sea
        sql = f"""
            select search_term, gross_profit_fee_air, gross_profit_fee_sea
            from dwd_st_volume_fba
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_st_volume_fba = self.spark.sql(sqlQuery=sql)
        self.df_st_volume_fba = self.df_st_volume_fba.repartition(80, 'search_term').cache()
        print("self.df_st_volume_fba:")
        self.df_st_volume_fba.show(10, truncate=True)
        # Read the movie/TV label table dim_asin_label: take asin_label_type
        sql = f"""
            select asin, asin_label_type
            from dim_asin_label
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_label = self.spark.sql(sqlQuery=sql)
        self.df_asin_label = self.df_asin_label.repartition(80, 'asin').cache()
        print("self.df_asin_label:")
        self.df_asin_label.show(10, truncate=True)
        # Read the brand-term lexicon
        sql = f"""
            select search_term, st_brand_label
            from dws_st_brand_info
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and st_brand_label = 1
        """
        self.df_st_brand = self.spark.sql(sqlQuery=sql)
        self.df_st_brand = self.df_st_brand.repartition(80, 'search_term').cache()
        print("self.df_st_brand:")
        self.df_st_brand.show(10, truncate=True)
        # Read the special-character matching dictionary table match_character_dict
        # ('特殊字符' is the literal match_type value stored in the table; note the
        # connection used here is mysql, despite the table's pgsql origins)
        dict_sql = f"""
            select character_name
            from match_character_dict
            where match_type = '特殊字符'
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        chart_dict_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=dict_sql
        )
        # Convert to a pandas dataframe
        dict_df = chart_dict_df.toPandas()
        # Extract the special-character list
        self.sp_symbols = dict_df["character_name"].values.tolist()
        # Hidden categories:
        # Apps & Games, Audible Books & Originals, Books, CDs & Vinyl,
        # Digital Music, Kindle Store, Movies & TV, Software
        sql = f"""
            select category_id as st_bsr_cate_1_id_new, 1 as is_hidden_cate
            from dim_bsr_category_tree
            where site_name = '{self.site_name}'
              and en_name in ("Apps & Games", "Audible Books & Originals", "Books", "CDs & Vinyl",
                              "Digital Music", "Kindle Store", "Movies & TV", "Software")
              and category_parent_id = 0;
        """
        self.df_is_hidden_cate = self.spark.sql(sqlQuery=sql)
        self.df_is_hidden_cate = self.df_is_hidden_cate.repartition(80).cache()
        print("self.df_is_hidden_cate:")
        self.df_is_hidden_cate.show(10, truncate=True)

    def handle_data(self):
        # Join the base calculation tables
        self.handle_base_join()
        # Aggregate st_asin metrics by search term
        self.handle_st_agg()
        # Aggregate brand and seller metrics by search term
        self.handle_brand_seller_agg()
        # Compute the final metrics
        self.handle_st_cal()
        # Language detection
        self.handle_calc_lang()
        # Shape the output columns
        self.handle_column()

    def handle_base_join(self):
        self.df_st_asin_join = self.df_st_asin_measure.join(
            self.df_asin_measure, on=['asin'], how='left'
        ).join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_label, on=['asin'], how='left'
        ).cache()
        self.df_st_asin_measure.unpersist()
        self.df_asin_measure.unpersist()
        self.df_asin_detail.unpersist()
        self.df_asin_label.unpersist()
        self.df_st_asin_cal = self.df_st_asin_join.join(
            self.df_seller_asin_country, on=['asin'], how='left'
        )
        # Dataframe for brand metrics
        self.df_st_brand_cal = self.df_st_asin_join
        # Dataframe for seller metrics
        self.df_st_seller_cal = self.df_st_asin_join.join(
            self.df_seller_asin_info, on=['asin'], how='left'
        )
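    # Worked example (illustrative values): u_contains(F.lit('CN'), country_name)
    # performs a case-insensitive substring check, so an aggregated country_name
    # of "US,CN,JP" yields asin_is_cn = 1 and "US,JP" yields 0 in handle_st_agg below.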
"asin_is_new_zr_orders", F.when(F.col("asin_is_new") == 1, F.col("asin_zr_orders")) ).withColumn( # 标记是否有颜色标签 "asin_is_color_flag", self.u_judge_color(F.col("asin_color")) ).withColumn( # 标记标题中是否出现多色关键词 "asin_is_multi_color", self.u_judge_title_color(F.col("asin_title")) ).na.fill({ # 判断是否多尺寸时,先对多尺寸判断字段进行空值处理 "asin_size": "None", "asin_style": "None" }).withColumn( # 判断是否多尺寸 "asin_is_multi_size", self.u_judge_multi_size(F.col("asin_size"), F.col("asin_style")) ).withColumn( # 增加判断是否是影视产品标签 "asin_is_movie_flag", F.when(F.col("asin_label_type") == 1, F.lit(1)) ) # group by 按search_term 聚合 self.df_st_asin_cal = self.df_st_asin_cal.groupby(['search_term']).agg( F.count("asin").alias("asin_count"), F.sum("asin_is_new").alias("asin_is_new_total"), F.sum("asin_is_aadd").alias("asin_aadd_count"), F.sum("asin_is_video").alias("asin_video_count"), F.sum("asin_is_fbm").alias("asin_fbm_count"), F.sum("asin_is_amazon").alias("asin_amazon_count"), F.sum("asin_is_cn").alias("asin_cn_count"), F.sum("asin_is_new_bsr_orders").alias("new_asin_bsr_orders"), F.sum("asin_is_new_zr_orders").alias("new_asin_orders"), F.sum("asin_is_color_flag").alias("asin_color_count"), F.sum("asin_is_multi_color").alias("asin_multi_color_count"), F.sum("asin_is_multi_size").alias("asin_multi_size_count"), F.sum("asin_is_movie_flag").alias("asin_movie_type_count"), F.sum("asin_amazon_orders").alias("amazon_monthly_sales"), F.avg("asin_title_len").alias("title_length_avg"), F.avg("asin_rating").alias("rating_avg"), F.avg("asin_total_comments").alias("total_comments_avg") ).repartition(80, 'search_term').cache() def handle_brand_seller_agg(self): # 计算品牌top3销量和总销量 self.df_st_brand_cal = self.df_st_brand_cal.filter("asin_brand_name is not null") self.df_st_brand_cal = self.df_st_brand_cal.filter("asin_brand_name not in('null','None')") self.df_st_brand_cal = self.df_st_brand_cal.groupby(['search_term', 'asin_brand_name']).agg( F.sum("asin_bsr_orders").alias("asin_brand_bsr_orders_total"), F.sum("asin_zr_orders").alias("asin_brand_zr_orders_total") ) self.df_top3_st_brand_cal = self.df_st_brand_cal # top3品牌bsr销量 brand_bsr_window = Window.partitionBy(["search_term"]).orderBy( self.df_top3_st_brand_cal.asin_brand_bsr_orders_total.desc_nulls_last() ) df_st_brand_top3_bsr_orders = self.df_top3_st_brand_cal.withColumn( "brand_rank", F.row_number().over(window=brand_bsr_window) ) df_st_brand_top3_bsr_orders = df_st_brand_top3_bsr_orders.filter("brand_rank<=3") df_st_brand_top3_bsr_orders = df_st_brand_top3_bsr_orders.groupby(["search_term"]).agg( F.sum("asin_brand_bsr_orders_total").alias("top3_brand_bsr_orders") ) # top3品牌zr销量 brand_zr_window = Window.partitionBy(["search_term"]).orderBy( self.df_top3_st_brand_cal.asin_brand_zr_orders_total.desc_nulls_last() ) df_st_brand_top3_zr_orders = self.df_top3_st_brand_cal.withColumn( "brand_rank", F.row_number().over(window=brand_zr_window) ) df_st_brand_top3_zr_orders = df_st_brand_top3_zr_orders.filter("brand_rank<=3") df_st_brand_top3_zr_orders = df_st_brand_top3_zr_orders.groupby(["search_term"]).agg( F.sum("asin_brand_zr_orders_total").alias("top3_brand_orders") ) # 品牌总销量 self.df_st_brand_cal = self.df_st_brand_cal.groupby(['search_term']).agg( F.count_distinct("asin_brand_name").alias("page3_brand_num") ).repartition(80, 'search_term') # 聚合得到st_brand self.df_st_brand_cal = self.df_st_brand_cal.join( df_st_brand_top3_bsr_orders, on=['search_term'], how='left' ).join( df_st_brand_top3_zr_orders, on=['search_term'], how='left' ) self.df_st_brand_cal = self.df_st_brand_cal.select( 
"search_term", "page3_brand_num", "top3_brand_bsr_orders", "top3_brand_orders" ).cache() # 计算卖家top3销量和总销量 self.df_st_seller_cal = self.df_st_seller_cal.filter("account_id is not null") self.df_st_seller_cal = self.df_st_seller_cal.groupby(['search_term', 'account_id']).agg( F.sum("asin_bsr_orders").alias("asin_seller_bsr_orders_total"), F.sum("asin_zr_orders").alias("asin_seller_zr_orders_total") ) self.df_top3_st_seller_cal = self.df_st_seller_cal # 计算top3卖家bsr销量 seller_bsr_window = Window.partitionBy(["search_term"]).orderBy( self.df_top3_st_seller_cal.asin_seller_bsr_orders_total.desc_nulls_last() ) df_st_seller_top3_bsr_orders = self.df_top3_st_seller_cal.withColumn( "seller_rank", F.row_number().over(window=seller_bsr_window) ) df_st_seller_top3_bsr_orders = df_st_seller_top3_bsr_orders.filter("seller_rank<=3") df_st_seller_top3_bsr_orders = df_st_seller_top3_bsr_orders.groupby(["search_term"]).agg( F.sum("asin_seller_bsr_orders_total").alias("top3_seller_bsr_orders") ) # 计算top3卖家的zr销量 seller_zr_window = Window.partitionBy(["search_term"]).orderBy( self.df_st_seller_cal.asin_seller_zr_orders_total.desc_nulls_last() ) df_st_seller_top3_zr_orders = self.df_st_seller_cal.withColumn( "seller_rank", F.row_number().over(window=seller_zr_window) ) df_st_seller_top3_zr_orders = df_st_seller_top3_zr_orders.filter("seller_rank<=3") df_st_seller_top3_zr_orders = df_st_seller_top3_zr_orders.groupby(["search_term"]).agg( F.sum("asin_seller_zr_orders_total").alias("top3_seller_orders") ) # 卖家总数量 self.df_st_seller_cal = self.df_st_seller_cal.groupby(['search_term']).agg( F.countDistinct("account_id").alias("page3_seller_num") ).repartition(80, 'search_term') # 聚合得到st_seller self.df_st_seller_cal = self.df_st_seller_cal.join( df_st_seller_top3_bsr_orders, on=['search_term'], how='left' ).join( df_st_seller_top3_zr_orders, on=['search_term'], how='left' ) self.df_st_seller_cal = self.df_st_seller_cal.select( "search_term", "page3_seller_num", "top3_seller_bsr_orders", "top3_seller_orders" ).cache() # 计算最终指标 def handle_st_cal(self): # 将st,st_asin,st_seller,st_brand按search_term聚合 df_st_agg = self.df_st_measure.join( self.df_st_asin_cal, on=['search_term'], how='left' ).join( self.df_st_brand_cal, on=['search_term'], how='left' ).join( self.df_st_seller_cal, on=['search_term'], how='left' ) self.df_st_measure.unpersist() self.df_st_asin_cal.unpersist() self.df_st_brand_cal.unpersist() self.df_st_seller_cal.unpersist() # 求值 df_st_agg = df_st_agg.withColumn( # 新品产品数量/前三页产品总数 "new_asin_proportion", F.round(F.col("asin_is_new_total") / F.col("asin_count"), 3) ).withColumn( # 当日A+商品占比 "aadd_proportion", F.round(F.col("asin_aadd_count") / F.col("asin_count"), 3) ).withColumn( # 当日视频商品占比 "sp_proportion", F.round(F.col("asin_video_count") / F.col("asin_count"), 3) ).withColumn( # 当日FBM商品占比 "fbm_proportion", F.round(F.col("asin_fbm_count") / F.col("asin_count"), 3) ).withColumn( # 中国卖家占比 "cn_proportion", F.round(F.col("asin_cn_count") / F.col("asin_count"), 3) ).withColumn( # Amazon自营占比 "amzon_proportion", F.round(F.col("asin_amazon_count") / F.col("asin_count"), 3) ).withColumn( # 多颜色占比 = 关键字有颜色的asin数/关键字的asin数 "color_proportion", F.round(F.col("asin_color_count") / F.col("asin_count"), 3) ).withColumn( # 多色比例 = 关键词前三页产品标题中出现colorful/assorted color/multi color的词产品个数/前三页产品数量 "multi_color_proportion", F.round(F.col("asin_multi_color_count") / F.col("asin_count"), 3) ).withColumn( # 多尺寸占比 "multi_size_proportion", F.round(F.col("asin_multi_size_count") / F.col("asin_count"), 3) ).withColumnRenamed( # 新品总数 
"asin_is_new_total", "new_asin_num" ).withColumnRenamed( # 产品总数 "asin_count", "total_asin_num" ).withColumnRenamed( # 总bsr销量 "st_bsr_orders", "bsr_orders" ).withColumnRenamed( # 总预估销量 "st_zr_orders", "orders" ).withColumn( # 销量占比 新品销量占比 "new_bsr_orders_proportion", F.round(F.col("new_asin_bsr_orders") / F.col("bsr_orders"), 3) ).withColumn( # 品牌垄断系数 "brand_monopoly", F.ceil((F.col("top3_brand_bsr_orders") / F.col("bsr_orders")) * 1000) / 1000 ).withColumn( # 卖家垄断系数 "seller_monopoly", F.ceil((F.col("top3_seller_bsr_orders") / F.col("bsr_orders")) * 1000) / 1000 ).withColumn( # ABA搜索词拆分的单词个数 "st_word_num", self.st_word_count(self.sp_symbols)(F.col("search_term")) ).withColumn( # aba搜索词影视比例 "movie_prop", F.round((F.col("asin_movie_type_count") / F.col("total_asin_num"))*100, 2) ).withColumn( # 影视标记类型 0:非影视; 1:0< 比例 <= 20%; 2: 20% < 比例 <= 50%; 3:50% < 比例 "st_movie_label", F.when( (F.col("movie_prop") > 0) & (F.col("movie_prop") <= 20), F.lit(1) ).when( (F.col("movie_prop") > 20) & (F.col("movie_prop") <= 50), F.lit(2) ).when( (F.col("movie_prop") > 50), F.lit(3) ).otherwise(F.lit(0)) ) self.df_save = df_st_agg.join( self.df_st_detail, on=['search_term'], how='inner' ).join( self.df_st_key, on=['search_term'], how='inner' ).join( self.df_st_num_stats, on=['search_term'], how='left' ).join( self.df_st_market, on=['search_term'], how='left' ).join( self.df_st_volume_fba, on=['search_term'], how='left' ).join( self.df_st_brand, on=['search_term'], how='left' ).join( self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left' ) self.df_st_detail.unpersist() self.df_st_key.unpersist() self.df_st_num_stats.unpersist() self.df_st_market.unpersist() self.df_st_volume_fba.unpersist() self.df_st_brand.unpersist() self.df_is_hidden_cate.unpersist() # 语种处理 def handle_calc_lang(self): sql = """ select word, langs from big_data_selection.tmp_lang_word_frequency; """ lang_word_list = self.spark.sql(sql).collect() # 转为map lang_word_map = {row['word']: row['langs'] for row in lang_word_list} self.df_save = self.df_save.withColumn( "lang", F.coalesce(udf_detect_phrase_reg(lang_word_map)(F.col("search_term")).getField("lang"), F.lit("other")) ) def handle_column(self): # 入库前字段处理 self.df_save = self.df_save.select( "id", "search_term", "rank", "category_id", "orders", "bsr_orders", "search_volume", "quantity_being_sold", F.round("st_ao_avg", 3).alias("st_ao_avg"), "st_ao_val_rate", "new_bsr_orders_proportion", "new_asin_proportion", F.round("page1_title_proportion", 3).alias("page1_title_proportion"), F.round("price_avg", 3).alias("price_avg"), F.round("total_comments_avg", 0).alias("total_comments_avg"), F.round("rating_avg", 3).alias("rating_avg"), F.round("weight_avg", 3).alias("weight_avg"), F.round("volume_avg", 3).alias("volume_avg"), F.round("title_length_avg", 0).alias("title_length_avg"), "st_num", "aadd_proportion", "sp_proportion", "fbm_proportion", "cn_proportion", "amzon_proportion", "most_proportion", "max_num", "asin1", "asin2", "asin3", F.round("click_share1", 3).alias("click_share1"), F.round("click_share2", 3).alias("click_share2"), F.round("click_share3", 3).alias("click_share3"), F.round("total_click_share", 3).alias("total_click_share"), F.round("conversion_share1", 3).alias("conversion_share1"), F.round("conversion_share2", 3).alias("conversion_share2"), F.round("conversion_share3", 3).alias("conversion_share3"), F.round("total_conversion_share", 3).alias("total_conversion_share"), "new_asin_num", "total_asin_num", "new_asin_orders", "new_asin_bsr_orders", "is_first_text", 
"is_ascending_text", "is_search_text", "top3_seller_orders", "top3_seller_bsr_orders", "top3_brand_orders", "top3_brand_bsr_orders", "page3_brand_num", "page3_seller_num", "brand_monopoly", "seller_monopoly", "max_num_asin", "is_self_max_num_asin", "is_new_market_segment", F.when(F.col('category_current_id').isNull(), F.col('category_id')) .otherwise(F.col('category_current_id')).alias('category_current_id'), "supply_demand", "market_cycle_type", "color_proportion", "gross_profit_fee_air", "gross_profit_fee_sea", "multi_color_proportion", "multi_size_proportion", "st_4_20_ao_avg", "st_4_20_ao_rate", "asin_aadd_count", "asin_video_count", "asin_fbm_count", "asin_cn_count", "asin_amazon_count", "asin_color_count", "asin_multi_color_count", "asin_multi_size_count", "st_word_num", "st_movie_label", "st_brand_label", "st_brand1", "st_category1", "st_brand2", "st_category2", "st_brand3", "st_category3", "st_bsr_cate_1_id_new", "st_bsr_cate_current_id_new", "st_crawl_date", "is_high_return_text", F.round("st_zr_page123_title_appear_rate", 3).alias("st_zr_page123_title_appear_rate"), F.round("st_sp_page123_title_appear_rate", 3).alias("st_sp_page123_title_appear_rate"), "st_competition_level", "amazon_monthly_sales", "st_zr_flow_proportion", "st_ao_val_matrix", "st_flow_proportion_matrix", "st_zr_counts", "st_sp_counts", "st_self_asin_counts", "st_self_asin_proportion", "lang", "asin_movie_type_count", "is_hidden_cate" ) # 空值处理 self.df_save = self.df_save.na.fill({ "is_first_text": 0, "is_ascending_text": 0, "is_search_text": 0, "st_movie_label": 0, "st_brand_label": 0, "is_self_max_num_asin": 0, "market_cycle_type": 0, "is_new_market_segment": 0, "is_high_return_text": 0, "amazon_monthly_sales": 0, "is_hidden_cate": 0 }) # 日期字段补全 self.df_save = self.df_save.withColumn( "created_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS') ).withColumn( "updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS') ) # 预留字段补全 self.df_save = self.df_save.withColumn( "re_string_field1", F.lit(None) ) # 用户标记字段(已废弃,济苍直接填充) self.df_save = self.df_save.withColumn( "usr_mask_type", F.lit(None) ).withColumn( "usr_mask_progress", F.lit(None) ) # 分区字段补全 self.df_save = self.df_save.withColumn( "site_name", F.lit(self.site_name) ).withColumn( "date_type", F.lit(self.date_type) ).withColumn( "date_info", F.lit(self.date_info) ) if __name__ == '__main__': start_time = time.time() site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwtAbaStAnalytics(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run() end_time = time.time() consumer_time = end_time - start_time print("aba数据存储完毕, 执行时长为:" + str(consumer_time))