""" @Author : HuangJian @Description : 店铺top20数据详情表 @SourceTable : ①ods_st_key ②dim_st_detail @SinkTable : dwt_fb_top20_info @CreateTime : 2022/07/24 14:55 @UpdateTime : 2022/07/24 14:55 """ import os import sys sys.path.append(os.path.dirname(sys.path[0])) from utils.hdfs_utils import HdfsUtils from utils.common_util import CommonUtil from pyspark.sql.types import IntegerType from utils.spark_util import SparkUtil from pyspark.sql import functions as F from yswg_utils.common_udf import udf_new_asin_flag class DwtFbTop20Info(object): def __init__(self, site_name, date_type, date_info): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.hive_tb = f"dwt_fb_top20_asin_info" self.partition_dict = { "site_name": site_name, "date_type": date_type, "date_info": date_info } # 落表路径校验 self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict) # 创建spark_session对象相关 app_name = f"{self.__class__.__name__}:{site_name}:{date_info}" self.spark = SparkUtil.get_spark_session(app_name) # 获取不同维度日期下的计算日期YYYY-MM-DD self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info) # 初始化全局df self.df_fb_top20_asin_info = self.spark.sql(f"select 1+1;") self.df_seller_account = self.spark.sql(f"select 1+1;") # 初始化UDF函数 self.udf_new_asin_flag = self.spark.udf.register("udf_new_asin_flag", udf_new_asin_flag, IntegerType()) def read_data(self): # 获取店铺抓取top20的基础信息数据 sql = f""" with base_table as( select seller_id, asin, title, img_url, price, rating, total_comments, row_num from ods_asin_detail_product where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' ), history_asin as( select asin, asin_volume, asin_weight, asin_launch_time from dim_cal_asin_history_detail where site_name = '{self.site_name}' ) select base_table.seller_id, base_table.asin, base_table.title as asin_title, base_table.img_url as asin_img_url, base_table.price as asin_price, base_table.rating as asin_rating, base_table.total_comments as asin_total_comments, base_table.row_num as fb_row_num, history_asin.asin_volume, history_asin.asin_weight, history_asin.asin_launch_time, udf_new_asin_flag(history_asin.asin_launch_time,'{self.cal_date}') as is_asin_new from base_table left join history_asin on base_table.asin = history_asin.asin """ self.df_fb_top20_asin_info = self.spark.sql(sqlQuery=sql).cache() print(sql) self.df_fb_top20_asin_info = self.df_fb_top20_asin_info.drop_duplicates(['seller_id', 'asin']) # print("self.df_fb_top20_asin_info", self.df_fb_top20_asin_info.show(10, truncate=False)) # 获取ods_seller_account_syn提取account_name print("获取 ods_seller_account_syn") sql = f""" select seller_id, account_name, id from ods_seller_account_syn where site_name='{self.site_name}' """ self.df_seller_account = self.spark.sql(sqlQuery=sql) # 进行去重 self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc()) self.df_seller_account = self.df_seller_account.drop_duplicates(['seller_id']) self.df_seller_account = self.df_seller_account.drop('id') print(sql) def sava_data(self): # 关联ods_seller_account_syn,带回account_name df_save = self.df_fb_top20_asin_info.join( self.df_seller_account, on='seller_id', how='inner' ) df_save = df_save.select( F.col('seller_id'), F.col('account_name'), F.col('asin'), F.col('asin_title'), F.col('asin_img_url'), F.col('asin_price'), F.col('asin_rating'), F.col('asin_total_comments'), F.col('fb_row_num'), F.col('asin_volume'), F.col('asin_weight'), F.col('asin_launch_time'), F.col('is_asin_new'), F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('created_time'), F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS').alias('updated_time'), F.lit(self.site_name).alias('site_name'), F.lit(self.date_type).alias('date_type'), F.lit(self.date_info).alias('date_info') ) # CommonUtil.check_schema(self.spark, df_save, self.hive_tb) print(f"清除hdfs目录中:{self.hdfs_path}") HdfsUtils.delete_file_in_folder(self.hdfs_path) df_save = df_save.repartition(10) partition_by = ["site_name", "date_type", "date_info"] print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", ) df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by) print("success") def run(self): # 读取数据 self.read_data() # 字段处理 self.sava_data() if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwtFbTop20Info(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()