import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so sibling packages resolve
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# shared UDF helpers
from yswg_utils.common_udf import udf_parse_bs_category


class DimBsAsinInfo(Templates):
    """Build the `dim_asin_bs_info` dimension table.

    Joins ASIN node-id categories from `dim_asin_detail` with best-sellers
    rank details from `ods_bs_category_asin_detail`, parses category ids and
    ranks out of the BS href/rank strings via a UDF, and writes the result
    partitioned by (site_name, date_type, date_info).
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_asin_bs_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames, re-assigned in read_data()/handle_data()
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_node_id = self.spark.sql("select 1+1;")
        self.df_bs_asin_detail = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")
        # Return type of the parsing UDF: two category ids + two ranks
        schema = StructType([
            StructField('asin_bs_cate_1_id', StringType(), True),
            StructField('asin_bs_cate_current_id', StringType(), True),
            StructField('asin_bs_cate_1_rank', IntegerType(), True),
            StructField('asin_bs_cate_current_rank', IntegerType(), True),
        ])
        self.u_parse_bs_category = F.udf(udf_parse_bs_category, schema)
        # Per-site marker text that precedes the top-level (cate-1) rank in the
        # lower-cased best-sellers string.
        self.pattern1_dict = {
            "us": "See Top 100 in ".lower(),
            "uk": "See Top 100 in ".lower(),
            "de": "Siehe Top 100 in ".lower(),
            "es": "Ver el Top 100 en ".lower(),
            "fr": "Voir les 100 premiers en ".lower(),
            "it": "Visualizza i Top 100 nella categoria ".lower(),
        }
        # Per-site regex extracting rank numbers from the best-sellers string.
        # Raw strings: plain "\d" is an invalid escape (DeprecationWarning).
        self.pattern_current_dict = {
            "us": r"#(\d+) ",
            "uk": r"(\d+) in ",
            "de": r"(\d+) in ",
            "es": r"(\d+) en ",
            "fr": r"(\d+) en ",
            "it": r"(\d+) in ",
        }
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=20)
        self.get_year_week_tuple()

    @staticmethod
    def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href,
                              cate_current_pattern, cate_1_pattern):
        """Parse BS category ids and ranks for one ASIN row.

        :param asin_bs_sellers_rank_lower: lower-cased best-sellers rank text
        :param last_herf: fallback href string (legacy field)
        :param all_best_sellers_href: preferred href string, "&&&&"-separated
        :param cate_current_pattern: site regex for rank numbers
        :param cate_1_pattern: site marker text preceding the cate-1 rank
        :return: (cate_1_id, cate_current_id, cate_1_rank, cate_current_rank)
        """
        # 1. Decide which field to parse categories from (prefer the new one)
        if str(all_best_sellers_href).lower() not in ['', 'none', 'null']:
            bs_href = all_best_sellers_href
        elif str(last_herf).lower() not in ['', 'none', 'null']:
            bs_href = last_herf
        else:
            bs_href = ''
        href_list = bs_href.replace("?tf=1", "").split("&&&&")
        # 2.1 Extract first-level and current category ids from the href(s)
        cate_1_id, cate_current_id = None, None
        if href_list:
            if len(href_list) == 1:
                cate_list = re.findall(r'bestsellers/(.*)/ref', href_list[0])
                if cate_list:
                    if "/" in cate_list[0]:
                        cate_1_id = cate_list[0].split("/")[0]
                        cate_current_id = cate_list[0].split("/")[-1]
                    else:
                        cate_1_id = cate_list[0]
            else:
                first_match = re.findall(r'bestsellers/(.*)/ref', href_list[0])
                last_match = re.findall(r'bestsellers/(.*)/ref', href_list[-1])
                cate_1_id = first_match[0] if first_match else None
                cate_current_id = last_match[0] if last_match else None
                # FIX: original did `"/" in cate_1_id` unguarded and raised
                # TypeError whenever a findall came back empty (id is None).
                if cate_1_id is not None and "/" in cate_1_id:
                    cate_1_id = cate_1_id.split("/")[0]
                if cate_current_id is not None and "/" in cate_current_id:
                    cate_current_id = cate_current_id.split("/")[-1]
        # 2.2 Extract the ranks; strip thousands separators and the literal
        # " 100 " (from "Top 100") so it is not matched as a rank.
        cleaned = asin_bs_sellers_rank_lower.replace(",", "").replace(" 100 ", "")
        rank_list = [int(rank) for rank in re.findall(cate_current_pattern, cleaned)]
        cate_1_rank, cate_current_rank = None, None
        if rank_list:
            if len(rank_list) == 1:
                # A single rank belongs to cate-1 only if the site marker is present
                if cate_1_pattern in asin_bs_sellers_rank_lower:
                    cate_1_rank = rank_list[0]
                else:
                    cate_current_rank = rank_list[0]
            else:
                if cate_1_pattern in asin_bs_sellers_rank_lower:
                    cate_1_rank, cate_current_rank = rank_list[0], rank_list[-1]
                else:
                    cate_current_rank = rank_list[0]
        return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank

    def read_data(self):
        """Load node-id categories and BS rank details for the period."""
        # 1. Node ids from dim_asin_detail (already de-duplicated upstream)
        sql = f"select asin, category_id as asin_bs_cate_current_id_node, category_first_id as asin_bs_cate_1_id_node from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info ='{self.date_info}';"
        print(f"1. 读取dim_asin_detail表node_id数据: sql -- {sql}")
        self.df_asin_node_id = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_node_id.show(10, truncate=False)
        # 2. BS rank details for the matching weeks.
        # FIX: original read bare `date_type`/`date_info` here, which only
        # resolved through the __main__ module globals; use the instance
        # attributes so the class also works when imported.
        params = f" date_info <= '2022-42'" \
            if max(self.year_week_tuple) <= '2022-42' and self.date_type == 'month' \
            else f" date_info in {self.year_week_tuple}"
        sql = f"select asin, best_sellers_rank as asin_bs_sellers_rank, last_herf, all_best_sellers_href, date_type, date_info, created_at from ods_bs_category_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='week' and {params};"
        # Newer periods are scraped per month/month_week directly
        if self.date_type in ['month', 'month_week'] and (
                (self.site_name == 'us' and self.date_info >= '2023-10')
                or (self.site_name in ['uk', 'de'] and self.date_info >= '2024-05')):
            sql = f"select asin, best_sellers_rank as asin_bs_sellers_rank, last_herf, all_best_sellers_href, date_type, date_info, created_at from ods_bs_category_asin_detail " \
                  f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print(f"2. 读取ods_bs_category_asin_detail对应周期的详情表: sql -- {sql}")
        self.df_bs_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_asin_detail.show(10, truncate=False)

    def handle_df_asin_node_id(self):
        """No-op: dim_asin_detail is already de-duplicated upstream."""
        pass

    def handle_df_bs_asin_detail(self):
        """Keep each ASIN's newest BS record, parse categories/ranks, and join."""
        # Keep only the most recent row per ASIN
        window = Window.partitionBy(['asin']).orderBy(
            self.df_bs_asin_detail.created_at.desc()
        )
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
            "row_number", F.row_number().over(window=window)
        )
        self.df_bs_asin_detail = self.df_bs_asin_detail.filter("row_number=1")
        self.df_bs_asin_detail = self.df_bs_asin_detail.drop("row_number", "date_info")
        # Lower-case once so the site markers/patterns match
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
            "asin_bs_sellers_rank_lower", F.lower("asin_bs_sellers_rank"))
        # Parse category ids + ranks with the site-specific patterns
        cate_current_pattern = self.pattern_current_dict[self.site_name]
        cate_1_pattern = self.pattern1_dict[self.site_name]
        self.df_bs_asin_detail = self.df_bs_asin_detail.withColumn(
            'asin_bs_cate_ranks',
            self.u_parse_bs_category('asin_bs_sellers_rank_lower', 'last_herf', 'all_best_sellers_href',
                                     F.lit(cate_current_pattern), F.lit(cate_1_pattern))
        )
        # Explode the struct into flat columns
        self.df_bs_asin_detail = self.df_bs_asin_detail \
            .withColumn('asin_bs_cate_1_id', self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_1_id')) \
            .withColumn('asin_bs_cate_current_id', self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_current_id')) \
            .withColumn('asin_bs_cate_1_rank', self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_1_rank')) \
            .withColumn('asin_bs_cate_current_rank', self.df_bs_asin_detail.asin_bs_cate_ranks.getField('asin_bs_cate_current_rank')) \
            .drop('asin_bs_cate_ranks')
        self.df_bs_asin_detail.show(10, truncate=False)
        self.df_save = self.df_asin_node_id.join(
            self.df_bs_asin_detail, 'asin', how='left'
        )
        # Backfill missing parsed categories from the node-id categories
        self.df_save = self.df_save.withColumn(
            "asin_bs_cate_1_id",
            F.when(F.col("asin_bs_cate_1_id").isNull(), F.col("asin_bs_cate_1_id_node")).otherwise(F.col("asin_bs_cate_1_id"))
        ).withColumn(
            "asin_bs_cate_current_id",
            F.when(F.col("asin_bs_cate_current_id").isNull(), F.col("asin_bs_cate_current_id_node")).otherwise(F.col("asin_bs_cate_current_id"))
        )
        self.df_save = self.df_save.drop(
            "asin_bs_sellers_rank_lower", "asin_bs_cate_1_id_node", "asin_bs_cate_current_id_node")
        self.df_save.show(20)

    def handle_data(self):
        """Run the transform steps and stamp the partition columns."""
        self.handle_df_asin_node_id()
        self.handle_df_bs_asin_detail()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site (us/uk/de/...)
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()