"""
1. 计算上升词,热搜词,新出词
2. quantity_being_sold在售商品数
"""

import os
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import IntegerType
# window functions for group-by ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwdStInfo(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info_last = str()
        self.date_info_last2 = str()
        self.db_save = f'dwd_st_info'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
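        # placeholder DataFrames ("select 1+1"), re-assigned later in read_data() and the handle_* methods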
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_st_info = self.spark.sql(f"select 1+1;")
        self.df_repeat = self.spark.sql(f"select 1+1;")
        self.df_st_asin_title = self.spark.sql(f"select 1+1;")
        self.df_st_info_current = self.spark.sql(f"select 1+1;")
        self.df_st_info_last = self.spark.sql(f"select 1+1;")  # previous week/month/quarter
        self.df_st_info_last2 = self.spark.sql(f"select 1+1;")  # week/month/quarter before last
        self.df_st_info_duplicated = self.spark.sql(f"select 1+1;")  # periods other than the current week/month/quarter
        self.df_st_zr_page1_counts = self.spark.sql(f"select 1+1;")
        self.date_info_tuple = tuple()
        self.u_is_first = self.spark.udf.register("u_is_first", self.udf_is_first, IntegerType())
        self.u_is_ascending = self.spark.udf.register("u_is_ascending", self.udf_is_ascending, IntegerType())
        self.u_is_search = self.spark.udf.register("u_is_search", self.udf_is_search, IntegerType())
        self.u_is_title_appear = self.spark.udf.register("u_is_title_appear", self.udf_is_title_appear, IntegerType())
        self.reset_partitions(partitions_num=3)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.get_date_info_tuple()

    @staticmethod
    def udf_is_first(x):
        """针对flag字段判断是否为当前周新出的关键词"""
        if x:
            return 0
        else:
            return 1

    @staticmethod
    def udf_is_ascending(x):
        if x >= 0.5:
            return 1
        else:
            return 0

    @staticmethod
    def udf_is_search(x):
        if x >= 0.8:
            return 1
        else:
            return 0

    @staticmethod
    def udf_is_title_appear(search_term, title):
        # guard against null search terms / titles coming out of the join
        if not search_term or not title:
            return 0
        if search_term.lower() in title.lower():
            return 1
        else:
            return 0

    def read_data(self):
        print("1.1 读取dim_st_info表")
        if self.date_type == '4_week':
            sql = f"select * from dim_st_info where (site_name='{self.site_name}' and date_type='month' and date_info in {self.date_info_tuple}) or " \
                  f"(site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}');"
        else:
            sql = f"select * from dim_st_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in {self.date_info_tuple};"
        print("sql:", sql)
        self.df_st_info = self.spark.sql(sql).cache()
        self.df_st_info.show(10)
        print("1.2 读取ods_rank_search_rate_repeat表")
        sql = f"select rank as st_rank_avg, search_num as st_search_sum, rate as st_search_rate, search_sum as st_search_sum " \
              f"from ods_rank_search_rate_repeat where site_name='{self.site_name}';"
        self.df_repeat = self.spark.sql(sql).cache()
        self.df_repeat.show(10)
        print("1.3 读取ods_search_term_rank_zr和ods_asin_detail表")
        # sql = f"select * from dim_st_asin_base_info left join" \
        #       f"where site_name='{self.site_name}' and dt in {self.year_week_tuple} and data_type='zr' and page=1 ;"
        # self.df_st_asin_base_info = self.spark.sql(sql).cache()
        # self.df_st_asin_base_info.show(10)
        sql = f"""
        SELECT a.search_term, a.asin, b.title  FROM ods_search_term_rank_zr a left join 
        ods_asin_detail b on a.asin=b.asin and a.dt=b.dt where a.site_name ='{self.site_name}' and b.site_name ='{self.site_name}' 
        and a.page=1 and a.dt in {self.year_week_tuple} and b.dt in {self.year_week_tuple}"""
        self.df_st_asin_title = self.spark.sql(sql).cache()
        self.df_st_asin_title = self.df_st_asin_title.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_title.show(10)

    def handle_data(self):
        self.handle_st_first()
        self.handle_st_ascending()
        self.handle_st_search()
        self.handle_st_asin_title()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10)

    def get_date_info_tuple(self):
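        # build date_info_tuple: all historical periods (weeks/months/quarters) from week 2020-44 up to the
        # current date_info (for date_type='4_week' the most recent weeks are excluded via id_current_max - 5)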
        df_week_start = self.df_date.loc[(self.df_date.year_week == '2020-44')]
        id_start = list(df_week_start.id)[0] if list(df_week_start.id) else 0
        if self.date_type in ['week', '4_week']:
            df_week_current = self.df_date.loc[self.df_date.year_week == self.date_info]
        elif self.date_type == 'month':
            df_week_current = self.df_date.loc[self.df_date.year_month == self.date_info]
        elif self.date_type == 'quarter':
            df_week_current = self.df_date.loc[self.df_date.year_quarter == self.date_info]
        else:
            print("date_type输入错误, 退出")
            df_week_current = pd.DataFrame()
        id_current_max = max(list(df_week_current.id)) if list(df_week_current.id) else 0
        df_week_all = self.df_date.loc[(self.df_date.id >= id_start) & (self.df_date.id <= id_current_max)]
        if self.date_type == 'week':
            self.date_info_tuple = tuple(df_week_all.year_week)
        if self.date_type == "4_week":
            df_week_all = self.df_date.loc[(self.df_date.id >= id_start) & (self.df_date.id <= id_current_max - 5)]
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        if self.date_type == 'month':
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        if self.date_type == 'quarter':
            self.date_info_tuple = tuple(set(df_week_all.year_quarter))

    def handle_st_first(self):
        print("新出词(当前周/4周/月/季度,第1次出现)")
        # 匹配上周/月/季度
        df_week_current = self.df_date.loc[(self.df_date.year_week == self.date_info)]
        id_current = list(df_week_current.id)[0] if list(df_week_current.id) else 0
        id_last = id_current - 1
        id_last2 = id_current - 2
        df_week_last = self.df_date.loc[self.df_date.id == id_last]
        df_week_last2 = self.df_date.loc[self.df_date.id == id_last2]
        self.date_info_last = list(df_week_last.year_week)[0] if list(df_week_last.year_week) else ''
        self.date_info_last2 = list(df_week_last2.year_week)[0] if list(df_week_last2.year_week) else ''
        self.df_st_info_current = self.df_st_info.filter(f"date_info='{self.date_info}'")
        self.df_st_info_last = self.df_st_info.filter(f"date_info='{self.date_info_last}'").select("search_term", "st_rank_avg").withColumnRenamed("st_rank_avg", "st_rank_avg_last")
        self.df_st_info_last2 = self.df_st_info.filter(f"date_info='{self.date_info_last2}'")
        self.df_st_info_duplicated = self.df_st_info.filter(f"date_info!='{self.date_info}'")
        self.df_st_info_duplicated = self.df_st_info_duplicated.select('search_term').dropDuplicates(
            ['search_term']).withColumn("st_is_first_text", F.lit(0))
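        # anti-join style check: left-join the current period's terms against terms seen in any earlier period;
        # terms with no earlier occurrence stay null here and are flagged st_is_first_text=1 by the fillna below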
        self.df_st_info_current = self.df_st_info_current.join(
            self.df_st_info_duplicated, on='search_term', how='left'
        )
        self.df_st_info_current = self.df_st_info_current.fillna(
            {"st_is_first_text": 1}
        )
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_ascending(self):
        print("上升词(相邻2周/月/季度,上升超过50%的排名)")
        self.df_st_info_current = self.df_st_info_current.join(
            self.df_st_info_last, on='search_term', how='left'
        )
        self.df_st_info_current = self.df_st_info_current.na.fill({'st_rank_avg_last': 0})
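        # st_is_ascending_text_rate = (last_rank - current_rank) / last_rank; positive values mean the rank improved.
        # Terms without a previous rank have last_rank filled with 0, the division returns null in Spark, and the
        # na.fill below maps that null to -1 so they are not counted as ascending.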
        self.df_st_info_current = self.df_st_info_current.withColumn(
            "st_is_ascending_text_rate",
            (self.df_st_info_current.st_rank_avg_last - self.df_st_info_current.st_rank_avg) / self.df_st_info_current.st_rank_avg_last)
        self.df_st_info_current = self.df_st_info_current.na.fill({'st_is_ascending_text_rate': -1})
        self.df_st_info_current = self.df_st_info_current.withColumn(
            "st_is_ascending_text", self.u_is_ascending(self.df_st_info_current.st_is_ascending_text_rate))
        # self.df_st_info_current.select("search_term", "st_rank", "st_rank_last", "st_is_ascending_text_rate", "st_is_ascending_text").show(10, truncate=False)
        self.df_st_info_current = self.df_st_info_current.drop("st_rank_avg_last")
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_search(self):
        print("热搜词(历史出现占比>=80%)")
        df_counts = self.df_st_info.groupby(['search_term']).agg(F.count_distinct("date_info").alias("st_week_appear_counts"))
        df_distinct = self.df_st_info.drop_duplicates(["date_info"])
        df_distinct.select("search_term", "date_info").show(20, truncate=False)
        self.df_st_info_current = self.df_st_info_current.join(
            df_counts, on='search_term', how='left'
        )
        self.df_st_info_current = self.df_st_info_current.withColumn("st_week_counts", F.lit(df_distinct.count()))
        self.df_st_info_current.show(20, truncate=False)
        self.df_st_info_current = self.df_st_info_current.withColumn(
            "st_is_search_text_rate",
            self.df_st_info_current["st_week_appear_counts"] / self.df_st_info_current["st_week_counts"])
        self.df_st_info_current = self.df_st_info_current.withColumn(
            "st_is_search_text", self.u_is_search(self.df_st_info_current.st_is_search_text_rate))
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_quantity(self):
        pass

    def handle_st_asin_title(self):
        # statistics are computed only over zr-type data on page 1
        self.df_st_asin_title = self.df_st_asin_title.withColumn(
            "st_asin_in_title_flag", self.u_is_title_appear(self.df_st_asin_title.search_term, self.df_st_asin_title.title)
        )
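        # per search term: count the page-1 zr ASINs and how many of those titles contain the term,
        # then compute st_zr_page1_in_title_rate as their ratio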
        df_st_zr_page1_counts = self.df_st_asin_title.groupby("search_term").count()
        df_st_zr_page1_counts = df_st_zr_page1_counts.withColumnRenamed("count", "st_zr_page1_counts")
        df_st_zr_page1_in_title_counts = self.df_st_asin_title.filter("st_asin_in_title_flag=1").groupby("search_term").count()
        df_st_zr_page1_in_title_counts = df_st_zr_page1_in_title_counts.withColumnRenamed("count", "st_zr_page1_in_title_counts")
        self.df_st_zr_page1_counts = df_st_zr_page1_counts.join(
            df_st_zr_page1_in_title_counts, on='search_term', how='left'
        )
        self.df_st_zr_page1_counts = self.df_st_zr_page1_counts.fillna(0)
        self.df_st_zr_page1_counts = self.df_st_zr_page1_counts.withColumn(
            "st_zr_page1_in_title_rate", self.df_st_zr_page1_counts.st_zr_page1_in_title_counts / self.df_st_zr_page1_counts.st_zr_page1_counts
        )
        self.df_st_zr_page1_counts.show(10, truncate=False)
        self.df_save = self.df_st_info_current.join(
            self.df_st_zr_page1_counts, on='search_term', how='left'
        )

    def handle_data_st_sold(self):
        print("在售商品数")
        if self.year >= 2022:
            df_quantity = self.df_brand_week.filter("quantity_being_sold>0").select("search_term", "quantity_being_sold")
            df_quantity = df_quantity.groupby(['search_term']).agg({"quantity_being_sold": "mean"})
            df_quantity = df_quantity.withColumnRenamed("avg(quantity_being_sold)", "st_quantity_being_sold")
            df_quantity = df_quantity.withColumn("st_quantity_being_sold", F.ceil(df_quantity.st_quantity_being_sold))  # round up
            self.df_brand_current = self.df_brand_current.join(df_quantity, on='search_term', how='left')
            self.df_brand_current = self.df_brand_current.fillna({"st_quantity_being_sold": 0})
        else:
            self.df_brand_current = self.df_brand_current.withColumn("st_quantity_being_sold", F.lit(0))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
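    # example invocation (assumed to be run directly or via spark-submit):
    #   python dwd_st_info.py us week 2022-1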
    handle_obj = DwdStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()