""" @Author : HuangJian @Description : 找到月关键词下zr类型的top10asin和ac类型asin @SourceTable : 1.ods_st_key 2.dim_st_asin_info 3.dwd_st_measure @SinkTable : dwt_st_top_asin_info @CreateTime : 2023/02/13 17:01 @UpdateTime : 2023/02/13 17:01 """ import os import sys import re from functools import reduce sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType, DoubleType from utils.common_util import CommonUtil, DateTypes class DwtStTopAsinInfo(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwt_st_top_asin_info" self.spark = self.create_spark_object( app_name=f"{self.db_save}, {self.site_name}, {self.date_type}, {self.date_info}") self.partitions_num = 10 self.reset_partitions(partitions_num=self.partitions_num) self.partitions_by = ['site_name', 'date_type', 'date_info'] self.get_date_info_tuple() self.get_year_month_days_dict(year=int(self.year)) # 获取周流程的周tuple整合数据 self.complete_date_info_tuple = CommonUtil.transform_week_tuple(self.spark, self.date_type, self.date_info) self.df_st_key = self.spark.sql(f"select 1+1;") self.df_st_asin_zr = self.spark.sql(f"select 1+1;") self.df_st_asin_ac = self.spark.sql(f"select 1+1;") self.df_st_zr = self.spark.sql(f"select 1+1;") self.df_st_ac = self.spark.sql(f"select 1+1;") self.df_zr_info = self.spark.sql(f"select 1+1;") self.df_ac_info = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;") self.date_sql = self.date_sql_padding() def date_sql_padding(self): if self.site_name == 'us': if self.date_type == DateTypes.month_week.name: date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'" elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10': date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'" else: date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}" elif self.site_name in ['uk', 'de'] and self.date_type == DateTypes.month.name and self.date_info >= '2024-05': date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'" else: date_sql = f" and date_type='week' and date_info in {self.complete_date_info_tuple}" print(date_sql) return date_sql def read_data(self): print("1.1 读取st的key: ods_st_key表") sql = f"select search_term,cast(st_key as int) as search_term_id from ods_st_key where site_name = '{self.site_name}' " print("sql:", sql) self.df_st_key = self.spark.sql(sqlQuery=sql) print("1.3 读取关键词的zr类型数据: ods_search_term_zr") sql = f"""select search_term, asin, page_row as zr_rank, 'zr' as data_type,date_info from ods_search_term_zr where site_name='{self.site_name}' {self.date_sql} and page_row <=10 ;""" print("sql:", sql) self.df_st_asin_zr = self.spark.sql(sqlQuery=sql).cache() print("1.4 读取关键词的ac类型数据: ods_search_term_ac") sql = f"""select search_term, asin, null as zr_rank, 'ac' as data_type from ( select search_term, asin, row_number() over (partition by search_term order by date_info desc, updated_time asc) as ac_rank from ods_search_term_ac where site_name = '{self.site_name}' {self.date_sql} and page = 1) t1 where t1.ac_rank = 1;""" print("sql:", sql) self.df_st_asin_ac = self.spark.sql(sqlQuery=sql).cache() def handle_data(self): 
        self.handle_st_asin_duplicated()
        self.handle_st_asin_join()
        # Fill in the audit timestamp columns
        self.df_save = self.df_save.withColumn("created_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))
        self.df_save = self.df_save.withColumn("updated_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))
        # Fill in the partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))

    def handle_st_asin_duplicated(self):
        self.df_zr_info = self.df_st_asin_zr.select("search_term", "date_info")
        # Dedupe zr-type search terms: keep the max date_info per search term,
        # i.e. the latest date on which the term appeared
        window_zr = Window.partitionBy(['search_term']).orderBy(
            self.df_zr_info.date_info.desc()
        )
        self.df_zr_info = self.df_zr_info \
            .withColumn("zr_rank_row", F.row_number().over(window=window_zr))
        self.df_zr_info = self.df_zr_info.filter("zr_rank_row=1")
        self.df_zr_info = self.df_zr_info.drop("zr_rank_row")
        # self.df_st_asin_zr = self.df_st_asin_zr.cache()
        # Keep only the zr st-asin rows from that latest date
        self.df_st_asin_zr = self.df_zr_info.join(
            self.df_st_asin_zr, on=['search_term', 'date_info'], how="inner")
        self.df_st_asin_zr = self.df_st_asin_zr.drop("date_info")
        # Within the same cycle, dedupe zr rows on st-asin-rank
        self.df_st_asin_zr = self.df_st_asin_zr.drop_duplicates(['search_term', 'asin', 'zr_rank'])
        # Re-rank the zr ASINs: aggregate each asin's max rank, then renumber per search term
        self.df_st_asin_zr = self.df_st_asin_zr.groupby(['search_term', 'asin', 'data_type']).agg(
            F.max('zr_rank').alias('max_zr_rank')
        )
        print(self.df_st_asin_zr.columns)
        window_zr_rank = Window.partitionBy(['search_term']).orderBy(
            self.df_st_asin_zr.max_zr_rank.asc()
        )
        self.df_st_asin_zr = self.df_st_asin_zr.withColumn('zr_rank', F.row_number().over(window=window_zr_rank))
        self.df_st_asin_zr = self.df_st_asin_zr.filter("zr_rank <= 10")
        self.df_st_asin_zr = self.df_st_asin_zr.drop('max_zr_rank')

    def handle_st_asin_join(self):
        # Union the zr-type and ac-type search-term rows
        df_st_asin_union = self.df_st_asin_zr.unionByName(self.df_st_asin_ac, allowMissingColumns=False)
        # Join on search_term to attach search_term_id
        self.df_save = df_st_asin_union.join(
            self.df_st_key, on=['search_term'], how="inner"
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtStTopAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
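
# Usage sketch (the script file name and the date values below are illustrative
# assumptions, not taken from the repository):
#   python dwt_st_top_asin_info.py us week 2022-1     # weekly run; reads week partitions
#   python dwt_st_top_asin_info.py us month 2023-10   # us monthly partitions from 2023-10 onward
#   python dwt_st_top_asin_info.py de month 2024-05   # uk/de monthly partitions from 2024-05 onward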