# ods_brand_analytics.py
"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 “ods_brand_analytics” 的表: 排名权重计算,用天补全周/30天/月,存储新增的关键词
table_read_name: ods_brand_analytics
table_save_name: ods_brand_analytics
table_save_level: ods
version: 1.0
created_date: 2022-11-21
updated_date: 2022-11-21
"""


import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class OdsBrandAnalytics(Templates):
    """Clean one site/date partition of the ``ods_brand_analytics`` table.

    Search terms present in the finer-grained source partitions (daily or
    weekly rows) but missing from the current week/30-day/month partition get
    an averaged, re-numbered rank appended after the current partition's
    maximum rank; the result is saved back into the same table, partitioned
    by site_name / date_type / date_info.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Create the Spark session and prepare read/save state.

        :param site_name: site code, e.g. 'us' (one of the 6 supported sites)
        :param date_type: granularity: day / week / 4_week / month / last30day /
            week_old / month_old
        :param date_info: partition value matching date_type, e.g. '2022-01'
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # date_info2 keeps the caller's original value so the save partition
        # stays fixed even if date_info gets remapped (see the commented-out
        # 4_week override further down).
        self.date_info2 = date_info
        self.db_save = f'ods_brand_analytics'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas df object
        # Cheap placeholder DataFrames; overwritten by read_data()/handle_data().
        self.df_st = self.spark.sql(f"select 1+1;")
        self.df_st_current = self.spark.sql(f"select 1+1;")
        self.df_st_rank = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_num = 1
        self.reset_partitions(partitions_num=self.partitions_num)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # 4_week / last30day partitions are derived from rows already in the
        # table, so no sqoop import is needed for them.
        if self.date_type in ['4_week', "last30day"]:
            print(f"date_type={self.date_type}, 无需导入数据")
        else:
            self.handle_st_import()
        self.get_year_week_tuple()
        # if self.date_type == '4_week':
        #     self.date_info = '2022-12-17'
        self.get_date_info_tuple()

    def read_data(self):
        """Load source rows (df_st) and current-partition rows (df_st_current).

        Exits the whole process for 'week_old' (import-only mode) or when the
        source query returns no rows.
        """
        if self.date_type == '4_week':
            # if self.site_name in ['us']:
            #     params1 = f"date_type='day' and date_info in {self.date_info_tuple}"
            # else:
            #     params1 = f"date_type='week' and date_info in {self.year_week_tuple}"
            params1 = f"date_type='week' and date_info in {self.year_week_tuple}"
            # "limit 0": the 4_week partition is rebuilt purely from weekly
            # rows, so the "current" DataFrame is intentionally empty.
            params2 = f" limit 0"
        elif self.date_type == 'week_old':
            # Legacy weekly table: exit right after the import done in __init__.
            quit()
        elif self.date_type == 'month_old':
            params1 = f"date_type='week_old' and date_info in {self.year_week_tuple}"
            params2 = f""
        else:
            params1 = f"date_type='day' and date_info in {self.date_info_tuple}"
            params2 = f""
            if self.date_type == "last30day":
                params2 = f" limit 0"
        print("1.1 读取ods_brand_analytics表")
        # sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
        #       f"and date_type='day' and date_info in {self.date_info_tuple};"
        sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
              f"and {params1};"
        print("sql:", sql)
        self.df_st = self.spark.sql(sql).cache()
        self.df_st.show(10, truncate=False)
        # No source rows -> nothing to compute for this partition; stop here.
        if self.df_st.count() == 0:
            quit()

        # print("self.df_st:", self.df_st.drop_duplicates(['search_term']).count())
        print("1.2 读取ods_brand_analytics表")
        sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info = '{self.date_info}' {params2};"
        print("sql:", sql)
        self.df_st_current = self.spark.sql(sql).cache()
        self.df_st_current.show(10, truncate=False)

    def handle_data(self):
        """Compute ranks for the missing terms and stamp the save partition columns."""
        self.handle_st_rank()
        self.handle_st_duplicated()
        # Stamp with date_info2 (the caller's original value), so the save
        # partition is always the one requested on the command line.
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info2))
        self.df_save.show(10, truncate=False)

    def handle_st_import(self):
        """Shell out to the sqoop import script for this site/date partition."""
        print(f"导入关键词数据: {self.site_name}, {self.date_type}, {self.date_info}")
        # NOTE(review): os.system with interpolated CLI arguments — ensure
        # callers only pass trusted values (shell-injection surface).
        os.system(f"/mnt/run_shell/sqoop_shell/import/ods_brand_analytics.sh {self.site_name} {self.date_type} {self.date_info}")

    def handle_st_rank(self):
        """Build df_st_rank: terms absent from the current partition, ranked.

        A term missing on some source date is treated as ranked last for that
        date (fillna with the date's row count); the per-date ranks are then
        averaged, re-numbered with row_number, and offset past the current
        partition's max rank.
        """
        self.df_st_rank = self.df_st.select("search_term", "rank", "date_info")
        # flag marks terms that already exist in the current partition ...
        self.df_st_current = self.df_st_current.withColumn("flag", F.lit(1))
        # self.df_st_rank.show(10, truncate=False)
        self.df_st_rank = self.df_st_rank.join(
            self.df_st_current.select("search_term", "flag"), on='search_term', how='left'
        )
        # ... so keeping "flag is null" leaves only the terms to be appended.
        self.df_st_rank = self.df_st_rank.filter("flag is null")
        # self.df_st_rank.show(10, truncate=False)
        # count = self.df_st_current.count()  # number of keywords in the current week/month
        # Per-source-date row counts, used as the "worst rank" placeholder below.
        df_count = self.df_st.groupby(['date_info']).count()
        # df_count.show(10, truncate=False)
        df_count = df_count.toPandas()
        date_dict = {date_info: count for date_info, count in zip(df_count.date_info, df_count['count'])}
        # print("date_dict:", date_dict)
        # Pivot: one column per source date holding the term's mean rank there.
        self.df_st_rank = self.df_st_rank.groupby(['search_term']). \
            pivot("date_info").agg(F.mean("rank"))
        # self.df_st_rank.show(10, truncate=False)
        # Missing on a date -> pretend it ranked last (that date's row count).
        self.df_st_rank = self.df_st_rank.fillna(date_dict)
        self.df_st_rank = self.df_st_rank.withColumn("rank_sum", F.lit(0))
        for col in date_dict.keys():
            print("col:", col)
            self.df_st_rank = self.df_st_rank.withColumn(
                "rank_sum", self.df_st_rank.rank_sum + self.df_st_rank[col]
            )
        # NOTE(review): the divisor is len(date_info_tuple) (days) even when
        # the pivot columns come from year_week_tuple (4_week / month_old) —
        # confirm this is intended; the row_number re-ranking below makes the
        # absolute average mostly irrelevant, only its ordering matters.
        self.df_st_rank = self.df_st_rank.withColumn(
            "rank_sum_avg", self.df_st_rank.rank_sum / len(self.date_info_tuple)
        )
        # Un-partitioned global ordering: forces all rows through a single
        # partition — acceptable at this data size.
        window = Window.orderBy(
            self.df_st_rank.rank_sum_avg.asc()
        )
        self.df_st_rank = self.df_st_rank.withColumn("rank_avg", F.row_number().over(window=window))
        self.df_st_rank = self.df_st_rank.drop("rank_sum", "rank_sum_avg")
        # self.df_st_rank.show(10, truncate=False)
        for col in date_dict.keys():
            self.df_st_rank = self.df_st_rank.drop(col)
        # self.df_st_rank.show(10, truncate=False)
        self.df_st_rank = self.df_st_rank.withColumnRenamed("rank_avg", "rank")
        # self.df_st_rank = self.df_st_rank.withColumn("rank", self.df_st_rank.rank+F.lit(self.df_st_current.rank.count()))
        # Appended terms are ranked after everything already in the partition.
        df_max_rank = self.df_st_current.agg(F.max('rank').alias("max_rank"))
        df_max_rank = df_max_rank.toPandas()
        # NOTE(review): max_rank is None when df_st_current is empty and
        # date_type is a plain week/month — the addition below would then
        # produce null ranks; confirm the current partition is never empty
        # for those date types.
        max_rank = list(df_max_rank.max_rank)[0] if self.date_type not in ['4_week', 'last30day'] else 0
        if self.date_type == 'last30day':
            self.df_st_rank = self.df_st_rank.fillna({'rank': 0})
        self.df_st_rank = self.df_st_rank.withColumn("rank", self.df_st_rank.rank+F.lit(max_rank))
        # print("self.df_st_rank:", self.df_st_rank.count())
        # self.df_st_rank.show(10, truncate=False)

    def handle_st_duplicated(self):
        """Keep each term's most recent source row and attach the computed rank."""
        # By default keep each keyword's row from the most recent day.
        window = Window.partitionBy(['search_term']).orderBy(
            self.df_st.date_info.desc()
        )
        self.df_st = self.df_st.withColumn("rank_top", F.row_number().over(window))
        self.df_st = self.df_st.filter("rank_top=1")
        # Drop the per-day rank; the averaged rank from df_st_rank replaces it.
        self.df_st = self.df_st.drop("rank_top", "rank")
        self.df_save = self.df_st_rank.join(
            self.df_st, on='search_term', how='left'
        )
        # print("self.df_save:", self.df_save.count())
        # self.df_save.show(10, truncate=False)


if __name__ == '__main__':
    # CLI entry point: clean one site/date partition of ods_brand_analytics.
    # Fail fast with a usage message instead of a raw IndexError when an
    # argument is missing.
    if len(sys.argv) < 4:
        sys.exit(f"Usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]  # arg 1: site code, e.g. us
    date_type = sys.argv[2]  # arg 2: granularity: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-day/year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = OdsBrandAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()