# add_fileds.py 3 KB
"""
Fields added by this module: asinVarNum, bsrOrders, bsrOrdersSale, pageInventory, asinBoughtMonth, sellerJson
"""


from utils.templates import Templates
from yswg_utils.common_udf import udf_parse_seller_json


class AddFields(Templates):
    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000):
        """Store the job parameters, create the Spark session, and register the seller-JSON UDF.

        Args:
            site_name: Site code; used by ``read_data`` to filter the report table.
            date_type: Date granularity label, stored on the instance.
            date_info: Date value, stored on the instance.
            consumer_type: Whether to consume real-time or historical data.
            topic_name: Accepted for interface compatibility; not used in this constructor.
            batch_size: Accepted for interface compatibility; not used in this constructor.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume real-time or historical data

        # The Spark session has to exist before any self.spark.sql(...) call,
        # so it is created ahead of the placeholder DataFrames below.
        self.app_name = self.get_app_name()
        self.spark = self.create_spark_object(
            app_name=f"{self.app_name}")

        # Placeholder DataFrames; expected to be replaced with real data later.
        self.df_asin_bs = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

        # NOTE(review): `seller_schema` is not defined or imported in the visible
        # part of this file -- confirm where it comes from.
        self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)

    def read_data(self):
        """Load the monthly category report into ``self.df_bs_report``.

        Selects category id, rank and orders from ``ods_one_category_report``
        for ``self.site_name``. When the year is 2022 and the month is earlier
        than March, the fixed month '2022-12' is queried; otherwise the month
        is built from ``self.year`` and ``self.month``. The result is cached
        and its first 10 rows are printed.
        """
        # Source rows used to compute bsrOrders
        print("1.2 读取ods_one_category_report表")

        use_fixed_month = int(self.year) == 2022 and int(self.month) < 3
        if use_fixed_month:
            report_month = '2022-12'
        else:
            report_month = f"{self.year}-{self.month}"

        sql = (
            "select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, "
            "orders as asin_bsr_orders from ods_one_category_report "
            f"where site_name='{self.site_name}' and date_type='month' and date_info='{report_month}';"
        )
        print("sql:", sql)

        report_df = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_report = report_df
        report_df.show(10, truncate=False)

    # Handle the delivery (fulfilment) method
    def handle_asin_buy_box_seller_type(self, df):
        """Expand ``seller_json`` into ``buy_box_seller_type`` and ``account_name``.

        The ``seller_json`` column is parsed with the registered
        ``u_parse_seller_info`` UDF; the two fields are copied out of the
        parsed value into their own columns, then both the parsed helper
        column and the original ``seller_json`` column are dropped.

        Args:
            df: DataFrame containing a ``seller_json`` column.

        Returns:
            The DataFrame with the two new columns and without ``seller_json``.
        """
        parsed_value = self.u_parse_seller_info(df.seller_json)
        with_parsed = df.withColumn("seller_json_parsed", parsed_value)

        seller_type_col = with_parsed.seller_json_parsed.buy_box_seller_type
        account_name_col = with_parsed.seller_json_parsed.account_name

        expanded = with_parsed.withColumn("buy_box_seller_type", seller_type_col)
        expanded = expanded.withColumn("account_name", account_name_col)
        return expanded.drop("seller_json_parsed", "seller_json")

    def add_fields(self, df):
        """Join category data onto ``df``, derive the sales estimate, and rename columns.

        Steps, in order:
          1. Left-join ``self.df_asin_bs`` on ``node_id``.
          2. Add ``asin_bsr_orders_sale`` as ``price * asin_bsr_orders``.
          3. Rename the snake_case columns to their camelCase output names.
          4. Expand ``seller_json`` via ``handle_asin_buy_box_seller_type``.

        Args:
            df: DataFrame built from the consumed Kafka data.

        NOTE(review): in the visible code ``self.df_asin_bs`` is only ever the
        ``select 1+1`` placeholder set in ``__init__``, while ``read_data``
        fills ``self.df_bs_report`` -- confirm which frame should be joined here.
        NOTE(review): no ``return`` statement is visible in this view of the
        method -- confirm whether it continues or should return ``df_save``.
        """
        # df: the DataFrame object built from consumed Kafka data
        df_save = df.join(
            self.df_asin_bs, on='node_id', how='left'
        )
        # Estimated sales amount = unit price * estimated BSR orders
        df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders)
        # Rename to the output field names
        df_save = df_save.withColumnRenamed("variat_num", "asinVarNum")
        df_save = df_save.withColumnRenamed("asin_bsr_orders", "bsrOrders")
        df_save = df_save.withColumnRenamed("asin_bsr_orders_sale", "bsrOrdersSale")
        df_save = df_save.withColumnRenamed("page_inventory", "pageInventory")
        df_save = df_save.withColumnRenamed("buy_sales", "asinBoughtMonth")
        df_save = self.handle_asin_buy_box_seller_type(df_save)
        # NOTE(review): handle_asin_buy_box_seller_type drops "seller_json", so this
        # rename presumably finds no such column and has no effect -- confirm intent.
        df_save = df_save.withColumnRenamed("seller_json", "sellerJson")
        df_save = df_save.withColumnRenamed("buy_box_seller_type", "buyBoxSellerType")