# dim_st_asin_history_info.py
"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 单周的zr,sp,sb,ac,bs,er,tr等7大类型数据表(计算zr,sp类型表的page_rank+合并7张表)
table_read_name: ods_st_rank_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory so `utils` can be imported
from utils.templates import Templates
from pyspark.sql.types import IntegerType


class DimStAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_st_asin_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # Empty DataFrame (limit 0) that pins the schema the 7 tables are unioned into
        self.df_save = self.spark.sql(
            "select search_term, asin, page, page_row, 'zr' as data_type, updated_time, "
            "site_name, date_type, date_info from ods_search_term_zr limit 0;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=60)
        # Small tables to be unioned into one big table; sb is split into sb1/sb2/sb3
        # because ods_search_term_sb carries a numeric data_type column (1/2/3)
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']

    @staticmethod
    def udf_page_rank(page, page_1_count, page_2_count, page_row):
        """
        处理 zr, sp 的page_rank字段
        :param page:
        :param page_1_count:
        :param page_2_count:
        :param page_row:
        :return: page_rank
        """
        if page == 1:
            return page_row
        elif page == 2:
            return page_1_count + page_row
        else:
            return page_2_count + page_row
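
    # Worked example (illustrative numbers, not real data): if a search term's
    # max page_row on page 1 is 48 (page_1_count=48) and on page 2 is 50
    # (page_2_count_old=50, hence page_2_count=48+50=98), then an ASIN at
    # page 1 row 5 gets page_rank=5; page 2 row 5 gets 48+5=53; page 3 row 5
    # gets 98+5=103. The else branch assumes crawling stops at page 3; deeper
    # pages would all share the same pages-1-and-2 offset.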

    def handle_data_page_rank(self, df, data_type):
        print(f"{data_type} -- computing page_rank")
        u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate rows, so take max(page_row) rather than a count
        df_page_1 = df.filter("page=1").groupBy(['search_term']).agg({"page_row": "max"})
        df_page_2 = df.filter(df['page'] == 2).groupBy(['search_term']).agg({"page_row": "max"})
        df_page_1 = df_page_1.withColumnRenamed('max(page_row)', 'page_1_count')
        df_page_2 = df_page_2.withColumnRenamed('max(page_row)', 'page_2_count_old')
        df = df.join(df_page_1, on='search_term', how='left'). \
            join(df_page_2, on='search_term', how='left')
        df = df.fillna(0)  # search terms absent from page 1/2 get a zero offset
        df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
        df = df.withColumn("page_rank", u_page_rank(
            df['page'], df.page_1_count, df.page_2_count, df['page_row']))
        # df.show(n=10, truncate=False)
        return df
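
    # Shape of the intermediate frames built above (one row per search_term):
    #   df_page_1: search_term | page_1_count      (max page_row on page 1)
    #   df_page_2: search_term | page_2_count_old  (max page_row on page 2)
    # After the left joins and fillna(0), every zr/sp row carries the offsets
    # the UDF needs, with page_2_count = page_1_count + page_2_count_old.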

    def read_data(self):
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            if data_type in ['zr', 'sp']:
                sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                      f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
                # compute page_rank, then drop the helper join columns
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            else:
                # sb1/sb2/sb3 share ods_search_term_sb; its numeric data_type column (1/2/3) selects the variant
                if data_type in ['sb1', 'sb2', 'sb3']:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, updated_time, site_name,date_type,date_info from ods_search_term_sb " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
                else:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_save = self.df_save.unionByName(df, allowMissingColumns=True)
        # self.df_save.show(n=10, truncate=False)
        # print("self.df_save.count():", self.df_save.count())


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date granularity: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
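
# Example invocation (hypothetical values; assumes a configured Spark
# environment with the ods_search_term_* tables registered in the metastore):
#   spark-submit dim_st_asin_history_info.py us week 2022-45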