import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so `utils` is importable
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


class DwtTop100(Templates):
    """Export ASIN-level BSR / flow data for one site and period to a local CSV.

    ``read_data`` + ``save_data`` read ``dwt_flow_asin`` and write it out.
    The ``*_old`` methods are an earlier variant that built a similar export
    by joining dwd_asin_measure / dim_asin_bs_info / ods_bs_category /
    dim_asin_detail. They are not called from within this class; which
    methods actually run presumably depends on ``Templates.run()`` (not
    visible here) -- confirm.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2024-01'):
        """Create the Spark session and placeholder DataFrames.

        :param site_name: site partition value, e.g. 'us'.
        :param date_type: period granularity, e.g. week/4_week/month/quarter/day.
        :param date_info: period value matching ``date_type``, e.g. '2024-01'.
        """
        super(DwtTop100, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Used in the Spark application name below.
        self.db_save = 'dwt_top100'
        # Initialise the self.spark session object.
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        print(f"参数信息: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; the read_* methods replace them with real data.
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_measure = self.spark.sql("select 1+1;")
        self.df_asin_bs_info = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")
        self.df_flow_asin = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load the requested partition of dwt_flow_asin into ``self.df_flow_asin`` (cached)."""
        # NOTE(review): partition values are interpolated directly into the SQL
        # text. They come from the command line, so this is only safe while the
        # caller is trusted.
        sql = f"""
            select asin, asin_type, bsr_orders, category_first_id, category_id,
                   first_category_rank, current_category_rank, asin_price, asin_rating,
                   asin_buy_box_seller_type, asin_is_new, asin_total_comments,
                   asin_launch_time, asin_launch_time_type, asin_brand_name, is_brand_label,
                   asin_bought_month as buy_data_bought_month
            from dwt_flow_asin
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.date_info}'
        """
        print(f"1. 读取dwt_flow_asin数据: sql -- {sql}")
        self.df_flow_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_flow_asin.show(10, truncate=False)

    def read_data_old(self):
        """Legacy loader: read the four source tables used by ``handle_data_old``."""
        sql = f"""SELECT asin, asin_bsr_orders from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}';"""
        # Earlier filters, kept for reference: and date_info>='2023-15' -- and asin_bsr_orders >0
        print(f"1. 读取dwd_asin_measure数据: sql -- {sql}")
        self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_measure.show(10, truncate=False)

        sql = f"""SELECT asin, asin_bs_cate_1_id, asin_bs_cate_1_rank, asin_bs_cate_current_id
            from dim_asin_bs_info
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and asin_bs_cate_1_id is not null;"""
        # Earlier filter, kept for reference: and date_info>='2023-15'
        print(f"2. 读取dim_asin_bs_info数据: sql -- {sql}")
        self.df_asin_bs_info = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_bs_info.show(10, truncate=False)

        # The redirected first-level category id, when present, takes
        # precedence over category_first_id.
        sql = f"""SELECT nodes_num, category_parent_id,
                   category_id as asin_bs_cate_current_id,
                   CASE WHEN redirect_first_id IS NOT NULL THEN redirect_first_id
                        ELSE category_first_id END as asin_bs_cate_1_id
            FROM ods_bs_category
            WHERE site_name = '{self.site_name}';"""
        # Earlier filter, kept for reference: and date_info>='2023-15'
        print(f"3. 读取ods_bs_category数据: sql -- {sql}")
        self.df_bs_category = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_category.show(10, truncate=False)

        sql = (
            f"select asin, asin_type, asin_price, asin_rating, asin_buy_box_seller_type, "
            f"asin_is_new, asin_total_comments, asin_launch_time, asin_launch_time_type "
            f"from dim_asin_detail "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
            f"and date_info='{self.date_info}';"
        )
        # Step number corrected: this is the fourth read (previously also labelled "3.").
        print(f"4. 读取dim_asin_detail数据: sql -- {sql}")
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail.show(10, truncate=False)

    def handle_data_old(self):
        """Legacy transform: inner-join the tables loaded by ``read_data_old`` and de-duplicate.

        Joins, all inner:
          measure x bs_info on asin,
          x bs_category on (asin_bs_cate_current_id, asin_bs_cate_1_id),
          x asin_detail on asin.
        """
        self.df_save = (
            self.df_asin_measure
            .join(self.df_asin_bs_info, on='asin', how='inner')
            .join(self.df_bs_category, on=['asin_bs_cate_current_id', 'asin_bs_cate_1_id'])
            .join(self.df_asin_detail, on=['asin'])
        )
        self.df_save = self.df_save.drop_duplicates()
        self.df_save.show(10, truncate=False)

    def _csv_path(self):
        """Return the local CSV path that ``save_data`` / ``save_data_old`` write to."""
        return f"/root/asin_bsr_{self.site_name}_{self.date_info}.csv"

    def save_data(self):
        """Write ``self.df_flow_asin`` to a local CSV on the driver.

        ``toPandas()`` collects the whole DataFrame to the driver, so the
        partition must fit in driver memory.
        """
        self.df_save = self.df_flow_asin
        self.df_save = self.df_save.toPandas()
        self.df_save.to_csv(self._csv_path(), index=False)

    def save_data_old(self):
        """Legacy writer: collect ``self.df_save`` to pandas and write it to a local CSV."""
        self.df_save = self.df_save.toPandas()
        self.df_save.to_csv(self._csv_path(), index=False)


if __name__ == '__main__':
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    handle_obj = DwtTop100(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()