import os
import sys

# Append the parent of the script's directory to the import path; presumably
# this is what makes the project-local `utils` package importable below.
sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
from pyspark.sql.types import StringType
# Window functions (grouped ranking)
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwtAbaYearWeek(Templates):
    """Pivot one year of weekly search-term metrics into one wide row per search term.

    Reads ``dim_st_detail`` for one site and date type, restricted to
    ``date_info`` between ``'{year}-01'`` and ``'{year}-52'``. Pivots those
    weeks into columns named ``st_<week>_rank``, ``st_<week>_search_sum`` and
    ``st_<week>_bsr_cate_1_id``, where ``<week>`` has its leading zero
    stripped (``2022-01`` -> ``1``). Appends the result to an empty frame that
    carries the schema of ``dwt_aba_year_week``. Writing ``self.df_save`` out
    is left to the ``Templates`` base class (presumably via ``run()`` --
    confirm there).
    """

    def __init__(self, site_name='us', date_type="week", date_info='2022-01', year='2022'):
        """
        :param site_name: site to read from dim_st_detail and to stamp into the
            output ``site_name`` column.
        :param date_type: value matched against ``dim_st_detail.date_type``.
        :param date_info: within this class, only used in the Spark application
            name; the rows read are selected by ``year``.
        :param year: calendar year whose weeks 01..52 are pivoted. The default
            keeps the historical hard-coded 2022 behaviour.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.year = str(year)
        self.db_save = 'dwt_aba_year_week'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrame; read_data() replaces it with the real source rows.
        self.df_st = self.spark.sql("select 1+1;")
        # Empty frame (limit 0) that only carries the target table's schema.
        self.df_save = self.spark.sql(f"select * from {self.db_save} where site_name='{self.site_name}' limit 0")
        # NOTE(review): partitions_by / reset_partitions are consumed by Templates; presumably they
        # control the output partitioning of the save step -- confirm in utils.templates.
        self.partitions_by = ['site_name']
        self.reset_partitions(10)

    @staticmethod
    def _pivot_col_to_name(col, year='2022'):
        """Map a pivot column name to its target name.

        Example: ``'2022-01_max(st_rank)'`` -> ``'st_1_rank'`` and
        ``'2022-10_max(st_search_sum)'`` -> ``'st_10_search_sum'``.
        """
        # Strip the "<year>-0" prefix first so single-digit weeks lose their zero padding.
        stripped = col.replace(f"{year}-0", "").replace(f"{year}-", "")
        stripped = stripped.replace("max(st_", "").replace(")", "")
        return "st_" + stripped

    def read_data(self):
        """Load the site's weekly rows for ``self.year`` from dim_st_detail and cache them."""
        sql = f"select search_term, st_search_sum, st_rank, st_bsr_cate_1_id, date_info " \
              f"from dim_st_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
              f"and date_info between '{self.year}-01' and '{self.year}-52'"
        # Cached because the value-less pivot in handle_data() first runs a job
        # to collect the distinct date_info values, then aggregates the same rows.
        self.df_st = self.spark.sql(sql).cache()

    def handle_data(self):
        """Pivot the weekly metrics into wide columns and append them to ``self.df_save``."""
        df_save = self.df_st.groupby(['search_term']).pivot('date_info').agg(
            F.max('st_rank'),
            F.max('st_search_sum'),
            F.max('st_bsr_cate_1_id'),
        )
        # Rename every pivot column in a single projection. Chaining up to
        # 52 weeks x 3 metrics withColumnRenamed calls builds a deep logical plan.
        new_names = [
            col if col == 'search_term' else self._pivot_col_to_name(col, self.year)
            for col in df_save.columns
        ]
        df_save = df_save.toDF(*new_names)
        print(df_save.columns)
        df_save = df_save.withColumn("site_name", F.lit(self.site_name))
        # allowMissingColumns: columns present on only one side (e.g. weeks with
        # no source rows) are filled with null instead of failing the union.
        self.df_save = self.df_save.unionByName(df_save, allowMissingColumns=True)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <site_name>")
    site_name = sys.argv[1]  # argument 1: site name
    handle_obj = DwtAbaYearWeek(site_name=site_name)
    handle_obj.run()