""" author: wangrui description: 根据ods_keepa_asin_bsr_rank得到最小产品线市场数据汇总 table_read_name: ods_keepa_asin_bsr_rank\dim_asin_detail table_save_name: dwt_keepa_asin_bsr_rank table_save_level: dwt version: 2.0 created_date: 2023-08-22 updated_date: 2023-08-22 """ import os import sys sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql import functions as F from pyspark.sql import Window from yswg_utils.common_df import get_node_first_id_df from pyspark.sql.types import IntegerType from utils.db_util import DBUtil from utils.spark_util import SparkUtil from yswg_utils.common_udf import udf_parse_amazon_orders class DwtAsinBsrRank(Templates): def __init__(self, site_name="us", date_type="month", date_info="2023-01", run_type=1): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.run_type = int(run_type) self.db_save_summary = f"dwt_keepa_asin_bsr_rank" self.db_save_detail = f"dws_keepa_asin_bsr_rank" self.spark = self.create_spark_object( app_name=f"keepa_asin_bsr_rank {self.site_name} {self.date_type} {self.date_info}") self.year, self.month = self.date_info.split('-') self.get_year_month_days_dict(year=int(self.year)) self.complete_date_info_tuple = self.get_complete_week_tuple() self.orders_transform_rate = self.get_orders_transform_rate() self.df_save_summary = self.spark.sql(f"select 1+1;") self.df_save_detail = self.spark.sql(f"select 1+1;") self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_keepa_asin = self.spark.sql(f"select 1+1;") self.df_asin_new_cate = self.spark.sql(f"select 1+1;") self.df_inv_asin = self.spark.sql(f"select 1+1;") self.df_inv_asin_detail = self.spark.sql(f"select 1+1;") self.df_complete_inv_asin = self.spark.sql(f"select 1+1;") self.partitions_by = ['site_name', 'date_type', 'date_info'] self.reset_partitions(60) self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType()) def get_complete_week_tuple(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;") df = self.df_date.toPandas() df_loc = df.loc[(df.year_month == f"{self.date_info}") & (df.week_day == 1)] return tuple(df_loc.year_week) def get_orders_transform_rate(self): month_days = self.year_month_days_dict[int(self.month)] if self.date_type in ['day', 'week']: if self.date_type == 'day': return 1 / month_days if self.date_type == 'week': return 7 / month_days else: return 1 def read_data(self): print("1. 读取dim_asin_detail, 获取上个月所有asin信息") if self.run_type == 1: sql = f""" select asin, rank, price, node_id as cat_id, category, buy_sales, created_at as dt from ods_asin_detail where site_name='{self.site_name}' and date_type='month' and date_info = '{self.date_info}' and node_id is not null; """ else: sql = f""" select asin, rank, price, node_id as cat_id, category, buy_sales, created_at as dt from ods_asin_detail where site_name='{self.site_name}' and date_type='month_week' and date_info = '{self.date_info}' and node_id is not null; """ print("sql:", sql) self.df_asin_detail = self.spark.sql(sqlQuery=sql).repartition(60).cache() self.df_asin_detail.show(20, truncate=False) print("2. 读取内部asin信息") sql = f"select asin, 1 as is_self_asin from us_self_asin where delete_time is null group by asin " print("sql:", sql) mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name=self.site_name) if mysql_con_info is not None: self.df_self_asin = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'], username=mysql_con_info['username'], query=sql).cache() self.df_self_asin.show(20, truncate=False) # 读取dim_bsr_category_tree 新的类目树 获取一级分类id self.df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark) self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("node_id", "cat_id") self.df_asin_new_cate = self.df_asin_new_cate.withColumnRenamed("category_first_id", "cate_1_id").cache() self.df_asin_new_cate.show(20, truncate=False) print("3. 读取inv_asin数据") sql = f""" select asin, 1 as is_inner from us_inv_asin group by asin """ if mysql_con_info is not None: self.df_inv_asin = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'], username=mysql_con_info['username'], query=sql).cache() self.df_inv_asin.show(20, truncate=False) print("4. 读取self_asin_detail") sql = f""" select asin ,node_id as cat_id from us_self_asin_detail where asin_type like '%%11%%' and created_at >= DATE_FORMAT(CURDATE(), '%Y-%m-01') group by asin, node_id """ if mysql_con_info is not None: self.df_inv_asin_detail = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'], username=mysql_con_info['username'], query=sql).cache() self.df_inv_asin_detail.show(20, truncate=False) def handle_inv_asin_cat(self): self.df_inv_asin = self.df_inv_asin.join( self.df_inv_asin_detail, on=['asin'], how='left' ) self.df_inv_asin = self.df_inv_asin.filter((F.col("cat_id").isNotNull()) & (F.col("cat_id") != '')).cache() self.df_keepa_asin = self.df_inv_asin.select("cat_id", "is_inner") def handle_asin_detail(self): self.df_asin_detail = self.df_asin_detail.repartition(60) window = Window.partitionBy(['asin']).orderBy( self.df_asin_detail.dt.desc() ) self.df_asin_detail = self.df_asin_detail.withColumn('dt_rank', F.row_number().over(window=window)) self.df_asin_detail = self.df_asin_detail.filter("dt_rank = 1") self.df_asin_detail = self.df_asin_detail.drop("dt_rank") self.df_asin_detail = self.df_asin_detail.withColumn("orders", self.u_parse_amazon_orders(self.df_asin_detail.buy_sales)) self.df_asin_detail = self.df_asin_detail.join( self.df_self_asin, on=['asin'], how='left' ) self.df_asin_detail = self.df_asin_detail.filter((F.col("is_self_asin") != 1) | (F.col("is_self_asin").isNull())).drop("is_self_asin").cache() def handle_data_join(self): self.df_asin_detail = self.df_asin_detail.repartition(60) self.df_asin_detail = self.df_asin_detail.join( self.df_keepa_asin, on=['cat_id'], how='left' ).join( self.df_asin_new_cate, on=['cat_id'], how='left' ) self.df_asin_detail = self.df_asin_detail.drop_duplicates(['asin']).cache() self.df_asin_detail = self.df_asin_detail.na.fill({"is_inner": 2}) def handle_keepa_asin_detail(self): type_expr = """ CASE WHEN (price is not null and price < 7 and orders >= 150) or (price >= 7 and price < 15 and orders >= 100) or (price >= 15 and price < 20 and orders >= 80) or (price >= 20 and price < 35 and orders >= 40) or (price >= 35 and price < 50 and orders >= 30) or (price >= 50 and orders >= 20) or (price is null and orders >=50) THEN 1 ELSE 2 END """ self.df_asin_detail = self.df_asin_detail.withColumn('type', F.expr(type_expr)) self.df_save_detail = self.df_asin_detail.select("cat_id", "asin", "rank", "price", "cate_1_id", "orders", "type", "category").cache() def handle_keepa_asin_summary(self): self.df_save_summary = self.df_asin_detail.select("cat_id", "orders", "type", "is_inner").cache() self.df_save_summary = self.df_save_summary.groupby(['cat_id']).agg( F.first(F.col("is_inner")).alias("is_inner"), F.sum(F.col("orders")).alias("orders_sum"), F.sum(F.when(F.col('type') == 1, F.col("orders")).otherwise(F.lit(0))).alias("success_orders_sum"), F.sum(F.when(F.col('type') == 2, F.col("orders")).otherwise(F.lit(0))).alias("fail_orders_sum"), F.sum(F.when(F.col('type') == 1, 1).otherwise(F.lit(0))).alias("success_num"), F.sum(F.when(F.col('type') == 2, 1).otherwise(F.lit(0))).alias("fail_num"), F.sum(F.when(F.col("orders") >= 50, 1).otherwise(0)).alias("50_qty_asin_count") ) df_category_info = self.df_asin_detail.select("cat_id", "category", "dt") df_with_category_info = df_category_info.filter(F.col("category").isNotNull()) window = Window.partitionBy(['cat_id']).orderBy( df_with_category_info.dt.desc() ) df_category_info = df_with_category_info.withColumn("c_rank", F.row_number().over(window=window)) df_new_category_info = df_category_info.filter("c_rank = 1") df_new_category_info = df_new_category_info.drop("c_rank", "dt") self.df_save_summary = self.df_save_summary.join( df_new_category_info, on=['cat_id'], how='left' ).cache() def handle_data_group(self): self.df_save_detail = self.df_save_detail.withColumn("created_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \ withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \ withColumn("string_field1", F.lit("null")). \ withColumn("string_field2", F.lit("null")). \ withColumn("string_field3", F.lit("null")). \ withColumn("int_field1", F.lit(0)). \ withColumn("int_field2", F.lit(0)). \ withColumn("int_field3", F.lit(0)). \ withColumn("site_name", F.lit(self.site_name)). \ withColumn("date_type", F.lit(self.date_type)). \ withColumn("date_info", F.lit(self.date_info)) self.df_save_summary = self.df_save_summary.withColumn("create_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \ withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \ withColumn("string_field1", F.lit("null")). \ withColumn("string_field2", F.lit("null")). \ withColumn("string_field3", F.lit("null")). \ withColumn("int_field1", F.lit(0)). \ withColumn("int_field2", F.lit(0)). \ withColumn("int_field3", F.lit(0)). \ withColumn("site_name", F.lit(self.site_name)). \ withColumn("date_type", F.lit(self.date_type)). \ withColumn("date_info", F.lit(self.date_info)) def handle_data(self): self.read_data() self.handle_inv_asin_cat() self.handle_asin_detail() self.handle_data_join() self.handle_keepa_asin_detail() self.handle_keepa_asin_summary() self.handle_data_group() def save_data(self): self.save_data_common( df_save=self.df_save_detail, db_save=self.db_save_detail, partitions_num=self.partitions_num, partitions_by=self.partitions_by ) self.save_data_common( df_save=self.df_save_summary, db_save=self.db_save_summary, partitions_num=self.partitions_num, partitions_by=self.partitions_by ) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 run_type = sys.argv[4] handle_obj = DwtAsinBsrRank(site_name=site_name, date_type=date_type, date_info=date_info, run_type=run_type) handle_obj.run()