"""
author: 方星钧(ffman)
description: 基于dim_st_asin_base_info等表,计算出search_term和asin维度的基础信息表(包括预估销量)
table_read_name: dim_st_asin_info, ods_rank_flow
table_save_name: dwd_st_asin_info
table_save_level: dwd
version: 3.0
created_date: 2022-05-12
updated_date: 2022-12-15
"""

import os
import sys

from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
#from AmazonSpider.pyspark_job.utils.templates import Templates
# UDFs and window functions for group-wise ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType


class DwdStAsinInfo(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwd_st_asin_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # placeholder DataFrames, populated in read_data()/handle_data()
        self.df_save = self.spark.sql("select 1+1;")
        # self.df_save_std = self.spark.sql(f"select * from {self.db_save} limit 0;")
        self.df_st_asin = self.spark.sql("select 1+1;")
        self.df_rank_flow = self.spark.sql("select 1+1;")
        self.df_st = self.spark.sql("select 1+1;")
        self.df_zr_page1_counts = self.spark.sql("select 1+1;")
        self.week_counts = 1 if self.date_type == 'week' else len(self.year_week_tuple)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
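        # presumably: wider date windows aggregate more data, hence more partitions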
        if self.date_type in ["week"]:
            self.reset_partitions(200)
        elif self.date_type in ["month", "4_week"]:
            self.reset_partitions(400)
        elif self.date_type in ["quarter"]:
            self.reset_partitions(600)
        self.u_week_counts_flag = self.spark.udf.register('u_week_counts_flag', self.udf_week_counts_flag, IntegerType())
        self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())
        self.u_st_type = self.spark.udf.register('u_st_type', self.udf_st_type, StringType())

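    # Returns 1 when zr_page1_counts equals week_counts, i.e. the ASIN appeared
    # on zr page 1 in every selected week; otherwise returns 0.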
    @staticmethod
    def udf_week_counts_flag(zr_page1_counts, week_counts):
        if zr_page1_counts == week_counts:
            return 1
        else:
            return 0

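    # Zero-pads single-digit week numbers, e.g. "2022-5" -> "2022-05", presumably
    # so that string-sorted year-week values follow chronological order.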
    @staticmethod
    def udf_year_week(dt):
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"

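    # Classifies a search term for an ASIN and returns a comma-separated string
    # of type codes ('1'-'8'), one per matched rule below.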
    @staticmethod
    def udf_st_type(st_asin_zr_rate, zr_page1_flag, st_search_num, st_click_share1, st_conversion_share1, st_click_share2, st_conversion_share2, st_click_share3, st_conversion_share3):
        st_click_share_sum = st_click_share1 + st_click_share2 + st_click_share3
        st_conversion_share_sum = st_conversion_share1 + st_conversion_share2 + st_conversion_share3
        st_type_list = []
        if st_asin_zr_rate >= 0.05:
            st_type_list.append('1')  # primary traffic term
        if zr_page1_flag:
            if st_search_num < 10000:
                st_type_list.append('2')  # precise long-tail term
            else:
                st_type_list.append('3')  # precise traffic term
        if st_click_share_sum > 0:  # guard against division by zero (shares are fillna(0))
            if (st_conversion_share_sum - st_click_share_sum) / st_click_share_sum >= 0.2:
                st_type_list.append('4')  # high-conversion term
            else:
                st_type_list.append('5')  # stable-conversion term
            if (st_click_share_sum - st_conversion_share_sum) / st_click_share_sum >= 0.2:
                st_type_list.append('6')  # conversion-losing term
        if st_conversion_share_sum > 0:
            st_type_list.append('7')  # order-producing term
        if st_click_share_sum > 0 and st_conversion_share_sum == 0:
            st_type_list.append('8')  # ineffective-exposure term
        return ",".join(st_type_list) if st_type_list else ''

    def read_data(self):
        print("1.1 reading dim_st_asin_info")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin = self.df_st_asin.withColumnRenamed("updated_time", "updated_at")
        self.df_st_asin.show(10, truncate=False)
        print("1.2 reading ods_rank_flow")
        # the same rank/flow columns are aliased twice so they can be joined
        # separately on the zr and sp page ranks later
        sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_rank_flow = self.spark.sql(sql).cache()
        self.df_rank_flow.show(10, truncate=False)
        print("1.3 reading dim_st_detail")
        # sql = f"select search_term, st_rank, st_rank_avg, st_search_num, st_search_rate, st_search_sum from dim_st_info " \
        #       f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in {self.date_info_tuple};"
        sql = f"select search_term, st_rank, st_rank as st_rank_avg, st_search_num, st_search_rate, st_search_sum, " \
              f"st_click_share1, st_conversion_share1, st_click_share2, st_conversion_share2, st_click_share3, st_conversion_share3 from dim_st_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sql).cache()
        self.df_st = self.df_st.fillna(0)
        self.df_st.show(10, truncate=False)

    def handle_data(self):
        self.handle_st_zr_page1_counts()
        self.handle_st_duplicated()
        self.handle_st_asin_pivot()
        self.handle_st_asin_orders()
        self.handle_st_type()
        self.handle_st_dtypes()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save.show(20, truncate=False)

    def handle_st_zr_page1_counts(self):
        print("2.1 for the zr type, count the selected weeks in which each (search_term, asin) appeared on page=1; flag=1 if it appeared in every week, else 0")
        self.df_zr_page1_counts = self.df_st_asin.filter("data_type='zr' and page=1").\
            groupby(['search_term', 'asin']).\
            agg(F.count_distinct("date_info").alias("zr_page1_counts"))
        self.df_zr_page1_counts = self.df_zr_page1_counts.withColumn("week_counts", F.lit(self.week_counts))
        self.df_zr_page1_counts = self.df_zr_page1_counts.withColumn(
            "zr_page1_flag", self.u_week_counts_flag("zr_page1_counts", "week_counts")
        )
        # self.df_zr_page1_counts.show(10, truncate=False)

    def handle_st_duplicated(self):
        print("2.2 deduplicating by search_term, asin, data_type, keeping the smallest page_rank")
        window = Window.partitionBy(['search_term', 'asin', 'data_type']).orderBy(
            self.df_st_asin.page_rank.asc(),
            self.df_st_asin.date_info.desc(),
        )
        self.df_st_asin = self.df_st_asin. \
            withColumn("page_rank_top", F.row_number().over(window=window))
        # print("self.df_st_asin, before window dedup:", self.df_st_asin.count())
        self.df_st_asin = self.df_st_asin.filter("page_rank_top=1")
        # print("self.df_st_asin, after window dedup:", self.df_st_asin.count())
        self.df_st_asin = self.df_st_asin.cache()
        # self.df_st_asin.show(10, truncate=False)

    def handle_st_asin_pivot(self):
        print("2.3 pivoting by search_term and asin")
        self.df_st_asin = self.df_st_asin. \
            withColumn("updated_at_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_updated_at"))). \
            withColumn("page_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page"))). \
            withColumn("page_row_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_row"))). \
            withColumn("page_rank_data_type",
                       F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank")))
        # stack each metric into (pivot_key, pivot_value) pairs; union is positional,
        # so df2-df4 inherit df1's column names
        df1 = self.df_st_asin.select("search_term", "asin", "updated_at_data_type", "updated_at"). \
            withColumnRenamed("updated_at_data_type", "pivot_key"). \
            withColumnRenamed("updated_at", "pivot_value")
        df2 = self.df_st_asin.select("search_term", "asin", "page_data_type", "page")
        # page_row and page_rank only exist for the zr and sp data types
        self.df_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')")
        df3 = self.df_st_asin.select("search_term", "asin", "page_row_data_type", "page_row")
        df4 = self.df_st_asin.select("search_term", "asin", "page_rank_data_type", "page_rank")
        self.df_save = df1.union(df2).union(df3).union(df4)
        self.df_save = self.df_save.groupby(["search_term", "asin"]). \
            pivot("pivot_key").agg(F.min("pivot_value")). \
            join(self.df_zr_page1_counts, on=["search_term", "asin"], how="left"). \
            join(self.df_rank_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"), on=["st_asin_zr_page_rank"], how="left"). \
            join(self.df_rank_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"), on=["st_asin_sp_page_rank"], how="left"). \
            join(self.df_st, on=["search_term"], how="inner")  # ["search_term", "dt"]
        self.df_save = self.df_save.fillna(
            {
                "st_asin_zr_rate": 0,
                "st_asin_sp_rate": 0
            }
        )

    def handle_st_asin_orders(self):
        print("2.4 computing estimated zr and sp sales")
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_orders", F.ceil(self.df_save.st_asin_zr_rate * self.df_save.st_search_sum)
        ).withColumn(
            "st_asin_sp_orders", F.ceil(self.df_save.st_asin_sp_rate * self.df_save.st_search_sum)
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_orders", self.df_save.st_asin_zr_orders
        ).withColumn(
            "asin_st_sp_orders", self.df_save.st_asin_sp_orders
        )
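        # per-asin and per-search_term order totals, used below to derive each
        # row's share of traffic (the *_flow columns)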
        df_asin_st_zr_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_zr_orders": "sum"})
        df_asin_st_sp_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_sp_orders": "sum"})
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "asin_st_zr_orders_sum")
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "asin_st_sp_orders_sum")
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))

        df_st_asin_zr_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_zr_orders": "sum"})
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "st_asin_zr_orders_sum")
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_st_asin_sp_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_sp_orders": "sum"})
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "st_asin_sp_orders_sum")
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))
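        # F.when(...) without .otherwise() yields NULL when the condition fails;
        # NULL join keys never match, so ASINs/terms with no zr (or sp) appearance
        # keep NULL sums after the left joins below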
        self.df_save = self.df_save.withColumn("is_zr_flag", F.when(self.df_save.st_asin_zr_page > 0, 1))
        self.df_save = self.df_save.withColumn("is_sp_flag", F.when(self.df_save.st_asin_sp_page > 0, 1))
        self.df_save = self.df_save. \
            join(df_asin_st_zr_orders_sum, on=['asin', "is_zr_flag"], how='left'). \
            join(df_asin_st_sp_orders_sum, on=['asin', "is_sp_flag"], how='left'). \
            join(df_st_asin_zr_orders_sum, on=['search_term', "is_zr_flag"], how='left'). \
            join(df_st_asin_sp_orders_sum, on=['search_term', "is_sp_flag"], how='left')
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_flow", self.df_save.st_asin_zr_orders / self.df_save.st_asin_zr_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "st_asin_sp_flow", self.df_save.st_asin_sp_orders / self.df_save.st_asin_sp_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_flow", self.df_save.asin_st_zr_orders / self.df_save.asin_st_zr_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_sp_flow", self.df_save.asin_st_sp_orders / self.df_save.asin_st_sp_orders_sum
        )
        self.df_save = self.df_save.drop("is_zr_flag", "is_sp_flag")
        print("self.df_save.columns:", self.df_save.columns)
        # self.df_save.show(10, truncate=False)

    def handle_st_type(self):
        print("2.5 classifying each keyword based on search_term, asin and share metrics")
        self.df_save = self.df_save.withColumn(
            "st_type", self.u_st_type(
                "st_asin_zr_rate", "zr_page1_flag", "st_search_num", "st_click_share1", "st_conversion_share1",
                "st_click_share2", "st_conversion_share2", "st_click_share3", "st_conversion_share3"
            )
        )

    def handle_st_dtypes(self):
        print("2.6 casting the pivoted columns to match the Hive column types")
        for col in self.df_save.columns:
            if ("_page" in col) or ("_page_row" in col) or ("_page_rank" in col):
                print("col:", col)
                self.df_save = self.df_save.withColumn(col, self.df_save[col].cast("int"))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-01
    handle_obj = DwdStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()