# dwt_theme_st_asin.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F
from utils.templates import Templates


class DwtThemeStAsin(Templates):
    """Build the dwt_theme_st_asin fact table.

    Joins search-term theme labels (dws_st_theme) with search-term/ASIN
    pairs (dim_st_asin_info) for one site/date partition, then writes the
    result to the Hive table ``dwt_theme_st_asin`` partitioned by
    site_name/date_type/date_info. Driven by the Templates.run() pipeline
    (read_data -> handle_data -> save_data).
    """

    def __init__(self, site_name='us', date_type='month', date_info='2024-01'):
        """
        :param site_name: site partition value, e.g. 'us'
        :param date_type: date granularity partition value, e.g. 'month'
        :param date_info: date partition value, e.g. '2024-01'
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_theme_st_asin"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=20)
        # Partition values reused both for the HDFS path cleanup and the Hive write.
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }

        # Placeholder DataFrames; real content is assigned in read_data()/handle_data()/save_data().
        self.df_st_theme = self.spark.sql("select 1+1;")
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load theme labels and search-term/ASIN pairs for the target partition.

        Both DataFrames are cached because each is scanned again in
        handle_data() (dedup + join).
        """
        sql = f"""
            select 
                theme_ch,
                theme_en,
                theme_label_ch,
                theme_label_en,
                st_key,
                search_term
            from dws_st_theme
            where site_name = '{self.site_name}' 
              and date_type = '{self.date_type}' 
              and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_st_theme = self.spark.sql(sql).cache()

        sql = f"""
            select 
                search_term,
                asin
            from dim_st_asin_info
            where site_name = '{self.site_name}' 
              and date_type = '{self.date_type}' 
              and date_info = '{self.date_info}';
        """
        print(sql)
        self.df_st_asin = self.spark.sql(sql).cache()

    def handle_data(self):
        """Deduplicate both sides, then inner-join them on search_term."""
        self.df_st_asin = self.df_st_asin.dropDuplicates(["search_term", "asin"])
        # Dedup on the full label tuple so identical theme rows don't fan out the join.
        self.df_st_theme = self.df_st_theme.dropDuplicates(
            ["theme_ch", "theme_en", "theme_label_ch", "theme_label_en", "st_key", "search_term"]
        )
        self.df_joined = self.df_st_asin.join(self.df_st_theme, "search_term", "inner")

    def save_data(self):
        """Project the output columns, clear the target HDFS partition, and append to Hive."""
        self.df_save = self.df_joined.select(
            F.col('theme_ch'),
            F.col('theme_en'),
            F.col('theme_label_ch'),
            F.col('theme_label_en'),
            F.col('st_key'),
            F.col('search_term'),
            F.col('asin'),
            # Fixed: pattern was 'HH:mm:SS' — 'SS' is fraction-of-second, 'ss' is seconds.
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
        print(f"清除hdfs目录中:{hdfs_path_asin_info}")
        # Delete the partition's files first so the 'append' below acts as an overwrite.
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.db_save},分区为{partition_by}", )
        self.df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    # CLI: site_name date_type date_info (all required).
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Explicit checks instead of `assert`: asserts are stripped under `python -O`,
    # which would let missing arguments slip through into the Spark job.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_type is None:
        raise ValueError("date_type 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    obj = DwtThemeStAsin(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()