""" # author: 方星钧(ffman) # description: 基于ods_bs_category,ods_bs_category_asin_detail表,计算出dim_asin_bs_category维度的asin的bs分类情况 # table_read_name: ods_bs_category,ods_bs_category_asin_detail # table_save_name: dim_asin_bs_category # table_save_level: dim # version: 1.0 # created_date: 2022-11-03 # updated_date: 2022-11-03 """ import os import sys import re sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates from pyspark.sql import functions as F from pyspark.sql.types import StringType from pyspark.sql.window import Window class DimBsAsinInfo(Templates): def __init__(self, site_name='us', date_type="month", date_info='2022-1'): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info # 初始化self.spark对 self.db_save = 'dim_asin_bs_category' self.spark = self.create_spark_object( app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}") self.df_save = self.spark.sql("select 1+1;") self.df_save_list = [] self.df_cate = self.spark.sql("select 1+1;") self.df_cate_detail = self.spark.sql("select 1+1;") self.df_cate_asin_detail = self.spark.sql("select 1+1;") self.pattern_1 = '' # 一级分类正则匹配 self.pattern_current = '' # 当前分类正则匹配 self.node_1_list = [] # 一级分类名称对应的列表 self.partitions_by = ['site_name', 'date_type', 'date_info'] self.reset_partitions(partitions_num=20) self.u_cate_1_en_name = self.spark.udf.register("u_cate_1_en_name", self.udf_cate_1_en_name, StringType()) self.u_cate_current_en_name = self.spark.udf.register("u_cate_current_en_name", self.udf_cate_current_en_name, StringType()) @staticmethod def udf_cate_1_en_name(best_sellers_rank, site_name, cate_1_pattern): if site_name in ['us', 'uk']: pattern_1 = f"See Top 100 in ({cate_1_pattern})" elif site_name in ['de']: pattern_1 = f"Siehe Top 100 in ({cate_1_pattern})" elif site_name in ['es']: pattern_1 = f"Ver el Top 100 en ({cate_1_pattern})" elif site_name in ['fr']: pattern_1 = f"Voir les 100 premiers en ({cate_1_pattern})" elif site_name in ['it']: pattern_1 = f"Visualizza i Top 100 nella categoria ({cate_1_pattern})" else: pattern_1 = '' result_pattern = re.findall(pattern_1, best_sellers_rank)[-1] if re.findall(pattern_1, best_sellers_rank) else '' if result_pattern == '': result_pattern = re.findall(cate_1_pattern, best_sellers_rank)[-1] if re.findall(cate_1_pattern, best_sellers_rank) else '' return result_pattern @staticmethod def udf_cate_current_en_name(best_sellers_rank, site_name): if site_name in ['us', 'uk', 'de', 'it']: pattern_current = f"(\d+ in )" elif site_name in ['es', 'fr']: # pattern_current = f"\d+[.,]{0,}\d{0,} en " pattern_current = f"(\d+ en )" else: pattern_current = '' result_pattern = re.findall(pattern_current, best_sellers_rank)[-1] if re.findall(pattern_current, best_sellers_rank) else '' r_id = best_sellers_rank.rfind(result_pattern) r_id_2 = best_sellers_rank.rfind(" (") if r_id != -1: if r_id_2 != -1 and r_id_2 > r_id: return best_sellers_rank[(r_id + len(result_pattern)):r_id_2] else: if ")" in best_sellers_rank[(r_id + len(result_pattern)):]: return '' else: return best_sellers_rank[r_id + len(result_pattern):] else: return '' def read_data(self): sql = f"select id as bs_id, en_name, nodes_num, one_category_id, and_en_name from ods_bs_category where site_name='{self.site_name}';" self.df_cate = self.spark.sql(sqlQuery=sql).cache() self.df_cate.show(10, truncate=False) # sql = f"select * from ods_{self.site_name}_bs_category_detail;" # self.df_cate_detail = 
    def read_data(self):
        sql = f"select id as bs_id, en_name, nodes_num, one_category_id, and_en_name from ods_bs_category where site_name='{self.site_name}';"
        self.df_cate = self.spark.sql(sqlQuery=sql).cache()
        self.df_cate.show(10, truncate=False)
        # sql = f"select * from ods_{self.site_name}_bs_category_detail;"
        # self.df_cate_detail = self.spark.sql(sqlQuery=sql)
        sql = f"select asin, best_sellers_rank, site_name from ods_bs_category_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print("sql:", sql)
        self.df_cate_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_cate_asin_detail.show(10, truncate=False)

    def handle_data(self):
        self.handle_cate_1()
        self.handle_cate_current()
        self.handle_cate_by_en_name()
        self.df_save = self.df_cate_asin_detail
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save.show(10, truncate=False)

    def handle_cate_1(self):
        """Match the level-1 category."""
        # 1. match the level-1 category via the "See Top 100 in ..." text
        df_cate_1 = self.df_cate.filter("nodes_num=2").toPandas()
        # the level-1 names are joined into a regex alternation; this assumes the
        # names contain no regex metacharacters (otherwise re.escape would be needed)
        pattern1_list = df_cate_1.en_name.to_numpy()
        # pattern_list = [f"{cate}" for cate in list(df_cate_1.en_name.to_numpy())]
        pattern = "|".join(pattern1_list)
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_1_pattern", F.lit(pattern)
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_1_en_name",
            self.u_cate_1_en_name("best_sellers_rank", "site_name", "bs_cate_1_pattern")
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.drop("bs_cate_1_pattern")
        # self.df_cate_asin_detail.show(10)
        print(self.df_cate_asin_detail.filter("bs_cate_1_en_name = ''").count(), self.df_cate_asin_detail.count())

    def handle_cate_current(self):
        """Current category: regex-matching rules."""
        # df_cate_asin['rank_current'] = df_cate_asin.best_sellers_rank.apply(
        #     lambda x: re.findall('\d+[.,]{0,}\d{0,} in ', x)[-1] if re.findall('\d+[.,]{0,}\d{0,} in ', x) else np.nan)
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_current_en_name",
            self.u_cate_current_en_name('best_sellers_rank', 'site_name')
        )
        # self.df_cate_asin_detail.show(10)
        print(self.df_cate_asin_detail.filter("bs_cate_current_en_name = ''").count(), self.df_cate_asin_detail.count())
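    # Worked example of handle_cate_by_en_name below (illustrative values): suppose an
    # asin row parsed to bs_cate_1_en_name="Toys & Games" and
    # bs_cate_current_en_name="Stacking Games". Step 1 left-joins the category table on
    # those names to attach bs_cate_1_id / bs_cate_current_id. Step 2 re-joins the rows
    # whose bs_cate_current_id is still null on the alias column and_en_name and unions
    # them back. Step 3 keeps one row per (asin, bs_cate_1_id), preferring rows whose
    # bs_cate_current_id is not null (asc_nulls_last ordering + row_number() = 1).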
    def handle_cate_by_en_name(self):
        print("1. match bs_cate_1_id and bs_cate_current_id via en_name")
        df_cate_1 = self.df_cate.filter("nodes_num=2").select("bs_id", "en_name")
        df_cate_1 = df_cate_1.withColumnRenamed("en_name", "bs_cate_1_en_name").withColumnRenamed("bs_id", "bs_cate_1_id")
        df_cate_current = self.df_cate.filter("nodes_num>2").select("bs_id", "en_name")
        df_cate_current = df_cate_current.withColumnRenamed("en_name", "bs_cate_current_en_name").withColumnRenamed(
            "bs_id", "bs_cate_current_id")
        self.df_cate_asin_detail = self.df_cate_asin_detail.join(
            df_cate_1, on=['bs_cate_1_en_name'], how='left'
        )
        # print("self.df_cate_asin_detail1:", self.df_cate_asin_detail.count())
        self.df_cate_asin_detail = self.df_cate_asin_detail.join(
            df_cate_current, on=['bs_cate_current_en_name'], how='left'
        )
        print("2. when bs_cate_current_id is null, match bs_cate_current_id via the current category's alias")
        df_current_id_not_null = self.df_cate_asin_detail.filter("bs_cate_current_id is not null")
        df_current_id_null = self.df_cate_asin_detail.filter("bs_cate_current_id is null").drop("bs_cate_current_id")
        df_cate_and_en_name = self.df_cate.filter("nodes_num>2").select("and_en_name", "bs_id") \
            .withColumnRenamed("and_en_name", "bs_cate_current_en_name") \
            .withColumnRenamed("bs_id", "bs_cate_current_id")
        df_current_id_null = df_current_id_null.join(
            df_cate_and_en_name, on="bs_cate_current_en_name", how='left'
        )
        self.df_cate_asin_detail = df_current_id_not_null.unionByName(
            df_current_id_null, allowMissingColumns=True
        )
        # 3. handle the case where the level-1 category is null
        print("3.1 deduplicate by asin and bs_cate_1_id, nulls last")
        # print("3.2 when bs_cate_1_id is null, match bs_cate_1_id via bs_cate_current_id")
        # if there is no level-1 category, skip the matching
        window = Window.partitionBy(['asin', 'bs_cate_1_id']).orderBy(
            self.df_cate_asin_detail.bs_cate_current_id.asc_nulls_last(),
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "asin_top", F.row_number().over(window=window)
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.filter("asin_top=1")
        self.df_cate_asin_detail = self.df_cate_asin_detail.drop("asin_top")
        # temporarily disabled: derive the level-1 category from the current category
        # df_1_id_not_null = self.df_cate_asin_detail.filter("bs_cate_1_id is not null")
        # df_1_id_null = self.df_cate_asin_detail.filter("bs_cate_1_id is null").drop("bs_cate_1_id")
        # # print("df_1_id_null:", df_1_id_null.show(10))
        # df_cate_1_id = self.df_cate.filter("nodes_num>2").select("bs_id", "one_category_id") \
        #     .withColumnRenamed("bs_id", "bs_cate_current_id") \
        #     .withColumnRenamed("one_category_id", "bs_cate_1_id")
        # df_1_id_null = df_1_id_null.join(
        #     df_cate_1_id, on="bs_cate_current_id", how='left'
        # )
        # self.df_cate_asin_detail = df_1_id_not_null.unionByName(
        #     df_1_id_null, allowMissingColumns=True
        # )
        # # print("df_1_id_not_null, df_1_id_null:", df_1_id_not_null.count(), df_1_id_null.count())
        # # print("df_1_id_null:", df_1_id_null.show(10))
        # # print("self.df_cate_asin_detail:", self.df_cate_asin_detail.count(), self.df_cate_asin_detail.show(10))
        print("4. rename the columns to match the Hive table schema")
        self.df_cate_asin_detail = self.df_cate_asin_detail \
            .withColumnRenamed("best_sellers_rank", "asin_bs_sellers_rank") \
            .withColumnRenamed("bs_cate_1_id", "asin_bs_cate_1_id") \
            .withColumnRenamed("bs_cate_current_id", "asin_bs_cate_current_id") \
            .withColumnRenamed("bs_cate_1_en_name", "asin_bs_cate_1_en_name") \
            .withColumnRenamed("bs_cate_current_en_name", "asin_bs_cate_current_en_name")


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
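# Usage sketch (hypothetical invocation; the actual entry point and cluster options
# come from utils.templates.Templates, which is not shown here):
#   spark-submit dim_asin_bs_category.py us month 2022-1
# Templates.run() is assumed to call read_data() and handle_data(), then write
# self.df_save to the dim_asin_bs_category table partitioned by self.partitions_by.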