# dwt_st_asin_reverse.py
import os
import sys

from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the module search path
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
# window functions for grouped ranking / dedup
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType


class DwTStAsinReverse(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f"dwt_st_asin_reverse"
        self.spark = self.create_spark_object(app_name=f"{self.db_save}, {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        self.get_year_week_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        # placeholder DataFrames; each is replaced with real data in read_data()
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_st_asin_flow = self.spark.sql("select 1+1;")
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_measure = self.spark.sql("select 1+1;")
        self.df_st_key = self.spark.sql("select 1+1;")
        self.df_st_brand_label = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_save_std = self.spark.sql(f"select * from {self.db_save} limit 0;")
        self.u_st_type = self.spark.udf.register('u_st_type', self.udf_st_type, StringType())
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        if self.date_type in ["week"]:
            self.reset_partitions(400)
        else:
            self.reset_partitions(1000)

    @staticmethod
    def udf_st_type(st_asin_zr_rate, zr_page1_flag, st_search_num, st_click_share_sum, st_conversion_share_sum):
        # treat nulls from the upstream left joins as 0 before comparing
        st_asin_zr_rate = st_asin_zr_rate or 0
        zr_page1_flag = zr_page1_flag or 0
        st_search_num = st_search_num or 0
        st_click_share_sum = st_click_share_sum or 0
        st_conversion_share_sum = st_conversion_share_sum or 0
        st_type_list = []
        if st_asin_zr_rate >= 0.05:
            st_type_list.append('1')  # primary traffic term
        if zr_page1_flag == 1:
            if st_search_num < 10000:
                st_type_list.append('2')  # precise long-tail term
            else:
                st_type_list.append('3')  # precise traffic term
        if st_click_share_sum > 0:
            if (st_conversion_share_sum - st_click_share_sum) / st_click_share_sum >= 0.2:
                st_type_list.append('4')  # high-converting term
            else:
                st_type_list.append('5')  # stable-converting term
            if (st_click_share_sum - st_conversion_share_sum) / st_click_share_sum >= 0.2:
                st_type_list.append('6')  # conversion-losing term
        if st_conversion_share_sum > 0:
            st_type_list.append('7')  # order-generating term
        if st_click_share_sum > 0 and st_conversion_share_sum == 0:
            st_type_list.append('8')  # ineffective-exposure term
        return ",".join(st_type_list) if st_type_list else ''

    def read_data(self):
        print("1 读取st+asin两个维度: dim_st_asin_info表和ods_rank_flow表")
        print("1.1 读取dim_st_asin_info表")
        # if (int(self.year) == 2022 and int(self.month) < 10) or int(self.year) <= 2021:
        #     sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        # else:
        #     sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"  #  测试: and date_info>='2023-01-19'
        # e.g. us monthly data from 2023-10 onward is read directly; earlier
        # periods fall back to the weekly partitions in year_week_tuple
        if self.date_type in ['month', 'month_week'] and ((self.site_name == 'us' and self.date_info >= '2023-10') or (self.site_name in ['uk', 'de'] and self.date_info >= '2024-05')):
            sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}'"
        else:
            sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql)
        self.df_st_asin.persist(storageLevel=StorageLevel.MEMORY_ONLY)
        # self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin = self.df_st_asin.withColumnRenamed("updated_time", "updated_at")
        self.df_st_asin.show(10, truncate=False)
        print("1.2 读取ods_rank_flow表")
        # sql = f"select rank as page_rank, flow from ods_rank_flow " \
        #       f"where site_name='{self.site_name}'"
        sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_st_asin_flow = self.spark.sql(sql).cache()
        self.df_st_asin_flow.show(10, truncate=False)
        print("1.3 读取dim_st_detail表")
        sql = f"select search_term, st_rank, st_search_num, st_search_rate, st_search_sum, " \
              f"st_quantity_being_sold, st_click_share_sum, st_conversion_share_sum from dim_st_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sql).cache()
        self.df_st = self.df_st.fillna(0)
        self.df_st.show(10, truncate=False)
        print("1.4 读取dwd_st_measure表")
        sql = f"select search_term, st_adv_counts, st_ao_val, st_zr_page1_title_appear_rate as zr_page1_flag from dwd_st_measure " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st_measure = self.spark.sql(sql).cache()
        self.df_st_measure.show(10, truncate=False)
        print("1.5 读取ods_st_key表")
        sql = f"select st_key, search_term from ods_st_key " \
              f"where site_name='{self.site_name}';"
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sql).cache()
        self.df_st_key.show(10, truncate=False)
        print("1.6 读取dws_st_brand_info表")
        sql = f"select search_term, st_brand_label from dws_st_brand_info " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st_brand_label = self.spark.sql(sql).cache()
        self.df_st_brand_label.show(10, truncate=False)

    def handle_data(self):
        self.handle_st_duplicated()
        self.handle_st_asin_pivot()
        self.handle_st_asin_orders()
        self.handle_st_type()
        self.handle_st_dtypes()
        # self.handle_st_current_page_asin_counts()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_save.drop("zr_page1_flag", "st_click_share_sum", "st_conversion_share_sum")
        self.df_save = self.df_save_std.unionByName(self.df_save, allowMissingColumns=True)
        # self.df_save.show(20, truncate=False)
        print("cols:", self.df_save.columns)
        # quit()

    def handle_st_duplicated(self):
        print("2.2 根据search_term,asin,data_type进行去重, page_rank选择最小值")
        window = Window.partitionBy(['search_term', 'asin', 'data_type']).orderBy(
            self.df_st_asin.page_rank.asc(),
            self.df_st_asin.date_info.desc(),
        )
        self.df_st_asin = self.df_st_asin. \
            withColumn("page_rank_top", F.row_number().over(window=window))
        # print("self.df_st_asin_info, 开窗去重前:", self.df_st_asin_info.count())
        self.df_st_asin = self.df_st_asin.filter("page_rank_top=1")
        # print("self.df_st_asin_info, 开窗去重后:", self.df_st_asin_info.count())
        self.df_st_asin = self.df_st_asin.cache()
        # self.df_st_asin = self.df_st_asin.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
        # self.df_st_asin.show(10, truncate=False)
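    # Toy illustration of the dedup above (hypothetical rows within one
    # search_term/asin/data_type group):
    #   page_rank=1, date_info='2022-29' -> page_rank_top=1 (kept)
    #   page_rank=3, date_info='2022-30' -> page_rank_top=2 (dropped)
    # row_number() ranks by ascending page_rank, breaking ties with the most
    # recent date_info.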

    def handle_st_asin_pivot(self):
        print(f"2.3 根据search_term和asin进行透视表")
        self.df_st_asin = self.df_st_asin. \
            withColumn("updated_at_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_updated_at"))). \
            withColumn("page_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page"))). \
            withColumn("page_row_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_row"))). \
            withColumn("page_rank_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank")))
        df1 = self.df_st_asin.select("search_term", "asin", "updated_at_data_type", "updated_at"). \
            withColumnRenamed("updated_at_data_type", "pivot_key"). \
            withColumnRenamed("updated_at", "pivot_value")
        df2 = self.df_st_asin.select("search_term", "asin", "page_data_type", "page")
        # page_row and page_rank exist only for the zr and sp data types
        self.df_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')")
        df3 = self.df_st_asin.select("search_term", "asin", "page_row_data_type", "page_row")
        df4 = self.df_st_asin.select("search_term", "asin", "page_rank_data_type", "page_rank")
        # union() matches columns by position: every frame is
        # (search_term, asin, <pivot key>, <pivot value>)
        self.df_save = df1.union(df2).union(df3).union(df4)
        df_st_zr_counts = self.df_st_asin.filter("data_type='zr'").groupby(["search_term", "page"]).agg(
            F.max('page_row').alias("st_zr_current_page_asin_counts"))
        df_st_sp_counts = self.df_st_asin.filter("data_type='sp'").groupby(["search_term", "page"]).agg(
            F.max('page_row').alias("st_sp_current_page_asin_counts"))
        df_st_zr_counts = df_st_zr_counts.withColumnRenamed("page", "st_asin_zr_page")
        df_st_sp_counts = df_st_sp_counts.withColumnRenamed("page", "st_asin_sp_page")
        self.df_save = self.df_save.groupby(["search_term", "asin"]). \
            pivot("pivot_key").agg(F.min("pivot_value")). \
            join(self.df_st_asin_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"), on=["st_asin_zr_page_rank"], how="left"). \
            join(self.df_st_asin_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"), on=["st_asin_sp_page_rank"], how="left"). \
            join(self.df_st_measure, on=["search_term"], how="left"). \
            join(self.df_st_key, on=["search_term"], how="left"). \
            join(self.df_st_brand_label, on=["search_term"], how="left"). \
            join(self.df_st, on=["search_term"], how="inner").join(
            df_st_zr_counts, on=["search_term", "st_asin_zr_page"], how='left'
        ).join(df_st_sp_counts, on=["search_term", "st_asin_sp_page"], how='left')

        # join(self.df_st_measure, on=["search_term"], how="inner"). \
        #     join(self.df_st_key, on=["search_term"], how="inner"). \

        self.df_save = self.df_save.fillna(
            {
                "st_asin_zr_rate": 0,
                "st_asin_sp_rate": 0
            }
        )
        # drop the Python reference; Spark keeps the cached data until eviction
        del self.df_st_asin
        self.df_save.persist(storageLevel=StorageLevel.MEMORY_ONLY)
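    # Sketch of the melt-then-pivot shape used above (hypothetical rows):
    #   search_term  asin         pivot_key              pivot_value
    #   'mug'        'B0XXXXXXXX' 'st_asin_zr_page'      '1'
    #   'mug'        'B0XXXXXXXX' 'st_asin_sp_page_rank' '12'
    # pivot("pivot_key") turns each distinct key into its own column, so a
    # (search_term, asin) pair collapses to one wide row, with nulls for the
    # keys that pair never produced.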

    def handle_st_asin_orders(self):
        print("2.4 计算zr, sp预估销量")
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_orders", F.ceil(self.df_save.st_asin_zr_rate * self.df_save.st_search_sum)
        ).withColumn(
            "st_asin_sp_orders", F.ceil(self.df_save.st_asin_sp_rate * self.df_save.st_search_sum)
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_orders", self.df_save.st_asin_zr_orders
        ).withColumn(
            "asin_st_sp_orders", self.df_save.st_asin_sp_orders
        )
        df_asin_st_zr_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_zr_orders": "sum"})
        df_asin_st_sp_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_sp_orders": "sum"})
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "asin_st_zr_orders_sum")
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "asin_st_sp_orders_sum")
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))

        df_st_asin_zr_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_zr_orders": "sum"})
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "st_asin_zr_orders_sum")
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_st_asin_sp_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_sp_orders": "sum"})
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "st_asin_sp_orders_sum")
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))
        self.df_save = self.df_save.withColumn("is_zr_flag", F.when(self.df_save.st_asin_zr_page > 0, 1))
        self.df_save = self.df_save.withColumn("is_sp_flag", F.when(self.df_save.st_asin_sp_page > 0, 1))
        self.df_save = self.df_save. \
            join(df_asin_st_zr_orders_sum, on=['asin', "is_zr_flag"], how='left'). \
            join(df_asin_st_sp_orders_sum, on=['asin', "is_sp_flag"], how='left'). \
            join(df_st_asin_zr_orders_sum, on=['search_term', "is_zr_flag"], how='left'). \
            join(df_st_asin_sp_orders_sum, on=['search_term', "is_sp_flag"], how='left')
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_flow", F.round(self.df_save.st_asin_zr_orders / self.df_save.st_asin_zr_orders_sum, 4)
        )
        self.df_save = self.df_save.withColumn(
            "st_asin_sp_flow", F.round(self.df_save.st_asin_sp_orders / self.df_save.st_asin_sp_orders_sum, 4)
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_flow", F.round(self.df_save.asin_st_zr_orders / self.df_save.asin_st_zr_orders_sum, 4)
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_sp_flow", F.round(self.df_save.asin_st_sp_orders / self.df_save.asin_st_sp_orders_sum, 4)
        )
        self.df_save = self.df_save.drop("is_zr_flag", "is_sp_flag")
        print("self.df_save.columns:", self.df_save.columns)

    def handle_st_type(self):
        print("2.5 根据search_term,asin等信息进行计算关键词的分类情况")
        self.df_save = self.df_save.withColumn(
            "st_type", self.u_st_type(
                "st_asin_zr_rate", "zr_page1_flag", "st_search_num", "st_click_share_sum", "st_conversion_share_sum"
            )
        )

    def handle_st_dtypes(self):
        print("2.5 更改pivot之后的列的数据类型, 保持和hive的数据类型一致")
        for col in self.df_save.columns:
            if ("_page" in col) or ("_page_row" in col) or ("_page_rank" in col):
                print("col:", col)
                self.df_save = self.df_save.withColumn(col, self.df_save[f'{col}'].cast("int"))

    # unused: this logic is now inlined in handle_st_asin_pivot (its call in
    # handle_data is commented out)
    def handle_st_current_page_asin_counts(self):
        df_st_zr_counts = self.df_st_asin.filter("data_type='zr'").groupby(["search_term", "page"]).agg(F.max('page_row').alias("st_zr_current_page_asin_counts"))
        df_st_sp_counts = self.df_st_asin.filter("data_type='sp'").groupby(["search_term", "page"]).agg(F.max('page_row').alias("st_sp_current_page_asin_counts"))
        df_st_zr_counts = df_st_zr_counts.withColumnRenamed("page", "st_asin_zr_page")
        df_st_sp_counts = df_st_sp_counts.withColumnRenamed("page", "st_asin_sp_page")
        self.df_save = self.df_save.join(
            df_st_zr_counts, on=["search_term", "st_asin_zr_page"], how='left'
        ).join(
            df_st_sp_counts, on=["search_term", "st_asin_sp_page"], how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwTStAsinReverse(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
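    # Example invocation (hypothetical; cluster-specific spark-submit flags omitted):
    #   spark-submit dwt_st_asin_reverse.py us month 2023-10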