# dwt_keepa_asin_bsr_rank.py 13.4 KB
"""
author: wangrui
description: Build the smallest-product-line market data summary from ods_keepa_asin_bsr_rank
table_read_name: ods_keepa_asin_bsr_rank, dim_asin_detail
table_save_name: dwt_keepa_asin_bsr_rank
table_save_level: dwt
version: 2.0
created_date: 2023-08-22
updated_date: 2023-08-22
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql import functions as F
from pyspark.sql import Window
from yswg_utils.common_df import get_node_first_id_df
from pyspark.sql.types import IntegerType
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from yswg_utils.common_udf import udf_parse_amazon_orders


class DwtAsinBsrRank(Templates):
    """Spark job that derives per-ASIN and per-category (``cat_id``) order data.

    The job reads ASIN rows from ``ods_asin_detail`` plus three MySQL tables
    (``us_self_asin``, ``us_inv_asin``, ``us_self_asin_detail``), keeps the
    most recent row per ASIN, removes ASINs listed in ``us_self_asin``, labels
    each ASIN with a ``type`` (1 or 2) from price/orders thresholds and writes:

    * a detail frame to ``dws_keepa_asin_bsr_rank``
    * a per-``cat_id`` summary frame to ``dwt_keepa_asin_bsr_rank``

    Both outputs are partitioned by ``site_name``, ``date_type``, ``date_info``.
    """

    # Spark datetime pattern for the audit timestamp columns. Lower-case ``ss``
    # is second-of-minute; upper-case ``SS`` would emit fraction-of-second.
    _TIMESTAMP_PATTERN = 'yyyy-MM-dd HH:mm:ss'

    def __init__(self, site_name="us", date_type="month", date_info="2023-01", run_type=1):
        """Create the Spark session and initialise job state.

        Args:
            site_name: Site code used in the ``ods_asin_detail`` filter, the
                MySQL connection lookup and the output partition.
            date_type: Period type written to the output partition and used
                by ``get_orders_transform_rate``.
            date_info: Period in ``<year>-<n>`` form; it is split on ``-``
                into ``self.year`` and ``self.month``.
            run_type: Converted with ``int()``. ``1`` reads the ``month``
                partition of ``ods_asin_detail``; any other value reads the
                ``month_week`` partition.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.run_type = int(run_type)
        self.db_save_summary = "dwt_keepa_asin_bsr_rank"
        self.db_save_detail = "dws_keepa_asin_bsr_rank"
        self.spark = self.create_spark_object(
            app_name=f"keepa_asin_bsr_rank {self.site_name} {self.date_type} {self.date_info}")
        self.year, self.month = self.date_info.split('-')
        self.get_year_month_days_dict(year=int(self.year))
        self.complete_date_info_tuple = self.get_complete_week_tuple()
        self.orders_transform_rate = self.get_orders_transform_rate()
        # Placeholder DataFrames; each one is replaced in read_data()/handle_*().
        self.df_save_summary = self.spark.sql("select 1+1;")
        self.df_save_detail = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_keepa_asin = self.spark.sql("select 1+1;")
        self.df_asin_new_cate = self.spark.sql("select 1+1;")
        self.df_self_asin = self.spark.sql("select 1+1;")
        self.df_inv_asin = self.spark.sql("select 1+1;")
        self.df_inv_asin_detail = self.spark.sql("select 1+1;")
        self.df_complete_inv_asin = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(60)
        self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType())

    def get_complete_week_tuple(self):
        """Return the ``year_week`` values matching ``date_info``.

        Loads ``dim_date_20_to_30`` (also stored on ``self.df_date``), converts
        it to pandas and keeps the rows whose ``year_month`` equals
        ``self.date_info`` and whose ``week_day`` equals 1.

        Returns:
            tuple: The ``year_week`` values of the selected rows.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        df = self.df_date.toPandas()
        df_loc = df.loc[(df.year_month == f"{self.date_info}") & (df.week_day == 1)]
        return tuple(df_loc.year_week)

    def get_orders_transform_rate(self):
        """Return the fraction of a month covered by ``self.date_type``.

        Returns:
            ``1 / month_days`` for ``day``, ``7 / month_days`` for ``week`` and
            ``1`` for every other date type, where ``month_days`` is looked up
            in ``self.year_month_days_dict`` by ``int(self.month)``.
        """
        month_days = self.year_month_days_dict[int(self.month)]
        if self.date_type == 'day':
            return 1 / month_days
        if self.date_type == 'week':
            return 7 / month_days
        return 1

    def _read_mysql_query(self, con_info, sql):
        """Run ``sql`` over JDBC with ``con_info`` and return the cached result."""
        return SparkUtil.read_jdbc_query(session=self.spark, url=con_info['url'],
                                         pwd=con_info['pwd'],
                                         username=con_info['username'],
                                         query=sql).cache()

    def read_data(self):
        """Load every input DataFrame used by the ``handle_*`` steps.

        Raises:
            RuntimeError: If no MySQL connection info is available for
                ``self.site_name``. The MySQL-backed frames are mandatory for
                the later joins, so the job stops here with a clear message
                instead of failing later on a missing column or attribute.
        """
        print("1. 读取dim_asin_detail, 获取上个月所有asin信息")
        # The two queries differ only in the date_type partition they read.
        if self.run_type == 1:
            sql = f"""
                            select 
                                asin,
                                rank,
                                price,
                                node_id as cat_id,
                                category,
                                buy_sales,
                                created_at as dt
                            from ods_asin_detail where site_name='{self.site_name}' and date_type='month' and date_info = '{self.date_info}' and node_id is not null;         
                    """
        else:
            sql = f"""
                    select 
                        asin,
                        rank,
                        price,
                        node_id as cat_id,
                        category,
                        buy_sales,
                        created_at as dt
                    from ods_asin_detail where site_name='{self.site_name}' and date_type='month_week' and date_info = '{self.date_info}' and node_id is not null;         
            """
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).repartition(60).cache()
        self.df_asin_detail.show(20, truncate=False)

        print("2. 读取内部asin信息")
        # NOTE(review): the MySQL table names below are hard-coded with a
        # ``us_`` prefix even though site_name is configurable - confirm.
        sql = "select asin, 1 as is_self_asin from us_self_asin where delete_time is null group by asin "
        print("sql:", sql)
        mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name=self.site_name)
        if mysql_con_info is None:
            raise RuntimeError(
                f"MySQL connection info is not available for site_name='{self.site_name}'")
        self.df_self_asin = self._read_mysql_query(mysql_con_info, sql)
        self.df_self_asin.show(20, truncate=False)

        # Read the new category tree (dim_bsr_category_tree) to get the
        # first-level category id, renamed to the column names used here.
        self.df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
        self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("node_id", "cat_id")
        self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("category_first_id", "cate_1_id").cache()
        self.df_asin_new_cate.show(20, truncate=False)

        print("3. 读取inv_asin数据")
        sql = """
            select asin, 1 as is_inner from us_inv_asin group by asin
        """
        self.df_inv_asin = self._read_mysql_query(mysql_con_info, sql)
        self.df_inv_asin.show(20, truncate=False)

        print("4. 读取self_asin_detail")
        sql = """
            select asin ,node_id as cat_id from us_self_asin_detail where  asin_type like '%%11%%' and created_at >= DATE_FORMAT(CURDATE(), '%Y-%m-01') group by asin, node_id
        """
        self.df_inv_asin_detail = self._read_mysql_query(mysql_con_info, sql)
        self.df_inv_asin_detail.show(20, truncate=False)

    def handle_inv_asin_cat(self):
        """Build ``self.df_keepa_asin``: the ``cat_id`` values of inventory ASINs.

        Inventory ASINs are joined to their ``cat_id`` and rows without a
        usable ``cat_id`` (null or empty string) are dropped.
        """
        self.df_inv_asin = self.df_inv_asin.join(
            self.df_inv_asin_detail, on=['asin'], how='left'
        )
        self.df_inv_asin = self.df_inv_asin.filter((F.col("cat_id").isNotNull()) & (F.col("cat_id") != '')).cache()
        # Several inventory ASINs can share one cat_id. De-duplicate so the
        # later join on cat_id does not multiply the ASIN rows; is_inner is the
        # constant 1, so the joined result is unchanged.
        self.df_keepa_asin = self.df_inv_asin.select("cat_id", "is_inner").distinct()

    def handle_asin_detail(self):
        """Keep the latest row per ASIN, derive ``orders`` and drop self ASINs."""
        self.df_asin_detail = self.df_asin_detail.repartition(60)
        # Rank the rows of each ASIN by dt, newest first, and keep only rank 1.
        window = Window.partitionBy(['asin']).orderBy(F.col("dt").desc())
        self.df_asin_detail = self.df_asin_detail.withColumn('dt_rank', F.row_number().over(window=window))
        self.df_asin_detail = self.df_asin_detail.filter("dt_rank = 1")
        self.df_asin_detail = self.df_asin_detail.drop("dt_rank")
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "orders", self.u_parse_amazon_orders(F.col("buy_sales")))
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_self_asin, on=['asin'], how='left'
        )
        # After the left join, is_self_asin is null for ASINs that are absent
        # from us_self_asin; those are the rows to keep.
        self.df_asin_detail = self.df_asin_detail.filter(
            (F.col("is_self_asin") != 1) | (F.col("is_self_asin").isNull())
        ).drop("is_self_asin").cache()

    def handle_data_join(self):
        """Attach ``is_inner`` and ``cate_1_id`` to each ASIN via ``cat_id``."""
        self.df_asin_detail = self.df_asin_detail.repartition(60)
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_keepa_asin, on=['cat_id'], how='left'
        ).join(
            self.df_asin_new_cate, on=['cat_id'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.drop_duplicates(['asin']).cache()
        # Categories without an inventory ASIN get is_inner = 2.
        self.df_asin_detail = self.df_asin_detail.na.fill({"is_inner": 2})

    def handle_keepa_asin_detail(self):
        """Add the ``type`` column and select the detail output columns.

        ``type`` is 1 when ``orders`` reaches the threshold of the ASIN's price
        band (or 50 when ``price`` is null) and 2 otherwise.
        """
        type_expr = """
                CASE
                    WHEN 
                        (price is not null and price < 7 and orders >= 150) or 
                        (price >= 7 and price < 15 and orders >= 100) or 
                        (price >= 15 and price < 20 and orders >= 80) or
                        (price >= 20 and price < 35 and orders >= 40) or
                        (price >= 35 and price < 50 and orders >= 30) or
                        (price >= 50 and orders >= 20) or
                        (price is null and orders >=50) 
                    THEN
                        1
                    ELSE
                        2
                END
        """
        self.df_asin_detail = self.df_asin_detail.withColumn('type', F.expr(type_expr))
        self.df_save_detail = self.df_asin_detail.select("cat_id", "asin", "rank", "price", "cate_1_id", "orders",
                                                         "type", "category").cache()

    def handle_keepa_asin_summary(self):
        """Aggregate the ASIN rows per ``cat_id`` into the summary frame.

        Per ``cat_id`` it computes total orders, orders and ASIN counts split
        by ``type`` (1 -> ``success_*``, 2 -> ``fail_*``) and the number of
        ASINs with ``orders >= 50``. It then attaches the ``category`` value of
        the most recent row (by ``dt``) that has a non-null category.
        """
        self.df_save_summary = self.df_asin_detail.select("cat_id", "orders", "type", "is_inner").cache()
        self.df_save_summary = self.df_save_summary.groupby(['cat_id']).agg(
            F.first(F.col("is_inner")).alias("is_inner"),
            F.sum(F.col("orders")).alias("orders_sum"),
            F.sum(F.when(F.col('type') == 1, F.col("orders")).otherwise(F.lit(0))).alias("success_orders_sum"),
            F.sum(F.when(F.col('type') == 2, F.col("orders")).otherwise(F.lit(0))).alias("fail_orders_sum"),
            F.sum(F.when(F.col('type') == 1, 1).otherwise(F.lit(0))).alias("success_num"),
            F.sum(F.when(F.col('type') == 2, 1).otherwise(F.lit(0))).alias("fail_num"),
            F.sum(F.when(F.col("orders") >= 50, 1).otherwise(0)).alias("50_qty_asin_count")
        )

        df_category_info = self.df_asin_detail.select("cat_id", "category", "dt")
        df_with_category_info = df_category_info.filter(F.col("category").isNotNull())
        window = Window.partitionBy(['cat_id']).orderBy(F.col("dt").desc())
        df_category_info = df_with_category_info.withColumn("c_rank", F.row_number().over(window=window))
        df_new_category_info = df_category_info.filter("c_rank = 1")
        df_new_category_info = df_new_category_info.drop("c_rank", "dt")
        self.df_save_summary = self.df_save_summary.join(
            df_new_category_info, on=['cat_id'], how='left'
        ).cache()

    def _with_audit_columns(self, df, created_col):
        """Append timestamp, spare and partition columns to ``df``.

        Columns are appended in a fixed order: ``created_col``,
        ``updated_time``, ``string_field1..3``, ``int_field1..3``,
        ``site_name``, ``date_type``, ``date_info``.
        """
        now_str = F.date_format(F.current_timestamp(), self._TIMESTAMP_PATTERN)
        df = df.withColumn(created_col, now_str).withColumn("updated_time", now_str)
        for spare in ("string_field1", "string_field2", "string_field3"):
            df = df.withColumn(spare, F.lit("null"))
        for spare in ("int_field1", "int_field2", "int_field3"):
            df = df.withColumn(spare, F.lit(0))
        return df.withColumn("site_name", F.lit(self.site_name)). \
            withColumn("date_type", F.lit(self.date_type)). \
            withColumn("date_info", F.lit(self.date_info))

    def handle_data_group(self):
        """Add audit, spare and partition columns to both output frames."""
        # The creation-time column is named differently in the two outputs
        # ("created_time" vs "create_time"); both names are kept as they were,
        # presumably to match the target table schemas - TODO confirm.
        self.df_save_detail = self._with_audit_columns(self.df_save_detail, "created_time")
        self.df_save_summary = self._with_audit_columns(self.df_save_summary, "create_time")

    def handle_data(self):
        """Run the read step followed by every transformation step, in order."""
        self.read_data()
        self.handle_inv_asin_cat()
        self.handle_asin_detail()
        self.handle_data_join()
        self.handle_keepa_asin_detail()
        self.handle_keepa_asin_summary()
        self.handle_data_group()

    def save_data(self):
        """Write the detail frame and then the summary frame via ``save_data_common``."""
        self.save_data_common(
            df_save=self.df_save_detail,
            db_save=self.db_save_detail,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.save_data_common(
            df_save=self.df_save_summary,
            db_save=self.db_save_summary,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )


if __name__ == '__main__':
    # Expected command line: <script> <site_name> <date_type> <date_info> <run_type>
    # Exit with a usage message rather than a bare IndexError when an
    # argument is missing.
    if len(sys.argv) < 5:
        raise SystemExit(
            f"usage: {sys.argv[0]} <site_name> <date_type> <date_info> <run_type>")
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    run_type = sys.argv[4]  # arg 4: run type; the class converts it with int()
    handle_obj = DwtAsinBsrRank(site_name=site_name, date_type=date_type, date_info=date_info, run_type=run_type)
    handle_obj.run()