import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from utils.common_util import CommonUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from utils.db_util import DBUtil


class DwtUserStoreCollectionsInfo(Templates):

    def __init__(self, site_name, date_type, date_info, run_type, seller_id_tuple):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.run_type = run_type
        self.seller_id_tuple = seller_id_tuple
        self.db_save = "dwt_user_store_collections_info"
        if self.run_type == 'real_time':
            self.seller_id_tuple = str(self.seller_id_tuple).split(',')
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}:{self.site_name}-{self.date_type}-{self.date_info}")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(100)
        self.previous_date_info = self.get_previous_date_info()
        # DataFrame placeholders
        self.df_store_asin_detail = self.spark.sql("select 1+1;")
        self.df_previous_store_asin_detail = self.spark.sql("select 1+1;")
        self.df_user_store_collections_info = self.spark.sql("select 1+1;")
        # UDF registration
        self.u_judge_store_label = self.spark.udf.register(
            'u_judge_store_label', self.udf_judge_store_label, StringType())

    @staticmethod
    def udf_judge_store_label(high_quantity_num, standard_ao_num, total_num,
                              new_asin_sales_surge_num, old_asin_sales_surge_num):
        # Grade a store A-E from the share of high-quality ASINs, the share of
        # ASINs meeting the AO standard, and the new/old sales-surge ratio.
        if total_num > 0 and old_asin_sales_surge_num > 0:
            if high_quantity_num / total_num >= 0.8 and standard_ao_num / total_num >= 0.5 \
                    and new_asin_sales_surge_num / old_asin_sales_surge_num >= 0.5:
                return 'A'
            elif high_quantity_num / total_num >= 0.5 and standard_ao_num / total_num >= 0.4 \
                    and new_asin_sales_surge_num / old_asin_sales_surge_num >= 0.4:
                return 'B'
            elif high_quantity_num / total_num >= 0.3 and standard_ao_num / total_num >= 0.3 \
                    and new_asin_sales_surge_num / old_asin_sales_surge_num >= 0.3:
                return 'C'
            elif high_quantity_num / total_num >= 0.1 and standard_ao_num / total_num >= 0.2 \
                    and new_asin_sales_surge_num / old_asin_sales_surge_num >= 0.2:
                return 'D'
            else:
                return 'E'
        else:
            return 'E'
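    # Threshold tiers applied above (all three conditions must hold for a tier):
    #   A: high_quantity >= 80% of total, standard_ao >= 50%, new/old surge ratio >= 0.5
    #   B: >= 50% / >= 40% / >= 0.4
    #   C: >= 30% / >= 30% / >= 0.3
    #   D: >= 10% / >= 20% / >= 0.2
    #   E: anything else, or an empty page / no old-ASIN sales surges
    # Since this is a plain static method it can be sanity-checked without Spark;
    # the values below are illustrative, not production data:
    #   DwtUserStoreCollectionsInfo.udf_judge_store_label(8, 5, 10, 3, 4)  # -> 'A'
    #   DwtUserStoreCollectionsInfo.udf_judge_store_label(3, 3, 10, 1, 4)  # -> 'D'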
    def get_previous_date_info(self):
        # Look up the previous period in the date dimension by decrementing the id
        self.df_date = self.spark.sql("select * from dim_date_20_to_30;")
        df = self.df_date.toPandas()
        df_loc = df.loc[df.date == f'{self.date_info}']
        current_date_id = list(df_loc.id)[0]
        previous_date_id = int(current_date_id) - 1
        df_loc = df.loc[df.id == previous_date_id]
        previous_date = list(df_loc.date)[0]
        return previous_date

    def read_data(self):
        print("1. Read ASIN details of collected stores")
        if self.run_type == 'real_time':
            hdfs_path = "/home/big_data_selection/tmp/user_collect_store_asin_detail_tmp/*.parquet"
            self.df_store_asin_detail = self.spark.read.parquet(hdfs_path)
        else:
            sql = f"""
            select seller_id, store_name, store_location, store_crawl_time, store_asin_total_num, asin, asin_price,
                   asin_rating, asin_total_comments, asin_ao_val, is_standard_ao, asin_bsr_rank, asin_bsr_orders,
                   category_first_id, category_first_name, category_id, category_name, parent_asin, asin_type,
                   is_raise_asin, is_popular_asin, is_high_quantity_asin, is_sales_surge_asin
            from dws_user_collect_store_asin_detail
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.date_info}'
            """
            print("sql=", sql)
            self.df_store_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        print("2. Read ASIN details of collected stores for the previous period")
        if self.run_type == 'real_time':
            sql1 = f"""
            select seller_id, asin, asin_price as previous_asin_price, asin_bsr_rank as previous_asin_bsr_rank
            from dws_user_collect_store_asin_detail
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.previous_date_info}'
              and seller_id"""
            query_store = ', '.join([f"'{value}'" for value in self.seller_id_tuple])
            sql2 = f" in ({query_store})"
            sql = sql1 + sql2
        else:
            sql = f"""
            select seller_id, asin, asin_price as previous_asin_price, asin_bsr_rank as previous_asin_bsr_rank
            from dws_user_collect_store_asin_detail
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.previous_date_info}'
            """
        print("sql=", sql)
        self.df_previous_store_asin_detail = self.spark.sql(sqlQuery=sql).cache()

    def handle_asin_change(self):
        self.df_store_asin_detail = self.df_store_asin_detail.join(
            self.df_previous_store_asin_detail, on=['seller_id', 'asin'], how='left'
        )
        # Flag ASINs whose price rose since the previous period
        self.df_store_asin_detail = self.df_store_asin_detail.withColumn(
            "is_asin_price_raise",
            F.when(F.col("asin_price") - F.col("previous_asin_price") > 0, F.lit(1)).otherwise(F.lit(0))
        )
        # Flag ASINs whose price fell since the previous period
        self.df_store_asin_detail = self.df_store_asin_detail.withColumn(
            "is_asin_price_decline",
            F.when(F.col("asin_price") - F.col("previous_asin_price") < 0, F.lit(1)).otherwise(F.lit(0))
        )
        # Flag ASINs whose BSR rank improved by at least 50% (rank number halved or better)
        self.df_store_asin_detail = self.df_store_asin_detail.withColumn(
            "is_asin_rank_raise",
            F.when((F.col("previous_asin_bsr_rank").isNotNull()) & (
                    (F.col("asin_bsr_rank") - F.col("previous_asin_bsr_rank")) / F.col("previous_asin_bsr_rank") <= -0.5),
                   F.lit(1)).otherwise(F.lit(0))
        )
        # Flag ASINs whose BSR rank worsened by at least 50%
        self.df_store_asin_detail = self.df_store_asin_detail.withColumn(
            "is_asin_rank_decline",
            F.when((F.col("previous_asin_bsr_rank").isNotNull()) & (
                    (F.col("asin_bsr_rank") - F.col("previous_asin_bsr_rank")) / F.col("previous_asin_bsr_rank") >= 0.5),
                   F.lit(1)).otherwise(F.lit(0))
        )
        self.df_store_asin_detail = self.df_store_asin_detail.drop("previous_asin_price", "previous_asin_bsr_rank")
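    # The rank flags use relative change against the previous period. With
    # illustrative numbers: previous rank 1000, current rank 400 gives
    # (400 - 1000) / 1000 = -0.6 <= -0.5, so is_asin_rank_raise = 1 (a lower
    # BSR number is a better rank). ASINs with no previous-period row keep both
    # rank flags at 0, because previous_asin_bsr_rank is null after the left join.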
    def handle_data_group(self):
        # Compute the share of multi-variant listings per store
        df_variant_ratio = self.df_store_asin_detail.select("seller_id", "asin", "parent_asin").withColumn(
            "parent_asin", F.when(F.col("parent_asin").isNull(), F.col("asin")).otherwise(F.col("parent_asin")))
        df_variant_ratio = df_variant_ratio.groupby(['seller_id', 'parent_asin']).agg(
            F.count('asin').alias("asin_son_count")
        )
        df_variant_ratio = df_variant_ratio.withColumn(
            "is_variant_flag", F.when(F.col("asin_son_count") > 1, F.lit(1)))
        df_variant_ratio = df_variant_ratio.groupby(['seller_id']).agg(
            F.sum("is_variant_flag").alias("store_more_variant_num"),
            F.count("parent_asin").alias("store_variant_asin_total")
        )
        df_variant_ratio = df_variant_ratio.withColumn(
            "store_page20_variant_rate",
            F.round(F.col("store_more_variant_num") / F.col("store_variant_asin_total"), 4))
        df_variant_ratio = df_variant_ratio.drop("store_more_variant_num", "store_variant_asin_total")
        self.df_user_store_collections_info = self.df_store_asin_detail.groupby(['seller_id']).agg(
            F.first("store_name").alias("store_name"),
            F.first("store_asin_total_num").alias("store_asin_total_num"),
            F.count("asin").alias("store_page20_asin_total_num"),
            F.count(F.when(F.col("asin_type") == 1, True)).alias("store_page20_new_asin_total_num"),
            F.count(F.when(F.col("asin_type") == 2, True)).alias("store_page20_old_asin_total_num"),
            F.round(F.avg("asin_price"), 4).alias("store_page20_asin_avg_price"),
            F.round(F.avg("asin_rating"), 4).alias("store_page20_asin_avg_rating"),
            F.round(F.avg("asin_total_comments"), 4).alias("store_page20_asin_avg_comments"),
            F.count(F.when(F.col("is_raise_asin") == 1, True)).alias("store_page20_raise_asin_num"),
            F.count(F.when(F.col("is_popular_asin") == 1, True)).alias("store_page20_popular_asin_num"),
            F.count(F.when(F.col("is_high_quantity_asin") == 1, True)).alias("store_page20_high_quantity_asin_num"),
            F.count(F.when((F.col("is_sales_surge_asin") == 1) & (F.col("asin_type") == 1), True)).alias(
                "store_page20_new_asin_sales_surge_num"),
            F.count(F.when((F.col("is_sales_surge_asin") == 1) & (F.col("asin_type") == 2), True)).alias(
                "store_page20_old_asin_sales_surge_num"),
            F.first("store_location").alias("store_location"),
            F.first("store_crawl_time").alias("store_crawl_time"),
            F.count(F.when(F.col("is_standard_ao") == 1, True)).alias("standard_ao_num"),
            F.sum("is_asin_price_raise").alias("store_page20_price_raise_asin_num"),
            F.sum("is_asin_price_decline").alias("store_page20_price_decline_asin_num"),
            F.sum("is_asin_rank_raise").alias("store_page20_rank_raise_asin_num"),
            F.sum("is_asin_rank_decline").alias("store_page20_rank_decline_asin_num")
        )
        self.df_user_store_collections_info = self.df_user_store_collections_info.withColumn(
            "store_label_type",
            self.u_judge_store_label(F.col("store_page20_high_quantity_asin_num"),
                                     F.col("standard_ao_num"),
                                     F.col("store_page20_asin_total_num"),
                                     F.col("store_page20_new_asin_sales_surge_num"),
                                     F.col("store_page20_old_asin_sales_surge_num"))
        )
        self.df_user_store_collections_info = self.df_user_store_collections_info.drop("standard_ao_num")
        self.df_user_store_collections_info = self.df_user_store_collections_info.withColumn(
            "store_page20_new_asin_num_percent",
            F.round(F.col("store_page20_new_asin_total_num") / F.col("store_page20_asin_total_num"), 4))
        self.df_user_store_collections_info = self.df_user_store_collections_info.withColumn(
            "store_page20_old_asin_num_percent",
            F.round(F.col("store_page20_old_asin_total_num") / F.col("store_page20_asin_total_num"), 4))
        df_store_raise_asin = self.df_store_asin_detail.filter("is_raise_asin=1").groupby(['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_raise_asin")
        )
        df_store_popular_asin = self.df_store_asin_detail.filter("is_popular_asin=1").groupby(['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_popular_asin")
        )
        df_store_high_quantity_asin = self.df_store_asin_detail.filter("is_high_quantity_asin=1").groupby(
            ['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_high_quantity_asin")
        )
        df_store_new_asin_sales_surge = self.df_store_asin_detail.filter(
            (F.col("is_sales_surge_asin") == 1) & (F.col("asin_type") == 1)).groupby(['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_new_asin_sales_surge")
        )
        df_store_old_asin_sales_surge = self.df_store_asin_detail.filter(
            (F.col("is_sales_surge_asin") == 1) & (F.col("asin_type") == 2)).groupby(['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_old_asin_sales_surge")
        )
        df_store_price_raise_asin = self.df_store_asin_detail.filter("is_asin_price_raise=1").groupby(
            ['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_price_raise_asin")
        )
        df_store_price_decline_asin = self.df_store_asin_detail.filter("is_asin_price_decline=1").groupby(
            ['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_price_decline_asin")
        )
        df_store_rank_raise_asin = self.df_store_asin_detail.filter("is_asin_rank_raise=1").groupby(
            ['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_rank_raise_asin")
        )
        df_store_rank_decline_asin = self.df_store_asin_detail.filter("is_asin_rank_decline=1").groupby(
            ['seller_id']).agg(
            F.concat_ws(',', F.collect_list("asin")).alias("store_page20_rank_decline_asin")
        )
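        # Each frame above yields one row per seller with the matching ASINs
        # joined into a comma-separated string; sellers with no matching ASINs
        # have no row at all, so the left joins below leave those columns null.
        # Likewise, F.count(F.when(cond, True)) in the main aggregation counts
        # only rows where cond holds, since count() skips the nulls that when()
        # produces for non-matching rows.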
self.df_store_asin_detail.filter("is_asin_rank_raise=1").groupby( ['seller_id']).agg( F.concat_ws(',', F.collect_list("asin")).alias("store_page20_rank_raise_asin") ) df_store_rank_decline_asin = self.df_store_asin_detail.filter("is_asin_rank_decline=1").groupby( ['seller_id']).agg( F.concat_ws(',', F.collect_list("asin")).alias("store_page20_rank_decline_asin") ) self.df_user_store_collections_info = self.df_user_store_collections_info.join( df_variant_ratio, on=['seller_id'], how='left' ).join( df_store_raise_asin, on=['seller_id'], how='left' ).join( df_store_popular_asin, on=['seller_id'], how='left' ).join( df_store_high_quantity_asin, on=['seller_id'], how='left' ).join( df_store_new_asin_sales_surge, on=['seller_id'], how='left' ).join( df_store_old_asin_sales_surge, on=['seller_id'], how='left' ).join( df_store_price_raise_asin, on=['seller_id'], how='left' ).join( df_store_price_decline_asin, on=['seller_id'], how='left' ).join( df_store_rank_raise_asin, on=['seller_id'], how='left' ).join( df_store_rank_decline_asin, on=['seller_id'], how='left' ) df_store_seller_num_info = self.df_user_store_collections_info.select("seller_id", "store_page20_asin_total_num") df_store_asin_category_id_info = self.df_store_asin_detail.filter("category_id is not null").groupby( ['seller_id', 'category_id']).agg( F.count("asin").alias("asin_count"), F.first("category_name").alias("en_name") ) df_store_asin_category_id_info = df_store_asin_category_id_info.join( df_store_seller_num_info, on=['seller_id'], how='left' ) df_store_asin_category_id_info = df_store_asin_category_id_info.withColumn( "asin_percent", F.round(F.col("asin_count") / F.col("store_page20_asin_total_num"), 4)) df_store_category_id_agg = df_store_asin_category_id_info.groupby(['seller_id']).agg( F.collect_list( F.struct(F.col("category_id"), F.col("en_name"), F.col("asin_percent"), F.col("asin_count"))).alias( "category_value") ) df_store_category_id_agg = df_store_category_id_agg.withColumn("store_current_category_percent", F.to_json("category_value")) df_store_category_id_agg = df_store_category_id_agg.drop("category_value") df_store_asin_category_first_id_info = self.df_store_asin_detail.filter( "category_first_id is not null").groupby( ['seller_id', 'category_first_id']).agg( F.count("asin").alias("asin_count"), F.first("category_first_name").alias("en_name") ) df_store_asin_category_first_id_info = df_store_asin_category_first_id_info.join( df_store_seller_num_info, on=['seller_id'], how='left' ) df_store_asin_category_first_id_info = df_store_asin_category_first_id_info.withColumn( "asin_percent", F.round(F.col("asin_count") / F.col("store_page20_asin_total_num"), 4)) df_store_category_first_id_agg = df_store_asin_category_first_id_info.groupby(['seller_id']).agg( F.collect_list(F.struct(F.col("category_first_id"), F.col("en_name"), F.col("asin_percent"), F.col("asin_count"))).alias( "category_first_vale") ) df_store_category_first_id_agg = df_store_category_first_id_agg.withColumn("store_first_category_percent", F.to_json("category_first_vale")) df_store_category_first_id_agg = df_store_category_first_id_agg.drop("category_first_vale") self.df_user_store_collections_info = self.df_user_store_collections_info.join( df_store_category_id_agg, on=['seller_id'], how='left' ).join( df_store_category_first_id_agg, on=['seller_id'], how='left' ) self.df_user_store_collections_info = self.df_user_store_collections_info.withColumn( "store_new_flag", F.when(F.col("store_page20_new_asin_num_percent") >= 0.5, F.lit(1)).when( 
F.col("store_page20_old_asin_num_percent") >= 0.5, F.lit(2)).otherwise(F.lit(0))) def handle_data_complete(self): self.df_save = self.df_user_store_collections_info if self.run_type != 'real_time': self.df_save = self.df_save.withColumn("created_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \ withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')) self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name)) self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type)) self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info)) def handle_data(self): self.handle_asin_change() self.handle_data_group() self.handle_data_complete() def save_data(self): if self.run_type == 'real_time': engine = DBUtil.get_db_engine("postgresql", self.site_name) pg_con_info = DBUtil.get_connection_info("postgresql", self.site_name) export_tmp_tb = 'user_store_collections_info_tmp' export_tb = 'user_store_collections_info' sql = f""" truncate table {export_tmp_tb}; """ DBUtil.engine_exec_sql(engine, sql) connection_properties = { "user": pg_con_info["username"], "password": pg_con_info["pwd"], "driver": "org.postgresql.Driver" } pg_url = pg_con_info["url"] df_save = self.df_save df_save.write.jdbc(url=pg_url, table=export_tmp_tb, mode="overwrite", properties=connection_properties) after_sql = f""" insert into {export_tb}(seller_id, store_asin_total_num, store_page20_asin_total_num, store_page20_variant_rate, store_page20_new_asin_total_num, store_page20_new_asin_num_percent, store_page20_old_asin_total_num, store_page20_old_asin_num_percent, store_page20_asin_avg_price, store_page20_asin_avg_rating, store_page20_asin_avg_comments, store_page20_raise_asin_num, store_page20_raise_asin, store_page20_popular_asin_num, store_page20_popular_asin, store_page20_high_quantity_asin_num, store_page20_high_quantity_asin, store_page20_new_asin_sales_surge_num, store_page20_new_asin_sales_surge, store_page20_old_asin_sales_surge_num, store_page20_old_asin_sales_surge, store_location, store_crawl_time, store_first_category_percent, store_current_category_percent, store_label_type, store_name, store_page20_price_raise_asin_num, store_page20_price_raise_asin, store_page20_price_decline_asin_num, store_page20_price_decline_asin, store_page20_rank_raise_asin_num, store_page20_rank_raise_asin, store_page20_rank_decline_asin_num, store_page20_rank_decline_asin, store_new_flag) select seller_id, store_asin_total_num, store_page20_asin_total_num, store_page20_variant_rate, store_page20_new_asin_total_num, store_page20_new_asin_num_percent, store_page20_old_asin_total_num, store_page20_old_asin_num_percent, store_page20_asin_avg_price, store_page20_asin_avg_rating, store_page20_asin_avg_comments, store_page20_raise_asin_num, store_page20_raise_asin, store_page20_popular_asin_num, store_page20_popular_asin, store_page20_high_quantity_asin_num, store_page20_high_quantity_asin, store_page20_new_asin_sales_surge_num, store_page20_new_asin_sales_surge, store_page20_old_asin_sales_surge_num, store_page20_old_asin_sales_surge, store_location, store_crawl_time, store_first_category_percent, store_current_category_percent, store_label_type, store_name, store_page20_price_raise_asin_num, store_page20_price_raise_asin, store_page20_price_decline_asin_num, store_page20_price_decline_asin, store_page20_rank_raise_asin_num, store_page20_rank_raise_asin, store_page20_rank_decline_asin_num, store_page20_rank_decline_asin, store_new_flag from {export_tmp_tb} 
            on conflict (seller_id) do update set
                store_asin_total_num = excluded.store_asin_total_num,
                store_page20_asin_total_num = excluded.store_page20_asin_total_num,
                store_page20_variant_rate = excluded.store_page20_variant_rate,
                store_page20_new_asin_total_num = excluded.store_page20_new_asin_total_num,
                store_page20_new_asin_num_percent = excluded.store_page20_new_asin_num_percent,
                store_page20_old_asin_total_num = excluded.store_page20_old_asin_total_num,
                store_page20_old_asin_num_percent = excluded.store_page20_old_asin_num_percent,
                store_page20_asin_avg_price = excluded.store_page20_asin_avg_price,
                store_page20_asin_avg_rating = excluded.store_page20_asin_avg_rating,
                store_page20_asin_avg_comments = excluded.store_page20_asin_avg_comments,
                store_page20_raise_asin_num = excluded.store_page20_raise_asin_num,
                store_page20_raise_asin = excluded.store_page20_raise_asin,
                store_page20_popular_asin_num = excluded.store_page20_popular_asin_num,
                store_page20_popular_asin = excluded.store_page20_popular_asin,
                store_page20_high_quantity_asin_num = excluded.store_page20_high_quantity_asin_num,
                store_page20_high_quantity_asin = excluded.store_page20_high_quantity_asin,
                store_page20_new_asin_sales_surge_num = excluded.store_page20_new_asin_sales_surge_num,
                store_page20_new_asin_sales_surge = excluded.store_page20_new_asin_sales_surge,
                store_page20_old_asin_sales_surge_num = excluded.store_page20_old_asin_sales_surge_num,
                store_page20_old_asin_sales_surge = excluded.store_page20_old_asin_sales_surge,
                store_location = excluded.store_location,
                store_crawl_time = excluded.store_crawl_time,
                store_first_category_percent = excluded.store_first_category_percent,
                store_current_category_percent = excluded.store_current_category_percent,
                store_label_type = excluded.store_label_type,
                store_name = excluded.store_name,
                store_page20_price_raise_asin_num = excluded.store_page20_price_raise_asin_num,
                store_page20_price_raise_asin = excluded.store_page20_price_raise_asin,
                store_page20_price_decline_asin_num = excluded.store_page20_price_decline_asin_num,
                store_page20_price_decline_asin = excluded.store_page20_price_decline_asin,
                store_page20_rank_raise_asin_num = excluded.store_page20_rank_raise_asin_num,
                store_page20_rank_raise_asin = excluded.store_page20_rank_raise_asin,
                store_page20_rank_decline_asin_num = excluded.store_page20_rank_decline_asin_num,
                store_page20_rank_decline_asin = excluded.store_page20_rank_decline_asin,
                store_new_flag = excluded.store_new_flag,
                created_time = now(),
                updated_time = now();
            """
            DBUtil.engine_exec_sql(engine, after_sql)
        else:
            Templates.save_data(self)

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    date_info = CommonUtil.get_sys_arg(3, None)
    run_type = sys.argv[4]
    seller_id_tuple = sys.argv[5]
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    obj = DwtUserStoreCollectionsInfo(site_name=site_name, date_type=date_type, date_info=date_info,
                                      run_type=run_type, seller_id_tuple=seller_id_tuple)
    obj.run()
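# Example invocation (illustrative values; the script file name and seller ids
# are assumptions, not taken from this repo):
#   spark-submit dwt_user_store_collections_info.py us month 2023-01 real_time "A1B2C3,A4B5C6"
# A batch run still needs a placeholder 5th argument, since seller_id_tuple is
# only consulted when run_type == 'real_time'.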