import os
import sys

# Make the parent directory importable so that `utils.templates` resolves
# when this file is executed directly as a script.
sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window  # window functions for grouped ranking UDFs

from utils.templates import Templates
# NOTE(review): the original also had `from ..utils.templates import Templates`,
# which raises ImportError ("attempted relative import with no known parent
# package") when this module is run as a script; the absolute import above is
# the one that can work, so the relative one was removed.
# from AmazonSpider.pyspark_job.utils.templates_test import Templates


class AmazonBsrSelection(Templates):
    """BSR (Best Sellers Rank) selection job.

    Reads the search-term / ASIN dimension tables and the BSR category
    top-100 tables for one site and date window, caching each DataFrame
    for the downstream aggregation in ``handle_data``.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Build the Spark session and remember the job parameters.

        :param site_name: site code, e.g. ``'us'``
        :param date_type: one of day/week/4_week/month/quarter
        :param date_info: date key matching ``date_type``, e.g. ``'2022-01'``
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_bsr_analytics_temp'  # target table for this job
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Expands date_info into the per-day tuple consumed by read_data
        # (self.date_info_tuple; defined on Templates — TODO confirm).
        self.get_date_info_tuple()

    def read_data(self):
        """Load and cache the five source DataFrames for this site/date.

        NOTE(review): the trailing ';' was removed from the SQL strings —
        ``spark.sql`` rejects a trailing semicolon on older Spark versions.
        The original also executed queries 3/4/5 twice (once uncached,
        immediately overwritten by the cached run); each now runs once.
        """
        print("1.1 读取dim_st_asin_info表")
        sql = (f"select * from dim_st_asin_info where site_name='{self.site_name}' "
               f"and date_type='day' and date_info in {self.date_info_tuple}")
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin.show(10, truncate=False)

        print("2.1 读取dim_st_detail和ods_brand_analytics表")
        sql = (f"select search_term, st_rank, st_search_sum from dim_st_detail "
               f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
               f"and date_info ='{self.date_info}'")
        print("sql:", sql)
        self.df_st = self.spark.sql(sqlQuery=sql).cache()
        self.df_st.show(10, truncate=False)

        # Log label fixed: the table read here is dim_cal_asin_history_detail.
        print("3.1 读取dim_cal_asin_history_detail表")
        sql = (f"select asin, asin_rank, bsr_cate_1_id, asin_title "
               f"from dim_cal_asin_history_detail where site_name='{self.site_name}'")
        print("sql:", sql)
        self.df_asin = self.spark.sql(sqlQuery=sql).cache()

        # Log label fixed: the table read here is ods_bs_category_top100_asin.
        print("4.1 读取ods_bs_category_top100_asin表")
        sql = (f"select asin, cate_current_id, bsr_rank, rating, total_comments "
               f"from ods_bs_category_top100_asin where site_name='{self.site_name}'")
        print("sql:", sql)
        self.df_asin_top100 = self.spark.sql(sqlQuery=sql).cache()

        print("5.1 读取ods_bs_category表")
        sql = (f"select id as cate_current_id, one_category_id, en_name as cate_current_id_en_name "
               f"from ods_bs_category where site_name='{self.site_name}'")
        print("sql:", sql)
        self.df_bs_category = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        # Transformation step intentionally not implemented yet.
        pass


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site code, e.g. 'us'
    date_type = sys.argv[2]  # arg 2: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = AmazonBsrSelection(site_name=site_name,
                                    date_type=date_type,
                                    date_info=date_info)
    handle_obj.run()