dwt_fb_top20_asin_info.py
"""
   @Author      : HuangJian
   @Description : 店铺top20数据详情表
   @SourceTable :
                  ①ods_st_key
                  ②dim_st_detail

   @SinkTable   : dwt_fb_top20_info
   @CreateTime  : 2022/07/24 14:55
   @UpdateTime  : 2022/07/24 14:55
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from pyspark.sql.types import IntegerType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_new_asin_flag


class DwtFbTop20Info(object):
    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_fb_top20_asin_info"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Build the HDFS output path for the target table partition
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
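        # The partition directory is cleared in save_data() before the append write,
        # so re-running the job for the same partition should not leave duplicate rows.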

        # Create the SparkSession
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # Resolve the calculation date (YYYY-MM-DD) for the given date_type/date_info
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)

        # Placeholder DataFrames, populated in read_data()
        self.df_fb_top20_asin_info = self.spark.sql("select 1+1;")
        self.df_seller_account = self.spark.sql("select 1+1;")

        # Register the new-ASIN flag UDF
        self.udf_new_asin_flag = self.spark.udf.register("udf_new_asin_flag", udf_new_asin_flag, IntegerType())
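        # Assumption: udf_new_asin_flag returns 1 when asin_launch_time is recent relative
        # to self.cal_date (i.e. a "new" ASIN) and 0 otherwise; the exact cutoff is defined
        # in yswg_utils.common_udf.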

    def read_data(self):
        # Read the base info for each store's crawled top-20 ASINs
        sql = f"""
            with base_table as(
                select 
                    seller_id,
                    asin,
                    title,
                    img_url,
                    price,
                    rating,
                    total_comments,
                    row_num
                from ods_asin_detail_product
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}'
            ), 
            history_asin as(
                select
                    asin,
                    asin_volume,
                    asin_weight,
                    asin_launch_time
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            )
            select
                base_table.seller_id,
                base_table.asin,
                base_table.title as asin_title,
                base_table.img_url as asin_img_url,
                base_table.price as asin_price,
                base_table.rating as asin_rating,
                base_table.total_comments as asin_total_comments,
                base_table.row_num as fb_row_num,
                history_asin.asin_volume,
                history_asin.asin_weight,
                history_asin.asin_launch_time,
                udf_new_asin_flag(history_asin.asin_launch_time,'{self.cal_date}') as is_asin_new
            from base_table
            left join history_asin
            on base_table.asin = history_asin.asin
        """
        self.df_fb_top20_asin_info = self.spark.sql(sqlQuery=sql).cache()
        print(sql)
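        # Keep one row per (seller_id, asin) in case the crawl returned duplicates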
        self.df_fb_top20_asin_info = self.df_fb_top20_asin_info.drop_duplicates(['seller_id', 'asin'])
        # print("self.df_fb_top20_asin_info", self.df_fb_top20_asin_info.show(10, truncate=False))

        # Read ods_seller_account_syn to pick up account_name for each seller
        print("fetching ods_seller_account_syn")
        sql = f"""
            select 
                seller_id,
                account_name,
                id 
            from ods_seller_account_syn 
            where site_name='{self.site_name}'
        """
        self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Deduplicate: keep a single record per seller_id, preferring the latest (largest id)
        self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc())
        self.df_seller_account = self.df_seller_account.drop_duplicates(['seller_id'])
        self.df_seller_account = self.df_seller_account.drop('id')
        print(sql)

    def save_data(self):
        # Join with ods_seller_account_syn to attach account_name
        df_save = self.df_fb_top20_asin_info.join(
            self.df_seller_account, on='seller_id', how='inner'
        )
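        # Inner join: ASINs whose seller_id has no match in ods_seller_account_syn are dropped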

        df_save = df_save.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('asin_title'),
            F.col('asin_img_url'),
            F.col('asin_price'),
            F.col('asin_rating'),
            F.col('asin_total_comments'),
            F.col('fb_row_num'),
            F.col('asin_volume'),
            F.col('asin_weight'),
            F.col('asin_launch_time'),
            F.col('is_asin_new'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        # CommonUtil.check_schema(self.spark, df_save, self.hive_tb)

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)

        df_save = df_save.repartition(10)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        # Read the source data
        self.read_data()
        # Assemble the output fields and write to Hive
        self.save_data()


if __name__ == '__main__':
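    # Example invocation (the actual submission command depends on the cluster setup;
    # "us" is a placeholder site name):
    #   spark-submit dwt_fb_top20_asin_info.py us week 2022-1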
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtFbTop20Info(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()