"""
   @Author      : HuangJian
   @Description : 时间周期内-搜索词top3的品牌库(排除公司asin统计)
   @SourceTable :
                  ①dim_st_detail
                  ②dim_st_asin_info
                  ③dim_asin_detail
                  ④dim_asin_measure
   @SinkTable   : dim_st_brand_info
   @CreateTime  : 2023/04/19 09:20
   @UpdateTime  : 2022/04/19 09:20
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DBUtil


class DimStBrandInfo(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        self.hive_table = "dim_st_brand_info"
        # Target HDFS path for this table partition (used for cleanup before the write)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
        print(f"hdfs_path is {self.hdfs_path}")

        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # Number of output partitions; reset_partitions presumably derives it per site from the base value 2
        self.partitions_num = CommonUtil.reset_partitions(site_name, 2)

        # Initialize global DataFrames with dummy queries; real data is loaded in read_data()
        self.df_dim_st = self.spark.sql("select 1+1")
        self.df_st_asin = self.spark.sql("select 1+1")
        self.df_asin_detail = self.spark.sql("select 1+1")
        self.df_st_brand = self.spark.sql("select 1+1")
        self.df_asin_measure = self.spark.sql("select 1+1")
        self.df_base_brand = self.spark.sql("select 1+1")
        self.df_brand_black = self.spark.sql("select 1+1")
        self.df_st_key = self.spark.sql("select 1+1")

    def run(self):
        # Read source data
        self.read_data()
        # Transform
        self.handle_data()
        # Persist results
        self.save_data()

    def read_data(self):
        print("======================查询sql如下======================")

        # Read ods_st_key: mapping from search_term to its integer st_key
        sql = f"select search_term, cast(st_key as int) as st_key from ods_st_key where site_name = '{self.site_name}' "
        self.df_st_key = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # Read dim_st_detail: keep search terms ranked inside the top 1,000,000
        sql = f"""select search_term
                    from dim_st_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{self.date_type}'
                      and date_info = '{self.date_info}'
                      and st_rank < 1000000
                """
        self.df_dim_st = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # Read search-term/ASIN relations; the group by dedupes repeated (search_term, asin) pairs
        sql = f"""select search_term,
                         asin
                  from dim_st_asin_info
                  where site_name = '{self.site_name}'
                    and date_type = '{self.date_type}'
                    and date_info = '{self.date_info}'
                    group by search_term, asin
               """
        self.df_st_asin = self.spark.sql(sqlQuery=sql)
        print("sql:", sql)

        # Read dim_asin_detail for asin and brand name; the company's own ASINs are excluded
        sql = f"""
                 select asin, trim(asin_brand_name) as asin_brand_name
                 from dim_asin_detail
                 where site_name = '{self.site_name}'
                    and date_type = '{self.date_type}'
                    and date_info = '{self.date_info}'
                    and length(asin_brand_name) > 2
                    and trim(asin_brand_name) not in ('null', 'None', 'none', 'Null', '')
                    and asin not in (select asin from ods_self_asin where site_name = '{self.site_name}')
               """
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
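        # Caveat: `asin not in (subquery)` returns no rows at all if the subquery yields
        # any NULL asin, so ods_self_asin is assumed to contain no NULL asin values.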

        # Read dwd_asin_measure: BSR-derived order volume per asin
        sql = f"""select asin, cast(asin_bsr_orders as int) as asin_bsr_orders
              from dwd_asin_measure
              where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
        print("sql:", sql)
        self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()

        # Brand-word blacklist, read over JDBC from the MySQL dictionary table match_character_dict
        # ('品牌词库黑名单' means "brand-word blacklist"; it is a stored match_type value and must stay in Chinese)
        black_sql = """select lower(trim(character_name)) as st_brand_name_lower, 1 as black_flag
                     from match_character_dict where match_type = '品牌词库黑名单'"""
        conn_info = DBUtil.get_connection_info("mysql", "us")
        self.df_brand_black = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=black_sql
        )
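        # Expected shape of the blacklist frame (rows are illustrative, not real dictionary entries):
        #   st_brand_name_lower | black_flag
        #   'some brand word'   | 1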
    def handle_data(self):
        # Restrict the st-asin relations to search terms inside the top 1,000,000 (inner join)
        self.df_st_asin = self.df_dim_st.join(
            self.df_st_asin, on=['search_term'], how='inner'
        )

        self.df_st_asin = self.df_st_asin.join(
            self.df_asin_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        )

        # ASINs missing from dwd_asin_measure have no BSR orders; treat them as 0
        self.df_st_asin = self.df_st_asin.na.fill({"asin_bsr_orders": 0})

        # Aggregate BSR orders per (search_term, brand)
        self.df_st_brand = self.df_st_asin.groupby(['search_term', 'asin_brand_name']).agg(
            F.sum("asin_bsr_orders").alias('bsr_orders')
        )

        # Drop rows whose brand name is null (asin not matched in dim_asin_detail)
        self.df_st_brand = self.df_st_brand.filter('asin_brand_name is not null')
        # Rank brands within each search term by BSR orders; keep the top 3
        brand_window = Window.partitionBy(['search_term']).orderBy(
            self.df_st_brand.bsr_orders.desc_nulls_last()
        )

        self.df_st_brand = self.df_st_brand.withColumn("st_brand_rank", F.row_number().over(window=brand_window))
        self.df_st_brand = self.df_st_brand.filter("st_brand_rank<=3")
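        # Illustrative example (made-up numbers): if a search term maps to brands with
        # bsr_orders {A: 1200, B: 900, C: 800, D: 100}, row_number() assigns ranks 1-4
        # and the filter above keeps only A, B and C.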

        # Lowercase the brand name so it matches the blacklist's lower(trim(...)) key
        self.df_st_brand = self.df_st_brand.withColumn('st_brand_name_lower', F.lower('asin_brand_name'))
        # Left-join the brand blacklist to attach the black_flag label
        self.df_st_brand = self.df_st_brand.join(
            self.df_brand_black, on=['st_brand_name_lower'], how='left'
        )

        # Brands absent from the blacklist get black_flag = 0
        self.df_st_brand = self.df_st_brand.na.fill({'black_flag': 0})

        # Attach the integer st_key for each search term
        self.df_st_brand = self.df_st_brand.join(
            self.df_st_key, on=['search_term'], how='inner'
        )
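        # Note: this inner join silently drops search terms that have no st_key mapping in ods_st_key.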
    def save_data(self):
        # Select the output columns and append the partition fields
        df_save = self.df_st_brand.select(
            F.col('st_key'),
            F.col('search_term'),
            F.col('asin_brand_name').alias('st_brand_name'),
            F.col('st_brand_name_lower'),
            F.col('st_brand_rank'),
            F.col('black_flag'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"清除hdfs目录中.....{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"当前存储的表名为:{self.hive_table},分区为{partition_by}")
        df_save.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=partition_by)
        print("success")


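# Example invocation (argument values are illustrative; formats follow the positional args below):
#   python dim_st_brand_info.py us month 2023-04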
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DimStBrandInfo(site_name, date_type, date_info)
    obj.run()