""" asinVarNum bsrOrders bsrOrdersSale pageInventory asinBoughtMonth sellerJson """ from utils.templates import Templates from yswg_utils.common_udf import udf_parse_seller_json class AddFields(Templates): def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.consumer_type = consumer_type # 消费实时还是消费历史 self.df_asin_bs = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;") self.app_name = self.get_app_name() self.spark = self.create_spark_object( app_name=f"{self.app_name}") self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema) def read_data(self): # 计算bsrOrders print("1.2 读取ods_one_category_report表") if int(self.year) == 2022 and int(self.month) < 3: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';" else: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';" print("sql:", sql) self.df_bs_report = self.spark.sql(sqlQuery=sql).cache() self.df_bs_report.show(10, truncate=False) # 处理配送方式 def handle_asin_buy_box_seller_type(self, df): df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json)) df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn( "account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed", "seller_json") return df def add_fields(self, df): # df 消费kafka数据的df对象 df_save = df.join( self.df_asin_bs, on='node_id', how='left' ) df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders) # 重命名 df_save = df_save.withColumnRenamed("variat_num", "asinVarNum") df_save = df_save.withColumnRenamed("asin_bsr_orders", "bsrOrders") df_save = df_save.withColumnRenamed("asin_bsr_orders_sale", "bsrOrdersSale") df_save = df_save.withColumnRenamed("page_inventory", "pageInventory") df_save = df_save.withColumnRenamed("buy_sales", "asinBoughtMonth") df_save = self.handle_asin_buy_box_seller_type(df_save) df_save = df_save.withColumnRenamed("seller_json", "sellerJson") df_save = df_save.withColumnRenamed("buy_box_seller_type", "buyBoxSellerType")