# dwd_st_asin_measure_old.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwdStMeasure(Templates):
    """Spark job that builds dwd-layer measure tables for search terms and ASINs.

    For one (site_name, date_type, date_info) partition it reads dim/ods source
    tables, derives metrics at three granularities — (search_term, asin),
    search_term, and asin — such as zr/sp/sb counts, AO values, estimated
    zr/sp orders and BSR orders, then saves three output tables:
    dwd_st_asin_measure, dwd_st_measure and dwd_asin_measure, each partitioned
    by site_name/date_type/date_info.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        # site_name: site code, e.g. 'us'
        # date_type: one of day/week/4_week/month/quarter
        # date_info: concrete date partition, e.g. '2022-1'
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Names of the three output tables (st+asin, st and asin granularity).
        self.db_save_st_asin = f'dwd_st_asin_measure'
        self.db_save_st = f'dwd_st_measure'
        self.db_save_asin = f'dwd_asin_measure'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save_st_asin}, {self.db_save_st}, {self.db_save_asin}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas DataFrame object
        self.get_date_info_tuple()
        # Placeholder DataFrames; all of them are replaced in read_data()/handle_data().
        self.df_st_detail = self.spark.sql(f"select 1+1;")
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_st_asin_flow = self.spark.sql(f"select 1+1;")
        self.df_st_rate = self.spark.sql(f"select 1+1;")
        self.df_asin_history = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_st_quantity = self.spark.sql(f"select 1+1;")
        self.df_st_asin_duplicated = self.spark.sql(f"select 1+1;")
        self.df_save_st_asin = self.spark.sql(f"select 1+1;")
        self.df_save_asin = self.spark.sql(f"select 1+1;")
        self.df_save_st = self.spark.sql(f"select 1+1;")
        self.partitions_num = 3
        self.reset_partitions(partitions_num=self.partitions_num)
        # Hive partition columns shared by all three output tables.
        self.partitions_by = ['site_name', 'date_type', 'date_info']

    def read_data(self):
        """Load every source table for the current partition into cached DataFrames."""
        print("1.1 读取dim_st_detail表")
        sql = f"select search_term, st_rank from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in {self.date_info_tuple}"
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sqlQuery=sql).cache()
        # self.df_st_detail.show(10, truncate=False)
        print("1.1 读取dim_st_asin_info表")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        # Deduplicate per day and data_type; keep a second frame deduplicated
        # across all days for (search_term, asin) pairs.
        self.df_st_asin = self.df_st_asin.drop_duplicates(["search_term", "asin", "data_type", "date_info"]).cache()
        self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin']).cache()
        print("self.df_st_asin:", self.df_st_asin.count())
        print("self.df_st_asin_duplicated:", self.df_st_asin_duplicated.count())
        # self.df_st_asin.show(10, truncate=False)
        # self.df_asin = self.df_st_asin.select("asin").drop_duplicates(["asin"])
        # self.df_st = self.df_st_asin.select("search_term").drop_duplicates(["search_term"])
        print("1.2 读取ods_rank_flow表")
        # One rank->flow mapping reused for both zr and sp page ranks (columns aliased twice).
        sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_st_asin_flow = self.spark.sql(sql).cache()
        # self.df_st_asin_flow.show(10, truncate=False)
        print("1.3 读取ods_rank_search_rate_repeat表")
        sql = f"select rank as st_rank, search_num as st_search_num, rate as st_search_rate, search_sum as st_search_sum " \
              f"from ods_rank_search_rate_repeat where site_name='{self.site_name}';"
        self.df_st_rate = self.spark.sql(sql).cache()
        # self.df_st_rate.show(10)
        # 1.4 Fetch each asin's bs_id, seller, shop, etc.
        print("1.4 读取dim_cal_asin_history_detail表")
        sql = f"select asin, asin_rank, bsr_cate_1_id, asin_title " \
              f"from dim_cal_asin_history_detail where site_name='{self.site_name}';"
        self.df_asin_history = self.spark.sql(sql).cache()
        # self.df_asin_history.show(10)
        # 1.5 ods_one_category_report
        print("1.5 读取ods_one_category_report表")
        # NOTE(review): dm='2022-11' is hard-coded rather than derived from
        # date_info — confirm this is intentional for the "_old" job.
        sql = f"select cate_1_id as bsr_cate_1_id, rank as asin_rank, orders as asin_bsr_orders from ods_one_category_report " \
              f"where site_name='{self.site_name}' and dm='2022-11';"
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        # self.df_bs_report.show(10, truncate=False)
        # 1.6 ods_st_quantity_being_sold
        print("1.6 读取ods_st_quantity_being_sold表")
        sql = f"select search_term, quantity_being_sold as st_quantity_being_sold from ods_st_quantity_being_sold " \
              f"where site_name='{self.site_name}'  and date_type='{self.date_type}' and date_info in {self.date_info_tuple};"
        self.df_st_quantity = self.spark.sql(sqlQuery=sql).cache()

    def save_data(self):
        """Persist the three result DataFrames to their dwd tables.

        NOTE(review): reset_partitions is called with 50/10/10 here, yet
        save_data_common still receives partitions_num=self.partitions_num
        (3, set in __init__) — confirm which value save_data_common actually
        uses; this looks inconsistent.
        """
        self.reset_partitions(partitions_num=50)
        self.save_data_common(
            df_save=self.df_save_st_asin,
            db_save=self.db_save_st_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_st,
            db_save=self.db_save_st,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_asin,
            db_save=self.db_save_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_st_zr_page1_title_rate(self):
        """Unfinished helper for page-1 zr title-rate calculation.

        NOTE(review): the join below has no `on` condition (a cross join),
        the result is never assigned back or returned, and this method is not
        called from handle_data() — it appears to be abandoned work in this
        "_old" version.
        """
        df_zr_page1 = self.df_st_asin.filter(
            "data_type='zr' and page=1"
        )
        df_zr_page1 = df_zr_page1.drop_duplicates(["search_term", "asin"])
        df_zr_page1 = df_zr_page1.join(
            self.df_asin_history
        )

    def handle_data(self):
        """Run all transformation steps and assemble the three save DataFrames.

        NOTE(review): quit() below terminates the process right after the
        debug show() calls, so save_data() never runs for this job — confirm
        this debug gate is intentional in the "_old" version.
        """
        self.handle_st_info()
        self.handle_st_asin_info()
        self.df_save_asin = self.handle_st_asin_counts(cal_type="asin")
        self.df_save_st = self.handle_st_asin_counts(cal_type="st")
        self.df_save_st = self.df_save_st.join(
            self.df_st_detail, on=['search_term'], how='left'
        )
        self.handle_st_asin_ao()
        self.handle_st_asin_orders()
        self.handle_st_asin_bsr_orders()
        self.df_save_st_asin.show(10, truncate=False)
        self.df_save_st.show(10, truncate=False)
        self.df_save_asin.show(10, truncate=False)
        quit()

    def handle_st_info(self):
        """Enrich df_st_detail with avg quantity-being-sold and search-rate columns."""
        # Average the number of products being sold per search term (positive rows only).
        self.df_st_quantity = self.df_st_quantity.filter("st_quantity_being_sold > 0").groupby(['search_term']).agg(
            {"st_quantity_being_sold": "mean"}
        )
        self.df_st_quantity = self.df_st_quantity.withColumnRenamed(
            "avg(st_quantity_being_sold)", "st_quantity_being_sold"
        )
        self.df_st_detail = self.df_st_detail.join(
            self.df_st_quantity, on=["search_term"], how="left"
        ).join(
            self.df_st_rate, on=["st_rank"], how="left"
        )

    def handle_st_asin_info(self):
        """Pivot per-(search_term, asin) min page ranks and attach zr/sp flow rates."""
        # self.df_save_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')").withColumn(
        # Build a pivot key like 'st_asin_zr_page_rank' from each row's data_type.
        self.df_save_st_asin = self.df_st_asin.withColumn(
            "page_rank_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank"))
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]). \
            pivot("page_rank_data_type").agg(F.min(f"page_rank"))
        self.df_save_st_asin = self.df_save_st_asin. \
            join(self.df_st_asin_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"), on=["st_asin_zr_page_rank"],
                 how="left"). \
            join(self.df_st_asin_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"), on=["st_asin_sp_page_rank"],
                 how="left"). \
            join(self.df_st_detail, on=["search_term"], how="inner")
        # self.df_save_st_asin.show(10, truncate=False)

    def handle_st_asin_counts(self, cal_type="asin"):
        """Return a DataFrame of per-`cal_type` counts for each data_type.

        cal_type: 'asin' or 'st' — the grouping key ('st' maps to the
        search_term column). Also derives sb (sb1+sb2+sb3) and adv (sb+sp)
        totals and stamps the partition columns.
        NOTE(review): this mutates self.df_st_asin by appending a new
        '{cal_type}_data_type' column on every call (it is called twice).
        """
        print(f"计算{cal_type}_counts")
        cal_type_complete = "search_term" if cal_type == "st" else cal_type
        self.df_st_asin = self.df_st_asin.withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_st_asin.data_type, F.lit(f"_counts"))
        )
        df = self.df_st_asin.groupby([f'{cal_type_complete}']). \
            pivot(f"{cal_type}_data_type").count()
        df = df.fillna(0)
        # df.show(10, truncate=False)
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        df = df.withColumn(f"site_name", F.lit(self.site_name))
        df = df.withColumn(f"date_type", F.lit(self.date_type))
        df = df.withColumn(f"date_info", F.lit(self.date_info))
        # df.show(10, truncate=False)
        return df

    def handle_st_asin_ao(self):
        """Compute AO (adv/zr ratio) per asin, roll it up per search term and rank it."""
        print("计算st和asin各自维度的ao")
        # asin_ao_val
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_ao_val", self.df_save_asin.asin_adv_counts / self.df_save_asin.asin_zr_counts
        )
        self.df_save_asin = self.df_save_asin.fillna({"asin_ao_val": 0})
        # st_ao_val and st_ao_val_rate
        df_asin_ao = self.df_save_asin.select("asin", "asin_ao_val")
        df_st_ao = self.df_save_st_asin.select("search_term", "asin").join(
            df_asin_ao, on=['asin'], how='left'
        )
        df_st_ao = df_st_ao.groupby(["search_term"]).agg({"asin_ao_val": "mean"})
        df_st_ao = df_st_ao.withColumnRenamed("avg(asin_ao_val)", "st_ao_val")
        self.df_save_st = self.df_save_st.join(
            df_st_ao, on=['search_term'], how='left'
        )
        # NOTE(review): an unpartitioned Window pulls all rows into a single
        # partition — acceptable only if the st-level data is small enough.
        window = Window.orderBy(self.df_save_st.st_ao_val.asc())
        self.df_save_st = self.df_save_st.withColumn("st_ao_val_rate", F.percent_rank().over(window=window))

    def handle_st_asin_bsr_orders(self):
        """Attach BSR orders via (asin_rank, bsr_cate_1_id) and aggregate per st/asin."""
        self.df_save_st_asin = self.df_save_st_asin.join(
            self.df_asin_history, on=['asin'], how='left'
        )
        self.df_save_st_asin = self.df_save_st_asin.join(
            self.df_bs_report, on=['asin_rank', 'bsr_cate_1_id'], how='left'
        )
        # self.df_st_asin_duplicated.show(10, truncate=False)
        df_st_bsr_orders = self.df_save_st_asin.groupby(['search_term']).agg({"asin_bsr_orders": "sum"})
        df_st_bsr_orders = df_st_bsr_orders.withColumnRenamed(
            "sum(asin_bsr_orders)", "st_bsr_orders"
        )
        df_asin_bsr_orders = self.df_save_st_asin.select("asin", "asin_bsr_orders").drop_duplicates(['asin'])
        # df_st_bsr_orders.show(10, truncate=False)
        # df_asin_bsr_orders.show(10, truncate=False)
        self.df_save_st = self.df_save_st.join(
            df_st_bsr_orders, on='search_term', how='left'
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_bsr_orders, on='asin', how='left'
        )

    def handle_st_asin_orders(self):
        """Estimate zr/sp orders (rate * search_sum) and roll up per st and per asin."""
        print("计算zr, sp预估销量")
        # 1. Estimated zr/sp orders at st+asin granularity
        self.df_save_st_asin = self.df_save_st_asin.withColumn(
            "st_asin_zr_orders",
            F.ceil(self.df_save_st_asin.st_asin_zr_rate * self.df_save_st_asin.st_search_sum)
        ).withColumn(
            "st_asin_sp_orders",
            F.ceil(self.df_save_st_asin.st_asin_sp_rate * self.df_save_st_asin.st_search_sum)
        )
        # self.df_save_st_asin.show(10, truncate=False)
        self.df_save_st_asin = self.df_save_st_asin.select(
            "search_term", "asin", "st_asin_zr_orders", "st_asin_sp_orders",
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]).agg(
            {
                "st_asin_zr_orders": "mean",
                "st_asin_sp_orders": "mean",
            }
        )
        self.df_save_st_asin = self.df_save_st_asin.withColumnRenamed(
            "avg(st_asin_zr_orders)", "st_asin_zr_orders"
        ).withColumnRenamed(
            "avg(st_asin_sp_orders)", "st_asin_sp_orders"
        )
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"site_name", F.lit(self.site_name))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_type", F.lit(self.date_type))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_info", F.lit(self.date_info))
        print("self.df_save_st_asin:", self.df_save_st_asin.count())
        # self.df_save_st_asin.show(10, truncate=False)
        # 2. Estimated zr/sp orders at st granularity
        df_st_orders = self.df_save_st_asin.groupby(['search_term']).agg(
            {
                "st_asin_zr_orders": "sum",
                "st_asin_sp_orders": "sum",
            }
        )
        df_st_orders = df_st_orders.withColumnRenamed(
            "sum(st_asin_zr_orders)", "st_zr_orders"
        ).withColumnRenamed(
            "sum(st_asin_sp_orders)", "st_sp_orders"
        )
        self.df_save_st = self.df_save_st.join(
            df_st_orders, on=['search_term'], how='left'
        )
        # 3. Estimated zr/sp orders at asin granularity
        df_asin_orders = self.df_save_st_asin.groupby(['asin']).agg(
            {
                "st_asin_zr_orders": "mean",
                "st_asin_sp_orders": "mean",
            }
        )
        df_asin_orders = df_asin_orders.withColumnRenamed(
            "avg(st_asin_zr_orders)", "asin_zr_orders"
        ).withColumnRenamed(
            "avg(st_asin_sp_orders)", "asin_sp_orders"
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_orders, on=['asin'], how='left'
        )


if __name__ == '__main__':
    # CLI entry point. Validate argument count up front so a missing argument
    # produces a usage message instead of a bare IndexError traceback.
    if len(sys.argv) < 4:
        print("Usage: dwd_st_asin_measure_old.py <site_name> <date_type> <date_info>\n"
              "  site_name: site, e.g. us\n"
              "  date_type: day/week/4_week/month/quarter\n"
              "  date_info: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1")
        sys.exit(1)
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: e.g. 2022-1
    handle_obj = DwdStMeasure(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()