"""
author: 方星钧(ffman)
description: Clean the seven search-term data types (zr, sp, sb, ac, bs, er, tr)
    for one site and one date partition: compute page_rank for the zr and sp
    tables, then merge all of the tables into a single frame.
table_read_name: ods_search_term_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so `utils` is importable
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType


class DimStAsinInfo(Templates):
    """Build the dim_st_asin_info frame for one (site_name, date_type, date_info).

    The helpers ``create_spark_object``, ``get_year_week_tuple``,
    ``reset_partitions`` and ``run`` are not defined in this file; they are
    presumably provided by ``Templates`` (confirm against utils.templates).
    """

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        """Store the partition keys and prepare the Spark session and output frame.

        :param site_name: site identifier used in the partition filter
        :param date_type: partition granularity, e.g. week/4_week/month/quarter/day
        :param date_info: partition value matching ``date_type``
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_st_asin_info'
        spark_app_name = f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}"
        self.spark = self.create_spark_object(app_name=spark_app_name)
        self.df_date = self.get_year_week_tuple()
        # `limit 0` yields an empty frame that only carries the column layout;
        # read_data() unions every per-type frame onto it.
        empty_frame_sql = (
            "select search_term, asin, page, page_row, 'zr' as data_type, "
            "updated_time,site_name,date_type,date_info from ods_search_term_zr limit 0;"
        )
        self.df_save = self.spark.sql(empty_frame_sql)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=60)
        # Union order: small tables first, large tables last.
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']

    @staticmethod
    def udf_page_rank(page, page_1_count, page_2_count, page_row):
        """Return the page_rank of one zr/sp row.

        :param page: result page number of the row
        :param page_1_count: offset added to rows on page 2
        :param page_2_count: offset added to rows on any page other than 1 and 2
        :param page_row: row position within its page
        :return: ``page_row`` for page 1, otherwise the matching offset plus ``page_row``
        """
        if page == 1:
            return page_row
        offset = page_1_count if page == 2 else page_2_count
        return offset + page_row

    def handle_data_page_rank(self, df, data_type):
        """Add a ``page_rank`` column (plus helper count columns) to a zr/sp frame.

        :param df: frame with at least ``search_term``, ``page`` and ``page_row``
        :param data_type: label used only in the progress message
        :return: ``df`` extended with ``page_1_count``, ``page_2_count_old``,
            ``page_2_count`` and ``page_rank``
        """
        print(f"{data_type}--page_rank计算")
        rank_udf = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())

        # zr and sp contain duplicate rows, so the size of a page is taken as
        # max(page_row) rather than a row count.
        first_page_max = (
            df.filter("page=1")
            .groupBy(['search_term'])
            .agg({"page_row": "max"})
            .withColumnRenamed('max(page_row)', 'page_1_count')
        )
        second_page_max = (
            df.filter(df['page'] == 2)
            .groupBy(['search_term'])
            .agg({"page_row": "max"})
            .withColumnRenamed('max(page_row)', 'page_2_count_old')
        )

        joined = df.join(first_page_max, on='search_term', how='left')
        joined = joined.join(second_page_max, on='search_term', how='left')
        # Search terms with no page-1 / page-2 rows get null counts from the
        # left joins. fillna(0) is called without a subset, so it is applied to
        # the whole frame rather than only to the two joined count columns.
        joined = joined.fillna(0)
        # Offset for pages after page 2: page-1 size plus page-2 size.
        joined = joined.withColumn(
            "page_2_count", joined['page_1_count'] + joined['page_2_count_old'])
        ranked = joined.withColumn(
            "page_rank",
            rank_udf(joined['page'], joined['page_1_count'],
                     joined['page_2_count'], joined['page_row']),
        )
        # ranked.show(n=10, truncate=False)
        return ranked

    def read_data(self):
        """Load every data type for the configured partition and union it into ``self.df_save``.

        zr/sp frames also select ``page_row`` and receive a ``page_rank``
        column; sb1/sb2/sb3 are read from ``ods_search_term_sb`` filtered on its
        numeric ``data_type``; every other type is read from its own table.
        The column sets differ per type, hence ``allowMissingColumns=True``.
        """
        partition_filter = (
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
            f"and date_info='{self.date_info}'"
        )
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            needs_page_rank = data_type in ('zr', 'sp')
            is_sb_slot = data_type in ('sb1', 'sb2', 'sb3')

            if needs_page_rank:
                leading_columns = "search_term, asin, page, page_row, "
            else:
                leading_columns = "search_term, asin, page, "
            table_suffix = 'sb' if is_sb_slot else data_type

            sql = (
                f"select {leading_columns}'{data_type}' as data_type, updated_time, "
                f"site_name,date_type,date_info from ods_search_term_{table_suffix} "
                f"{partition_filter}"
            )
            if is_sb_slot:
                # The trailing digit of sb1/sb2/sb3 is the numeric data_type in the sb table.
                sql += f" and data_type={int(data_type[-1])}"
            sql += ";"

            df = self.spark.sql(sqlQuery=sql)
            if needs_page_rank:
                # Compute page_rank, then drop the intermediate count columns.
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_save = self.df_save.unionByName(df, allowMissingColumns=True)
        # self.df_save.show(n=10, truncate=False)
        # print("self.df_save.count():", self.df_save.count())


if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site
    date_type = sys.argv[2]  # argument 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # argument 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()