import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.templates import Templates


class DwtThemeStAsin(Templates):
    """Build the ``dwt_theme_st_asin`` Hive table for one partition.

    Joins search-term theme labels (``dws_st_theme``) with search-term/ASIN
    pairs (``dim_st_asin_info``) on ``search_term`` and appends the result to
    the target table, partitioned by site_name/date_type/date_info.
    """

    def __init__(self, site_name='us', date_type='month', date_info='2024-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_theme_st_asin"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=20)
        # Partition spec used both for the HDFS cleanup path and the Hive write.
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Placeholder DataFrames; real content is assigned in read_data()/handle_data().
        self.df_st_theme = self.spark.sql("select 1+1;")
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load theme rows and search-term/ASIN rows for the current partition.

        Both source reads are cached because each DataFrame is reused
        (dedup + join) in handle_data().
        """
        sql = f"""
            select theme_ch, theme_en, theme_label_ch, theme_label_en, st_key, search_term
            from dws_st_theme
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_st_theme = self.spark.sql(sql).cache()

        sql = f"""
            select search_term, asin
            from dim_st_asin_info
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_st_asin = self.spark.sql(sql).cache()

    def handle_data(self):
        """Deduplicate both sources, then inner-join them on search_term."""
        self.df_st_asin = self.df_st_asin.dropDuplicates(["search_term", "asin"])
        self.df_st_theme = self.df_st_theme.dropDuplicates(
            ["theme_ch", "theme_en", "theme_label_ch", "theme_label_en", "st_key", "search_term"]
        )
        self.df_joined = self.df_st_asin.join(self.df_st_theme, "search_term", "inner")

    def save_data(self):
        """Project the output columns, clear the target HDFS partition, and append to Hive."""
        self.df_save = self.df_joined.select(
            F.col('theme_ch'),
            F.col('theme_en'),
            F.col('theme_label_ch'),
            F.col('theme_label_en'),
            F.col('st_key'),
            F.col('search_term'),
            F.col('asin'),
            # BUGFIX: pattern was 'HH:mm:SS' — in Spark's datetime patterns
            # 'S' is fraction-of-second, so the seconds field was silently
            # dropped. 'ss' is the correct seconds-of-minute pattern.
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
        # Remove any previous output for this partition before appending,
        # so re-runs do not duplicate rows.
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
        print(f"清除hdfs目录中:{hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.db_save},分区为{partition_by}", )
        self.df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    obj = DwtThemeStAsin(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()