"""
author: 方星钧(ffman)
description: Cleans the "ods_brand_analytics" table for the 6 sites: computes ranking weights, back-fills the week/30-day/month periods from daily data, and stores newly added search terms
table_read_name: ods_brand_analytics
table_save_name: ods_brand_analytics
table_save_level: ods
version: 1.0
created_date: 2022-11-21
updated_date: 2022-11-21
"""


import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.templates import Templates
# window functions for grouped ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F

class OdsBrandAnalytics(Templates):
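    """Cleans ods_brand_analytics for one site and period: triggers the sqoop import,
    back-fills week/30-day/month rankings from daily or weekly data, and appends
    search terms that are new in the current period after the existing ranking."""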

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info2 = date_info
        self.db_save = 'ods_brand_analytics'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # placeholder DataFrames; populated in read_data()/handle_data()
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_current = self.spark.sql("select 1+1;")
        self.df_st_rank = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_num = 1
        self.reset_partitions(partitions_num=self.partitions_num)
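        # partition columns used when the cleaned result is written back to Hive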
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.get_year_week_tuple()
        if self.date_type in ['4_week', "last30day"]:
            print(f"date_type={self.date_type}, no import needed")
        else:
            self.handle_st_import()
        self.get_date_info_tuple()

    def read_data(self):
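        # df_st: the day/week rows inside the date window; df_st_current: rows already
        # stored for the target date_type/date_info (used to detect new search terms)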
        if (self.date_type == 'week' and self.date_info >= '2023-21') or self.date_type == 'month_week':
            # Weekly search-term rankings contain massive duplicates from week 2023-21 on;
            # handle_us_week_rank() decides dynamically whether to re-rank by id, so nothing is read here.
            pass
        else:
            if self.date_type == '4_week':
                params1 = f"date_type='week' and date_info in {self.year_week_tuple} and rank <= 1500000"
                params2 = " limit 0"
            elif self.date_type == 'week_old':
                # the old-style weekly table is used as imported; exit right after the import
                quit()
            elif self.date_type in ['month_old']:
                params1 = f"date_type='week_old' and date_info in {self.year_week_tuple} and rank <= 1500000"
                params2 = ""
            elif self.date_type in ['month']:
                params1 = f"date_type='week' and date_info in {self.year_week_tuple} and rank <= 1500000"
                params2 = ""
            else:
                params1 = f"date_type='day' and date_info in {self.date_info_tuple}"
                params2 = ""
                if self.date_type == "last30day":
                    params2 = " limit 0"
            print("1.1 读取ods_brand_analytics表")
            # sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
            #       f"and date_type='day' and date_info in {self.date_info_tuple};"
            sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
                  f"and {params1};"
            print("sql:", sql)
            self.df_st = self.spark.sql(sql).cache()
            self.df_st.show(10, truncate=False)
            # if self.df_st.count() == 0:
            #     quit()  # 此处停止会中断程序

            # print("self.df_st:", self.df_st.drop_duplicates(['search_term']).count())
            print("1.2 读取ods_brand_analytics表")
            sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
                  f"and date_type='{self.date_type}' and date_info = '{self.date_info}' {params2};"
            print("sql:", sql)
            self.df_st_current = self.spark.sql(sql).cache()
            self.df_st_current.show(10, truncate=False)

    def handle_us_week_rank(self, year_week='2023-46'):
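        """Repair the duplicated weekly rankings seen from week 2023-21 on: when
        max(rank) is well below the row count the ranks are duplicated, so the HDFS
        partition is dropped and rank is rebuilt from the id insertion order."""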
        if self.date_type == 'month_week':
            sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' and date_type='week' and date_info = '{year_week}';"
        else:
            sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sql).cache()
        # stage the rows in a temp view, then release the cache so the source partition can be overwritten
        self.df_st.createOrReplaceTempView("temp_table")
        self.df_st.unpersist()
        self.df_save = self.spark.sql("select * from temp_table").cache()
        self.df_save.show(10, truncate=False)
        st_count = self.df_save.count()
        if self.date_type != 'month_week':
            st_max = self.df_save.agg({"rank": "max"}).collect()[0][0]
            rate = st_max / st_count
            if rate >= 0.95:
                # max(rank) is close to the row count: ranks are essentially unique, nothing to repair
                print("st_count, st_max, rate:", st_count, st_max, rate)
                quit()
        # ranks are heavily duplicated: drop the HDFS partition and rebuild rank from the id order
        if self.date_type == 'month_week':
            hdf_cmd = f"hdfs dfs -rm -f /home/big_data_selection/ods/ods_brand_analytics/site_name={self.site_name}/date_type=week/date_info={year_week}/*"
        else:
            hdf_cmd = f"hdfs dfs -rm -f /home/big_data_selection/ods/ods_brand_analytics/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}/*"
        print("hdf_cmd:", hdf_cmd)
        os.system(hdf_cmd)
        window = Window.orderBy(
            self.df_save.id.asc()
        )
        self.df_save = self.df_save.withColumn("rank", F.row_number().over(window=window))
        if self.date_type == 'month_week':
            # write the rebuilt weekly partition back right away; the plain week path is saved by the caller
            self.df_save = self.df_save.withColumn("date_type", F.lit('week'))
            self.df_save = self.df_save.withColumn("date_info", F.lit(year_week))
            self.df_save.show(10, truncate=False)
            self.save_data()

    def handle_data(self):
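        # week/month_week go through the duplicate-rank repair path; every other
        # date_type back-fills rankings from the rows collected in read_data()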
        if self.date_type in ['week', 'month_week']:
            if self.date_type == 'month_week':
                # rebuild each underlying weekly partition first
                for year_week in self.year_week_tuple:
                    self.handle_us_week_rank(year_week=year_week)
                # then assemble the month_week period from the rebuilt weeks
                sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple} and rank <= 1500000;"
                print("sql:", sql)
                self.df_st = self.spark.sql(sql).cache()
                self.df_st.show(10, truncate=False)
                # stage the rows in a temp view, then release the cache so the partition can be overwritten
                self.df_st.createOrReplaceTempView("temp_table")
                self.df_st.unpersist()
                self.df_save = self.spark.sql("select * from temp_table").cache()
                sql = f"select * from ods_brand_analytics where site_name='{self.site_name}' " \
                      f"and date_type='{self.date_type}' and date_info = '{self.date_info}';"
                print("sql:", sql)
                self.df_st_current = self.spark.sql(sql).cache()
                self.df_st_current.show(10, truncate=False)

                self.handle_st_rank()
                self.handle_st_duplicated()
                self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
                self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info2))
                self.df_save.show(10, truncate=False)
                self.save_data()
                quit()
            else:
                self.handle_us_week_rank()
        else:
            self.handle_st_rank()
            self.handle_st_duplicated()
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info2))
        self.df_save.show(10, truncate=False)

    def handle_st_import(self):
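        # shell out to the sqoop import script for the target partition; month and
        # month_week also import every underlying week they are built from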
        print(f"导入关键词数据: {self.site_name}, {self.date_type}, {self.date_info}")
        if self.date_type in ['month_week', 'month']:
            # if self.date_type == 'month':
            os.system(f"/mnt/run_shell/sqoop_shell/import/ods_brand_analytics.sh {self.site_name} {self.date_type} {self.date_info}")
            for year_week in self.year_week_tuple:
                os.system(f"/mnt/run_shell/sqoop_shell/import/ods_brand_analytics.sh {self.site_name} week {year_week}")
        else:
            os.system(f"/mnt/run_shell/sqoop_shell/import/ods_brand_analytics.sh {self.site_name} {self.date_type} {self.date_info}")

    def handle_st_rank_old(self):
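        # legacy variant of handle_st_rank: it only filters out the search terms
        # already present in the current period, without back-filling a rank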
        self.df_st_rank = self.df_st.select("search_term", "rank", "date_info")

        self.df_st_current = self.df_st_current.withColumn("flag", F.lit(1))
        self.df_st_rank = self.df_st_rank.join(
            self.df_st_current.select("search_term", "flag"), on='search_term', how='left'
        )
        self.df_st_rank = self.df_st_rank.filter("flag is null")
        self.df_st_rank.show(10, truncate=False)

    def handle_st_rank(self):
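        """Back-fill ranks for search terms that appear in the window but are missing
        from the current period. Worked example with hypothetical numbers: given two
        dates A (100 rows) and B (120 rows), a term ranked 50 on A only gets its B cell
        filled with 120 after the pivot, so rank_sum = 50 + 120; averaging and
        row_number() then order the missing terms, and the result is appended after the
        current period's max rank."""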
        self.df_st_rank = self.df_st.select("search_term", "rank", "date_info")
        self.df_st_current = self.df_st_current.withColumn("flag", F.lit(1))
        # keep only the search terms that are missing from the current period
        self.df_st_rank = self.df_st_rank.join(
            self.df_st_current.select("search_term", "flag"), on='search_term', how='left'
        )
        self.df_st_rank = self.df_st_rank.filter("flag is null")
        self.df_st_rank.show(10, truncate=False)
        # row count per date_info, used as the fill value for dates where a term is missing
        df_count = self.df_st.groupby(['date_info']).count()
        df_count = df_count.toPandas()
        date_dict = {date_info: count for date_info, count in zip(df_count.date_info, df_count['count'])}
        print("date_dict:", date_dict)
        # one column per date_info holding that date's mean rank for the term
        self.df_st_rank = self.df_st_rank.groupby(['search_term']). \
            pivot("date_info").agg(F.mean("rank"))
        self.df_st_rank.show(10, truncate=False)
        # a missing date counts as "worse than last place" on that date
        self.df_st_rank = self.df_st_rank.fillna(date_dict)
        self.df_st_rank = self.df_st_rank.withColumn("rank_sum", F.lit(0))
        for col in date_dict.keys():
            print("col:", col)
            self.df_st_rank = self.df_st_rank.withColumn(
                "rank_sum", self.df_st_rank.rank_sum + self.df_st_rank[col]
            )
        # the divisor is a positive constant, so it does not change the ordering produced below
        self.df_st_rank = self.df_st_rank.withColumn(
            "rank_sum_avg", self.df_st_rank.rank_sum / len(self.date_info_tuple)
        )
        self.df_st_rank.show(10, truncate=False)
        window = Window.orderBy(
            self.df_st_rank.rank_sum_avg.asc()
        )
        self.df_st_rank = self.df_st_rank.withColumn("rank_avg", F.row_number().over(window=window))
        self.df_st_rank = self.df_st_rank.drop("rank_sum", "rank_sum_avg")
        self.df_st_rank.show(10, truncate=False)
        for col in date_dict.keys():
            self.df_st_rank = self.df_st_rank.drop(col)
        self.df_st_rank = self.df_st_rank.withColumnRenamed("rank_avg", "rank")
        # append the back-filled terms after the current period's highest rank
        df_max_rank = self.df_st_current.agg(F.max('rank').alias("max_rank"))
        df_max_rank.show(10, truncate=False)
        df_max_rank = df_max_rank.toPandas()
        max_rank = list(df_max_rank.max_rank)[0] if self.date_type not in ['4_week', 'last30day'] else 0
        max_rank = max_rank if self.df_st_current.count() != 0 else 0
        if self.date_type == 'last30day':
            self.df_st_rank = self.df_st_rank.fillna({'rank': 0})
        self.df_st_rank = self.df_st_rank.withColumn("rank", self.df_st_rank.rank + F.lit(max_rank))
        self.df_st_rank.show(10, truncate=False)

    def handle_st_duplicated(self):
        # keep each search term's most recent row inside the window, then attach the back-filled rank
        window = Window.partitionBy(['search_term']).orderBy(
            self.df_st.date_info.desc()
        )
        self.df_st = self.df_st.withColumn("rank_top", F.row_number().over(window))
        self.df_st = self.df_st.filter("rank_top=1")
        self.df_st = self.df_st.drop("rank_top", "rank")
        self.df_save = self.df_st_rank.join(
            self.df_st, on='search_term', how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: period type: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: yyyy-mm-dd / yyyy-ww / yyyy-mm / yyyy-Q, e.g. 2022-1
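    # example invocation: python ods_brand_analytics.py us month 2022-01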
    handle_obj = OdsBrandAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()