"""
   @Author      : HuangJian
   @Description : Top-3 brands per search term within a time period
                  (the company's own ASINs are excluded from the statistics)
   @SourceTable :
                  ① ods_st_key
                  ② dim_st_detail
                  ③ dim_st_asin_info
                  ④ dim_asin_detail (minus the ASINs listed in ods_self_asin)
                  ⑤ dwd_asin_measure
                  ⑥ match_character_dict (brand blacklist, read over JDBC)
   @SinkTable   : dim_st_brand_info
   @CreateTime  : 2023/04/19 09:20
   @UpdateTime  : 2022/04/19 09:20
"""
import os
import sys

# Make the sibling ``utils`` package importable when this file is run as a script.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DBUtil


class DimStBrandInfo(object):
    """Build the ``dim_st_brand_info`` partition for one site / date_type / date_info.

    For every search term with ``st_rank < 1000000`` the job sums the BSR
    orders of the ASINs linked to that term per brand, keeps the three
    best-selling brands, tags each brand with a blacklist flag and writes
    the result to Hive.
    """

    def __init__(self, site_name, date_type, date_info):
        """Store the partition keys, create the Spark session and placeholder frames.

        :param site_name: value used for the ``site_name`` partition and filters
        :param date_type: value used for the ``date_type`` partition and filters
        :param date_info: value used for the ``date_info`` partition and filters
        """
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        self.hive_table = "dim_st_brand_info"
        # Target path of the partition that save_data() clears before writing.
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
        print(f"hdfs_path is {self.hdfs_path}")

        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 2)

        # Placeholder DataFrames; each one is replaced in read_data()/handle_data().
        self.df_dim_st = self.spark.sql("select 1+1;")
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_st_brand = self.spark.sql("select 1+1;")
        self.df_asin_measure = self.spark.sql("select 1+1;")
        self.df_base_brand = self.spark.sql("select 1+1;")
        self.df_brand_black = self.spark.sql("select 1+1;")
        self.df_st_key = self.spark.sql("select 1+1;")

    def run(self):
        """Run the job end to end: read, transform, save."""
        # Load the inputs
        self.read_data()
        # Apply the business logic
        self.handle_data()
        # Persist the result
        self.save_data()

    def read_data(self):
        """Load every input frame from Hive and the brand blacklist over JDBC."""
        print("======================查询sql如下======================")

        # search_term -> st_key mapping
        sql = (
            "select search_term,cast(st_key as int) as st_key from ods_st_key "
            f"where site_name = '{self.site_name}' "
        )
        self.df_st_key = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # Search terms of this partition whose rank is below 1,000,000
        sql = (
            "select search_term from dim_st_detail "
            f"where site_name = '{self.site_name}' "
            f"and date_type = '{self.date_type}' "
            f"and date_info = '{self.date_info}' "
            "and st_rank < 1000000; "
        )
        self.df_dim_st = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # Distinct (search_term, asin) pairs
        sql = (
            "select search_term, asin from dim_st_asin_info "
            f"where site_name = '{self.site_name}' "
            f"and date_type = '{self.date_type}' "
            f"and date_info = '{self.date_info}' "
            "group by search_term, asin "
        )
        self.df_st_asin = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # asin -> brand name, skipping placeholder brand values and the
        # company's own ASINs (those present in ods_self_asin)
        sql = (
            " select asin, trim(asin_brand_name) as asin_brand_name from dim_asin_detail "
            f"where site_name = '{self.site_name}' "
            f"and date_type = '{self.date_type}' "
            f"and date_info = '{self.date_info}' "
            "and length(asin_brand_name) > 2 "
            "and trim(asin_brand_name) not in ('null', 'None', 'none', 'Null', '') "
            f"and asin not in (select asin from ods_self_asin where site_name = '{self.site_name}') "
        )
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)

        # asin -> BSR orders
        sql = (
            "select asin, cast(asin_bsr_orders as int) as asin_bsr_orders from dwd_asin_measure "
            f"where site_name='{self.site_name}' "
            f"and date_type='{self.date_type}' "
            f"and date_info='{self.date_info}';"
        )
        print("sql:", sql)
        self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()

        # Brand blacklist from the dictionary table match_character_dict.
        # NOTE(review): the original comment said this table lives in PostgreSQL,
        # but the connection requested below is "mysql" — confirm which is right.
        black_sql = (
            "select lower(trim(character_name)) as st_brand_name_lower, 1 as black_flag "
            "from match_character_dict where match_type = '品牌词库黑名单'"
        )
        conn_info = DBUtil.get_connection_info("mysql", "us")
        self.df_brand_black = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=black_sql
        )

    def handle_data(self):
        """Compute the top-3 brands per search term and attach flag and key."""
        # Keep only the (search_term, asin) pairs of the in-scope search terms.
        self.df_st_asin = self.df_dim_st.join(
            self.df_st_asin, on=['search_term'], how='inner'
        )

        # Attach brand name and BSR orders; a missing order count becomes 0.
        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        )
        self.df_st_asin = self.df_st_asin.na.fill({"asin_bsr_orders": 0})

        # Sum BSR orders per (search_term, brand).
        self.df_st_brand = self.df_st_asin.groupby(['search_term', 'asin_brand_name']).agg(
            F.sum("asin_bsr_orders").alias('bsr_orders')
        )
        # Rows without a brand come from ASINs that had no match in df_asin_detail.
        self.df_st_brand = self.df_st_brand.filter('asin_brand_name is not null')

        # Rank brands inside each search term and keep the best three.
        # The brand name is a tie-breaker so that brands with equal sales get
        # the same rank on every run; without it row_number() is arbitrary.
        brand_window = Window.partitionBy(['search_term']).orderBy(
            F.col('bsr_orders').desc_nulls_last(),
            F.col('asin_brand_name').asc()
        )
        self.df_st_brand = self.df_st_brand.withColumn(
            "st_brand_rank", F.row_number().over(window=brand_window)
        )
        self.df_st_brand = self.df_st_brand.filter("st_brand_rank<=3")

        # Lower-cased brand name is the join key against the blacklist.
        self.df_st_brand = self.df_st_brand.withColumn('st_brand_name_lower', F.lower('asin_brand_name'))
        self.df_st_brand = self.df_st_brand.join(
            self.df_brand_black, on=['st_brand_name_lower'], how='left'
        )
        # Brands absent from the blacklist get black_flag = 0.
        self.df_st_brand = self.df_st_brand.na.fill({'black_flag': 0})

        # Attach st_key; search terms without a key are dropped by the inner join.
        self.df_st_brand = self.df_st_brand.join(
            self.df_st_key, on=['search_term'], how='inner'
        )

    def save_data(self):
        """Project the output columns and write them to the Hive partition."""
        # 'ss' is second-of-minute. The previous pattern used 'SS', which Spark
        # reads as fraction-of-second, so the seconds were never written.
        time_pattern = 'yyyy-MM-dd HH:mm:ss'

        df_save = self.df_st_brand.select(
            F.col('st_key'),
            F.col('search_term'),
            F.col('asin_brand_name').alias('st_brand_name'),
            F.col('st_brand_name_lower'),
            F.col('st_brand_rank'),
            F.col('black_flag'),
            F.date_format(F.current_timestamp(), time_pattern).alias('created_time'),
            F.date_format(F.current_timestamp(), time_pattern).alias('updated_time'),
            # Partition columns
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]

        # Clear the target path first: the write below uses mode='append'.
        print(f"清除hdfs目录中.....{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)

        print(f"当前存储的表名为:{self.hive_table},分区为{partition_by}")
        df_save.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    # Positional arguments: site_name, date_type, date_info
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DimStBrandInfo(site_name, date_type, date_info)
    obj.run()