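"""
DWT-layer accumulation job for the Amazon report wide table.

Orientation note (inferred from the code in this file, not from external docs):
each monthly run reads the current month's metrics from dwd_amazon_report,
full-outer-joins them on asin with last month's accumulated dwt_amazon_report
partition, and appends the new values to the comma-separated history columns
(monthly_sales, zr_count, sp_count, total_count, date_info_list). ASINs missing
on either side are padded with -1 so every column keeps one value per month.
The inherited Templates.run() presumably drives read_data(), handle_data() and
the partitioned save described by self.partitions_by.
"""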
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so utils.* can be imported
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws


class DwtAmazonReport(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2021-10'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_amazon_report'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=120)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; the real data is assigned in read_data()/handle_data().
        self.df_dwd_new = self.spark.sql("select 1+1;")
        self.df_dwd_old = self.spark.sql("select 1+1;")
        self.df_joined = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def read_data(self):
        # Read the current month's data from the DWD layer
        sql1 = f"""
        select 
            asin,
            monthly_sales as new_monthly_sales,
            zr_count as new_zr_count,
            sp_count as new_sp_count,
            total_count as new_total_count,
            date_info as new_date_info_list 
        from 
            dwd_amazon_report 
        where 
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}';
        """
        print(sql1)
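        # Both inputs are repartitioned by asin (15 partitions), presumably to
        # align the shuffle for the later full outer join on the same key.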
        self.df_dwd_new = self.spark.sql(sqlQuery=sql1).repartition(15, 'asin').cache()
        self.df_dwd_new.show(10, truncate=True)

        # Read the previous month's accumulated data from the DWT layer
        date_info_pre = CommonUtil.get_month_offset(self.date_info, -1)
        sql2 = f"""
        select 
            asin,
            monthly_sales as old_monthly_sales,
            zr_count as old_zr_count,
            sp_count as old_sp_count,
            total_count as old_total_count,
            date_info_list as old_date_info_list 
        from 
            dwt_amazon_report 
        where 
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{date_info_pre}';
        """
        print(sql2)
        self.df_dwd_old = self.spark.sql(sqlQuery=sql2).repartition(15, 'asin').cache()
        self.df_dwd_old.show(10, truncate=True)

    def handle_data(self):
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # Metric columns that exist on both sides of the join (with old_/new_ prefixes)
        join_columns = ['monthly_sales', 'zr_count', 'sp_count', 'total_count', 'date_info_list']
        # Count the months in the historical date_info_list to decide how many
        # -1 placeholders to back-fill for ASINs that only appear this month
        old_date_info_first = self.df_dwd_old.select('old_date_info_list').distinct().first()
        if old_date_info_first is None:
            old_date_info_list = None
            old_date_info_list_len = 0
        else:
            old_date_info_list = old_date_info_first[0]
            old_date_info_list_len = len(old_date_info_list.split(','))
        fillna_old = ('-1,' * old_date_info_list_len).rstrip(',')
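        # Illustrative example (hypothetical values): with
        # old_date_info_list = '2024-01,2024-02,2024-03' (3 months),
        # fillna_old == '-1,-1,-1', i.e. one -1 per missing historical month.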
        # An ASIN with no data this month gets a single -1 appended
        fillna_new = '-1'
        # Full outer join on asin, then fill the nulls left on either side.
        # Note: fillna() with string values only applies to string-typed columns.
        self.df_joined = self.df_dwd_new.join(
            self.df_dwd_old, on='asin', how='full'
        )
        for col in join_columns:
            self.df_joined = self.df_joined.fillna({'old_' + col: fillna_old})
            self.df_joined = self.df_joined.fillna({'new_' + col: fillna_new})
        # All rows share the same month lists, so overwrite both date_info_list
        # columns with constants
        self.df_joined = self.df_joined.withColumn(
            "old_date_info_list", F.lit(old_date_info_list)
        ).withColumn(
            "new_date_info_list", F.lit(self.date_info)
        )
        # Concatenate the historical values and the current month's values into
        # the final output columns
        if old_date_info_first is None:
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    self.df_joined['new_' + col]
                )
        else:
            for col in join_columns:
                self.df_joined = self.df_joined.withColumn(
                    col,
                    concat_ws(',', self.df_joined['old_' + col], self.df_joined['new_' + col])
                )
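        # Illustrative result of the concatenation (hypothetical values):
        #   previous dwt row : asin=B0TEST, monthly_sales='100,120', date_info_list='2024-02,2024-03'
        #   current dwd row  : asin=B0TEST, monthly_sales='130',     date_info='2024-04'
        #   combined row     : asin=B0TEST, monthly_sales='100,120,130',
        #                      date_info_list='2024-02,2024-03,2024-04'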
        # Keep only the output columns
        selected_columns = ['asin'] + join_columns
        self.df_save = self.df_joined.select(selected_columns)
        # Weekly/view metrics are not produced by this job; emit them as nulls.
        # Note: F.lit(None) yields a NullType column, which the downstream
        # writer must be able to handle.
        self.df_save = self.df_save.withColumn(
            "weekly_sales", F.lit(None)
        ).withColumn(
            "weekly_views", F.lit(None)
        ).withColumn(
            "monthly_views", F.lit(None)
        ).withColumn(
            "site_name", F.lit(self.site_name)
        ).withColumn(
            "date_type", F.lit(self.date_type)
        ).withColumn(
            "date_info", F.lit(self.date_info)
        )


if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    if (site_name in ['us', 'uk', 'de']) and (date_type == 'month') and (date_info >= '2024-04'):
        handle_obj = DwtAmazonReport(site_name=site_name, date_type=date_type, date_info=date_info)
        handle_obj.run()
    else:
        print("暂不计算该维度数据!")
        quit()
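
# Example invocation (hypothetical paths/options; the actual spark-submit
# configuration depends on the cluster setup):
#   spark-submit dwt_amazon_report.py us month 2024-04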