"""
   @Author      : HuangJian
   @Description : shop top-20 ASIN detail table
   @SourceTable :
                  ①ods_asin_detail_product
                  ②dim_cal_asin_history_detail
                  ③ods_seller_account_syn

   @SinkTable   : dwt_fb_top20_asin_info
   @CreateTime  : 2022/07/24 14:55
   @UpdateTime  : 2022/07/24 14:55
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from pyspark.sql.types import IntegerType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from yswg_utils.common_udf import udf_new_asin_flag
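
# Note: udf_new_asin_flag(launch_time, cal_date) is assumed to return 1 when
# the ASIN's launch date marks it as a "new" product relative to cal_date,
# else 0; see yswg_utils.common_udf for the authoritative rule.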


class DwtFbTop20Info(object):
    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_fb_top20_asin_info"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Build the HDFS path of the target partition (used for cleanup before write)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)

        # Create the SparkSession
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # Resolve the calculation date (YYYY-MM-DD) for the given date_type/date_info
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
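        # (e.g. date_type='month', date_info='2022-10' is assumed to resolve to
        # a concrete day within that period)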

        # Initialize global DataFrames (placeholders until read_data runs)
        self.df_fb_top20_asin_info = self.spark.sql("select 1+1")
        self.df_seller_account = self.spark.sql("select 1+1")

        # Register the new-ASIN flag UDF
        self.udf_new_asin_flag = self.spark.udf.register("udf_new_asin_flag", udf_new_asin_flag, IntegerType())

    def read_data(self):
        # Read the crawled top-20 ASIN base info per shop
        sql = f"""
            with base_table as(
                select 
                    seller_id,
                    asin,
                    title,
                    img_url,
                    price,
                    rating,
                    total_comments,
                    row_num
                from ods_asin_detail_product
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}'
            ), 
            history_asin as(
                select
                    asin,
                    asin_volume,
                    asin_weight,
                    asin_launch_time
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            )
            select
                base_table.seller_id,
                base_table.asin,
                base_table.title as asin_title,
                base_table.img_url as asin_img_url,
                base_table.price as asin_price,
                base_table.rating as asin_rating,
                base_table.total_comments as asin_total_comments,
                base_table.row_num as fb_row_num,
                history_asin.asin_volume,
                history_asin.asin_weight,
                history_asin.asin_launch_time,
                udf_new_asin_flag(history_asin.asin_launch_time,'{self.cal_date}') as is_asin_new
            from base_table
            left join history_asin
            on base_table.asin = history_asin.asin
        """
        self.df_fb_top20_asin_info = self.spark.sql(sqlQuery=sql).cache()
        print(sql)
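        # The crawl may capture the same ASIN more than once per shop, so
        # dedupe on (seller_id, asin).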
        self.df_fb_top20_asin_info = self.df_fb_top20_asin_info.drop_duplicates(['seller_id', 'asin'])
        # print("self.df_fb_top20_asin_info", self.df_fb_top20_asin_info.show(10, truncate=False))

        # Read ods_seller_account_syn to pick up account_name
        print("reading ods_seller_account_syn")
        sql = f"""
            select 
                seller_id,
                account_name,
                id 
            from ods_seller_account_syn 
            where site_name='{self.site_name}'
        """
        self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Deduplicate: keep the latest record (highest id) per seller_id.
        # orderBy followed by drop_duplicates does not guarantee which row is
        # kept in Spark, so rank explicitly with a window function instead.
        seller_window = Window.partitionBy('seller_id').orderBy(F.col('id').desc())
        self.df_seller_account = self.df_seller_account \
            .withColumn('rk', F.row_number().over(seller_window)) \
            .filter(F.col('rk') == 1) \
            .drop('rk', 'id')
        print(sql)

    def save_data(self):
        # Join with ods_seller_account_syn to bring in account_name
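        # Note: an inner join drops top-20 rows whose seller_id has no match
        # in ods_seller_account_syn.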
        df_save = self.df_fb_top20_asin_info.join(
            self.df_seller_account, on='seller_id', how='inner'
        )

        df_save = df_save.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('asin_title'),
            F.col('asin_img_url'),
            F.col('asin_price'),
            F.col('asin_rating'),
            F.col('asin_total_comments'),
            F.col('fb_row_num'),
            F.col('asin_volume'),
            F.col('asin_weight'),
            F.col('asin_launch_time'),
            F.col('is_asin_new'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        # CommonUtil.check_schema(self.spark, df_save, self.hive_tb)

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
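        # Clearing the partition directory first and then writing with
        # mode='append' (below) effectively overwrites only this partition.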

        df_save = df_save.repartition(10)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        # Read source data
        self.read_data()
        # Transform and persist
        self.save_data()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtFbTop20Info(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()