"""
1. 热搜词,上升词,新出词,在售商品数等
2. 预估销量
3. bs销量, bs的category_id
4,st_ao_val
"""
"""
author: 方星钧(ffman)
description: 基于dwd层等表,计算出search_term和asin维度的基础信息表(包括预估销量)
table_read_name: dwd_st_counts系列, dwd_st_info系列, dwd_st_asin_info系列, dwd_asin_bs_info
table_save_name: dwt_st_info系列
table_save_level: dwt
version: 2.0
created_date: 2022-06-20
updated_date: 2022-12-25
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
# udf / window functions for grouping and ranking
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
from sqlalchemy import create_engine
import pandas as pd


class DwtStInfo(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-01"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_st_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        self.get_date_info_tuple()
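        # placeholder DataFrames; each one is overwritten with the real query result in read_data()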
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_st_detail = self.spark.sql(f"select 1+1;")
        self.df_st_counts = self.spark.sql(f"select 1+1;")
        self.df_st_asin_info = self.spark.sql(f"select 1+1;")
        self.df_st_asin_zr = self.spark.sql(f"select 1+1;")
        self.df_st_key = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(1)
        self.u_is_title_appear = self.spark.udf.register("u_is_title_appear", self.udf_is_title_appear, IntegerType())
        print("self.date_info_tuple:", self.date_info_tuple)
        self.current_date = self.date_info_tuple[0]
        print("self.current_date:", self.current_date)

    @staticmethod
    def udf_is_title_appear(search_term, title):
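        """Return 1 if search_term appears in title (case-insensitive substring match), else 0.

        e.g. ("water bottle", "Stainless Steel Water Bottle 32oz") -> 1  # hypothetical values
        """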
        if str(search_term).lower() in str(title).lower():
            return 1
        else:
            return 0

    def read_data(self):
        print("1.1 读取asin维度: dim_cal_asin_history_detail表")
        sql = f"select asin, bsr_cate_1_id, bsr_cate_current_id as st_asin_bs_cate_current_id, " \
              f"asin_rank, asin_title, asin_launch_time, asin_price as asin1_price, asin_rating as asin1_rating, " \
              f"asin_total_comments as asin1_total_comments " \
              f"from dim_cal_asin_history_detail " \
              f"where site_name='{self.site_name}';"
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()
        self.df_asin_detail.show(10)
        print("1.2 读取bsr维度: ods_one_category_report表")
        sql = f"select cate_1_id as bsr_cate_1_id, rank as asin_rank, orders as asin_bs_orders from ods_one_category_report " \
              f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
        print("sql:", sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_report.show(10)
        print("1.3 读取dim_st_detail系列表")
        sql = f"select * from dim_st_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sql).cache()
        self.df_st_detail.show(10, truncate=False)
        print("1.4 读取dwd_st_counts系列表")
        sql = f"select search_term, st_ao_val, st_ao_val_rank, st_ao_val_rate, st_zr_counts, st_sp_counts, " \
              f"st_sb_counts, st_sb1_counts, st_sb2_counts, st_sb3_counts, st_adv_counts, " \
              f"st_ac_counts, st_bs_counts, st_er_counts, st_tr_counts " \
              f" from dwd_st_counts " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st_counts = self.spark.sql(sql).cache()
        self.df_st_counts.show(10, truncate=False)
        print("1.5 读取dwd_st_asin_info系列表")
        sql = f"select search_term, asin, st_asin_zr_orders as st_asin_orders, st_asin_zr_orders_sum as st_asin_orders_sum from dwd_st_asin_info " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        print("sql:", sql)
        self.df_st_asin_info = self.spark.sql(sql).cache()
        self.df_st_asin_info.show(10, truncate=False)
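        # zr page-1 (search_term, asin) pairs over the weeks in year_week_tuple; deduplicated below
        # and used later in handle_st_asin_zr_title() for the title-appearance statistics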
        sql = f"select search_term, asin from ods_search_term_zr " \
              f"where site_name ='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple} and page=1"
        self.df_st_asin_zr = self.spark.sql(sql).cache()
        self.df_st_asin_zr = self.df_st_asin_zr.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_zr.show(10)
        print("1.6 读取ods_st_key表")
        sql = f"select st_key, search_term from ods_st_key " \
              f"where site_name='{self.site_name}';"
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sql).cache()
        self.df_st_key.show(10, truncate=False)

    def handle_data(self):
        self.handle_join()
        self.handle_data_asin_new()
        self.handle_data_st_orders()
        self.handle_data_asin_detail()
        self.handle_st_asin_zr_title()
        self.df_save = self.df_st_detail
        print(self.df_save.columns)
        self.df_save.show(10)
        self.df_save = self.df_save.drop("st_updated_time", "st_bsr_cate_1_id_new", "st_bsr_cate_current_id_new")
        # quit()

    def handle_join(self):
        # search_term dimension
        self.df_st_detail = self.df_st_detail.join(
            self.df_st_counts, on="search_term", how="left"
        ).join(
            self.df_st_key, on="search_term", how="left"
        )
        # asin dimension
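        # attach estimated monthly orders (asin_bs_orders) by matching each asin's bsr rank and
        # top-level category against the one-category report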
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_bs_report, on=['asin_rank', 'bsr_cate_1_id'], how='left'
        )
        # self.df_asin_detail = self.df_asin_detail.drop("bsr_cate_1_id")
        # search_term + asin dimension
        self.df_st_asin_zr = self.df_st_asin_zr.join(
            self.df_asin_detail.select("asin", "asin_title"), on='asin', how='left'
        )

    def handle_data_asin_new(self):
        """
        1. 对self.df_asin_bs_info对象,选择asin最新一周的数据,并删掉不需要的字段
        2. 获取asin
        """
        # 获取新品的判定
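        # asin_new_flag derived below:
        #   0 = launched more than 180 days before the period end
        #   1 = launched within the last 180 days (treated as a new product)
        #   2 = launch time missing/invalid or not before the period end (days_diff is null or <= 0)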
        self.df_asin_detail = self.df_asin_detail.withColumn("current_date", F.lit(self.date_info_tuple[-1]))
        self.df_asin_detail = self.df_asin_detail.withColumn("days_diff", F.datediff("current_date", "asin_launch_time"))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_new_flag",
            F.when(
                self.df_asin_detail.days_diff > 180, 0
            ).when(
                self.df_asin_detail.days_diff > 0, 1
            ).otherwise(2)
        )
        self.df_asin_detail.show(10, truncate=False)

    def handle_data_st_orders(self):
        """
        计算关键词维度的st_asin_bs_orders_sum和st_asin_orders_sum
        """
        self.df_st_asin_info = self.df_st_asin_info.join(
            self.df_asin_detail.select("asin", "asin_bs_orders", "asin_new_flag"), on="asin", how="left"
        )

        # df_st_search_sum = self.df_st_asin_info.groupby(['search_term']). \
        #     agg({"st_search_sum": "max"})
        # df_st_search_sum = df_st_search_sum.withColumnRenamed("max(st_search_sum)", "st_search_sum")
        self.df_st_asin_info = self.df_st_asin_info.withColumnRenamed("asin_bs_orders", "st_asin_bs_orders")
        df_st_asin_bs_orders_sum = self.df_st_asin_info.groupby(['search_term']). \
            agg({"st_asin_bs_orders": "sum"})
        df_st_asin_bs_orders_sum = df_st_asin_bs_orders_sum.withColumnRenamed("sum(st_asin_bs_orders)",
                                                                              "st_asin_bs_orders_sum")
        df_st_asin_orders_sum = self.df_st_asin_info.groupby(['search_term']). \
            agg({"st_asin_orders_sum": "max", "asin": "count"})
        # df_st_asin_orders_sum.show(10, truncate=False)
        df_st_asin_orders_sum = df_st_asin_orders_sum.withColumnRenamed("max(st_asin_orders_sum)", "st_asin_orders_sum")
        df_st_asin_orders_sum = df_st_asin_orders_sum.withColumnRenamed("count(asin)", "st_asin_counts")
        df_st_asin_new_orders_sum = self.df_st_asin_info.filter("asin_new_flag = 1").groupby(['search_term']). \
            agg({"st_asin_orders": "sum", "asin": "count"})
        df_st_asin_new_orders_sum = df_st_asin_new_orders_sum.withColumnRenamed("sum(st_asin_orders)", "st_asin_new_orders_sum")
        df_st_asin_new_orders_sum = df_st_asin_new_orders_sum.withColumnRenamed("count(asin)", "st_asin_new_counts")
        # df_st_asin_new_orders_sum.show(10, truncate=False)
        self.df_st_detail = self.df_st_detail.join(
            df_st_asin_bs_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_new_orders_sum, on="search_term", how="left"
        )
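        # st_asin_new_orders_rate / st_asin_new_counts_rate: share of the term's orders and asins
        # that come from new products (asin_new_flag = 1)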
        self.df_st_detail = self.df_st_detail.withColumn("st_asin_new_orders_rate", self.df_st_detail.st_asin_new_orders_sum/self.df_st_detail.st_asin_orders_sum)
        self.df_st_detail = self.df_st_detail.withColumn("st_asin_new_counts_rate", self.df_st_detail.st_asin_new_counts/self.df_st_detail.st_asin_counts)

    def handle_data_asin_detail(self):
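        # attach st_top_asin1's estimated bs orders, price, rating and review count to the search term row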
        self.df_st_detail = self.df_st_detail.join(
            self.df_asin_detail.select(
                "asin", "asin_bs_orders", "asin1_price", "asin1_rating", "asin1_total_comments"
            ).withColumnRenamed("asin", "st_top_asin1"),
            on="st_top_asin1", how="left"
        ).withColumnRenamed("asin_bs_orders", "st_asin1_bs_orders")

    def handle_st_asin_zr_title(self):
        self.df_st_asin_zr = self.df_st_asin_zr.withColumn(
            "st_asin_in_title_flag",
            self.u_is_title_appear(self.df_st_asin_zr.search_term, self.df_st_asin_zr.asin_title)
        )
        df_st_zr_page1_counts = self.df_st_asin_zr.groupby("search_term").count()
        df_st_zr_page1_counts = df_st_zr_page1_counts.withColumnRenamed("count", "st_zr_page1_counts")

        df_st_zr_page1_in_title_counts = self.df_st_asin_zr.filter("st_asin_in_title_flag=1").groupby(
            "search_term").count()
        df_st_zr_page1_in_title_counts = df_st_zr_page1_in_title_counts.withColumnRenamed("count", "st_zr_page1_in_title_counts")
        df_st_zr_page1_counts = df_st_zr_page1_counts.join(
            df_st_zr_page1_in_title_counts, on='search_term', how='left'
        )
        df_st_zr_page1_counts = df_st_zr_page1_counts.fillna(0)
        df_st_zr_page1_counts = df_st_zr_page1_counts.withColumn(
            "st_zr_page1_in_title_rate", df_st_zr_page1_counts.st_zr_page1_in_title_counts / df_st_zr_page1_counts.st_zr_page1_counts
        )
        df_st_zr_page1_counts.show(10, truncate=False)
        self.df_st_detail = self.df_st_detail.join(
            df_st_zr_page1_counts, on='search_term', how='left'
        )
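        # A minimal, UDF-free sketch of the same flag (not wired in): built-in column functions avoid
        # per-row Python serialization, assuming search_term/asin_title are plain strings:
        #   flag_col = F.when(
        #       F.lower(F.col("asin_title")).contains(F.lower(F.col("search_term"))), 1
        #   ).otherwise(0)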


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()